diff options
Diffstat (limited to 'Lib/test/test_telnetlib.py')
-rw-r--r-- | Lib/test/test_telnetlib.py | 566 |
1 files changed, 278 insertions, 288 deletions
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py index 7fdb49e8e62..95eebf3cbc4 100644 --- a/Lib/test/test_telnetlib.py +++ b/Lib/test/test_telnetlib.py @@ -1,39 +1,21 @@ import socket +import select import telnetlib import time -import Queue +import contextlib import unittest from unittest import TestCase -from test import test_support -threading = test_support.import_module('threading') - -HOST = test_support.HOST -EOF_sigil = object() - -def server(evt, serv, dataq=None): - """ Open a tcp server in three steps - 1) set evt to true to let the parent know we are ready - 2) [optional] if is not False, write the list of data from dataq.get() - to the socket. - """ +from test import support +threading = support.import_module('threading') + +HOST = support.HOST + +def server(evt, serv): serv.listen(5) evt.set() try: conn, addr = serv.accept() - if dataq: - data = '' - new_data = dataq.get(True, 0.5) - dataq.task_done() - for item in new_data: - if item == EOF_sigil: - break - if type(item) in [int, float]: - time.sleep(item) - else: - data += item - written = conn.send(data) - data = data[written:] conn.close() except socket.timeout: pass @@ -46,7 +28,7 @@ class GeneralTests(TestCase): self.evt = threading.Event() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(60) # Safety net. Look issue 11812 - self.port = test_support.bind_port(self.sock) + self.port = support.bind_port(self.sock) self.thread = threading.Thread(target=server, args=(self.evt,self.sock)) self.thread.setDaemon(True) self.thread.start() @@ -92,184 +74,217 @@ class GeneralTests(TestCase): self.assertEqual(telnet.sock.gettimeout(), 30) telnet.sock.close() -def _read_setUp(self): - self.evt = threading.Event() - self.dataq = Queue.Queue() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.settimeout(10) - self.port = test_support.bind_port(self.sock) - self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq)) - self.thread.start() - self.evt.wait() - -def _read_tearDown(self): - self.thread.join() - -class ReadTests(TestCase): - setUp = _read_setUp - tearDown = _read_tearDown - - # use a similar approach to testing timeouts as test_timeout.py - # these will never pass 100% but make the fuzz big enough that it is rare - block_long = 0.6 - block_short = 0.3 - def test_read_until_A(self): +class SocketStub(object): + ''' a socket proxy that re-defines sendall() ''' + def __init__(self, reads=()): + self.reads = list(reads) # Intentionally make a copy. + self.writes = [] + self.block = False + def sendall(self, data): + self.writes.append(data) + def recv(self, size): + out = b'' + while self.reads and len(out) < size: + out += self.reads.pop(0) + if len(out) > size: + self.reads.insert(0, out[size:]) + out = out[:size] + return out + +class TelnetAlike(telnetlib.Telnet): + def fileno(self): + raise NotImplementedError() + def close(self): pass + def sock_avail(self): + return (not self.sock.block) + def msg(self, msg, *args): + with support.captured_stdout() as out: + telnetlib.Telnet.msg(self, msg, *args) + self._messages += out.getvalue() + return + +def mock_select(*s_args): + block = False + for l in s_args: + for fob in l: + if isinstance(fob, TelnetAlike): + block = fob.sock.block + if block: + return [[], [], []] + else: + return s_args + +class MockPoller(object): + test_case = None # Set during TestCase setUp. + + def __init__(self): + self._file_objs = [] + + def register(self, fd, eventmask): + self.test_case.assertTrue(hasattr(fd, 'fileno'), fd) + self.test_case.assertEqual(eventmask, select.POLLIN|select.POLLPRI) + self._file_objs.append(fd) + + def poll(self, timeout=None): + block = False + for fob in self._file_objs: + if isinstance(fob, TelnetAlike): + block = fob.sock.block + if block: + return [] + else: + return zip(self._file_objs, [select.POLLIN]*len(self._file_objs)) + + def unregister(self, fd): + self._file_objs.remove(fd) + +@contextlib.contextmanager +def test_socket(reads): + def new_conn(*ignored): + return SocketStub(reads) + try: + old_conn = socket.create_connection + socket.create_connection = new_conn + yield None + finally: + socket.create_connection = old_conn + return + +def test_telnet(reads=(), cls=TelnetAlike, use_poll=None): + ''' return a telnetlib.Telnet object that uses a SocketStub with + reads queued up to be read ''' + for x in reads: + assert type(x) is bytes, x + with test_socket(reads): + telnet = cls('dummy', 0) + telnet._messages = '' # debuglevel output + if use_poll is not None: + if use_poll and not telnet._has_poll: + raise unittest.SkipTest('select.poll() required.') + telnet._has_poll = use_poll + return telnet + + +class ExpectAndReadTestCase(TestCase): + def setUp(self): + self.old_select = select.select + select.select = mock_select + self.old_poll = False + if hasattr(select, 'poll'): + self.old_poll = select.poll + select.poll = MockPoller + MockPoller.test_case = self + + def tearDown(self): + if self.old_poll: + MockPoller.test_case = None + select.poll = self.old_poll + select.select = self.old_select + + +class ReadTests(ExpectAndReadTestCase): + def test_read_until(self): """ - read_until(expected, [timeout]) - Read until the expected string has been seen, or a timeout is - hit (default is no timeout); may block. + read_until(expected, timeout=None) + test the blocking version of read_util """ - want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - data = telnet.read_until('match') - self.assertEqual(data, ''.join(want[:-2])) - - def test_read_until_B(self): - # test the timeout - it does NOT raise socket.timeout - want = ['hello', self.block_long, 'not seen', EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - data = telnet.read_until('not seen', self.block_short) - self.assertEqual(data, want[0]) - self.assertEqual(telnet.read_all(), 'not seen') + want = [b'xxxmatchyyy'] + telnet = test_telnet(want) + data = telnet.read_until(b'match') + self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads)) + + reads = [b'x' * 50, b'match', b'y' * 50] + expect = b''.join(reads[:-1]) + telnet = test_telnet(reads) + data = telnet.read_until(b'match') + self.assertEqual(data, expect) def test_read_until_with_poll(self): """Use select.poll() to implement telnet.read_until().""" - want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - if not telnet._has_poll: - raise unittest.SkipTest('select.poll() is required') - telnet._has_poll = True - self.dataq.join() - data = telnet.read_until('match') - self.assertEqual(data, ''.join(want[:-2])) + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want, use_poll=True) + select.select = lambda *_: self.fail('unexpected select() call.') + data = telnet.read_until(b'match') + self.assertEqual(data, b''.join(want[:-1])) def test_read_until_with_select(self): """Use select.select() to implement telnet.read_until().""" - want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - telnet._has_poll = False - self.dataq.join() - data = telnet.read_until('match') - self.assertEqual(data, ''.join(want[:-2])) - - def test_read_all_A(self): + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want, use_poll=False) + if self.old_poll: + select.poll = lambda *_: self.fail('unexpected poll() call.') + data = telnet.read_until(b'match') + self.assertEqual(data, b''.join(want[:-1])) + + def test_read_all(self): """ read_all() Read all data until EOF; may block. """ - want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + reads = [b'x' * 500, b'y' * 500, b'z' * 500] + expect = b''.join(reads) + telnet = test_telnet(reads) data = telnet.read_all() - self.assertEqual(data, ''.join(want[:-1])) + self.assertEqual(data, expect) return - def _test_blocking(self, func): - self.dataq.put([self.block_long, EOF_sigil]) - self.dataq.join() - start = time.time() - data = func() - self.assertTrue(self.block_short <= time.time() - start) - - def test_read_all_B(self): - self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all) - - def test_read_all_C(self): - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - telnet.read_all() - telnet.read_all() # shouldn't raise - - def test_read_some_A(self): + def test_read_some(self): """ read_some() Read at least one byte or EOF; may block. """ # test 'at least one byte' - want = ['x' * 500, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - data = telnet.read_all() + telnet = test_telnet([b'x' * 500]) + data = telnet.read_some() self.assertTrue(len(data) >= 1) - - def test_read_some_B(self): # test EOF - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - self.assertEqual('', telnet.read_some()) - - def test_read_some_C(self): - self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some) + telnet = test_telnet() + data = telnet.read_some() + self.assertEqual(b'', data) - def _test_read_any_eager_A(self, func_name): + def _read_eager(self, func_name): """ - read_very_eager() + read_*_eager() Read all data available already queued or on the socket, without blocking. """ - want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil] - expects = want[1] + want[2] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + want = b'x' * 100 + telnet = test_telnet([want]) func = getattr(telnet, func_name) - data = '' + telnet.sock.block = True + self.assertEqual(b'', func()) + telnet.sock.block = False + data = b'' while True: try: data += func() - self.assertTrue(expects.startswith(data)) except EOFError: break - self.assertEqual(expects, data) - - def _test_read_any_eager_B(self, func_name): - # test EOF - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - time.sleep(self.block_short) - func = getattr(telnet, func_name) - self.assertRaises(EOFError, func) - - # read_eager and read_very_eager make the same gaurantees - # (they behave differently but we only test the gaurantees) - def test_read_very_eager_A(self): - self._test_read_any_eager_A('read_very_eager') - def test_read_very_eager_B(self): - self._test_read_any_eager_B('read_very_eager') - def test_read_eager_A(self): - self._test_read_any_eager_A('read_eager') - def test_read_eager_B(self): - self._test_read_any_eager_B('read_eager') - # NB -- we need to test the IAC block which is mentioned in the docstring - # but not in the module docs - - def _test_read_any_lazy_B(self, func_name): - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - func = getattr(telnet, func_name) - telnet.fill_rawq() - self.assertRaises(EOFError, func) - - def test_read_lazy_A(self): - want = ['x' * 100, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - time.sleep(self.block_short) - self.assertEqual('', telnet.read_lazy()) - data = '' + self.assertEqual(data, want) + + def test_read_eager(self): + # read_eager and read_very_eager make the same gaurantees + # (they behave differently but we only test the gaurantees) + self._read_eager('read_eager') + self._read_eager('read_very_eager') + # NB -- we need to test the IAC block which is mentioned in the + # docstring but not in the module docs + + def read_very_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) + self.assertEqual(b'', telnet.read_very_lazy()) + while telnet.sock.reads: + telnet.fill_rawq() + data = telnet.read_very_lazy() + self.assertEqual(want, data) + self.assertRaises(EOFError, telnet.read_very_lazy) + + def test_read_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) + self.assertEqual(b'', telnet.read_lazy()) + data = b'' while True: try: read_data = telnet.read_lazy() @@ -278,41 +293,14 @@ class ReadTests(TestCase): telnet.fill_rawq() except EOFError: break - self.assertTrue(want[0].startswith(data)) - self.assertEqual(data, want[0]) - - def test_read_lazy_B(self): - self._test_read_any_lazy_B('read_lazy') - - def test_read_very_lazy_A(self): - want = ['x' * 100, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - time.sleep(self.block_short) - self.assertEqual('', telnet.read_very_lazy()) - data = '' - while True: - try: - read_data = telnet.read_very_lazy() - except EOFError: - break - data += read_data - if not read_data: - telnet.fill_rawq() - self.assertEqual('', telnet.cookedq) - telnet.process_rawq() - self.assertTrue(want[0].startswith(data)) - self.assertEqual(data, want[0]) - - def test_read_very_lazy_B(self): - self._test_read_any_lazy_B('read_very_lazy') + self.assertTrue(want.startswith(data)) + self.assertEqual(data, want) class nego_collector(object): def __init__(self, sb_getter=None): - self.seen = '' + self.seen = b'' self.sb_getter = sb_getter - self.sb_seen = '' + self.sb_seen = b'' def do_nego(self, sock, cmd, opt): self.seen += cmd + opt @@ -321,134 +309,136 @@ class nego_collector(object): self.sb_seen += sb_data tl = telnetlib + +class WriteTests(TestCase): + '''The only thing that write does is replace each tl.IAC for + tl.IAC+tl.IAC''' + + def test_write(self): + data_sample = [b'data sample without IAC', + b'data sample with' + tl.IAC + b' one IAC', + b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC, + tl.IAC, + b''] + for data in data_sample: + telnet = test_telnet() + telnet.write(data) + written = b''.join(telnet.sock.writes) + self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written) + class OptionTests(TestCase): - setUp = _read_setUp - tearDown = _read_tearDown # RFC 854 commands cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] def _test_command(self, data): """ helper for testing IAC + cmd """ - self.setUp() - self.dataq.put(data) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + telnet = test_telnet(data) + data_len = len(b''.join(data)) nego = nego_collector() telnet.set_option_negotiation_callback(nego.do_nego) txt = telnet.read_all() cmd = nego.seen self.assertTrue(len(cmd) > 0) # we expect at least one command - self.assertIn(cmd[0], self.cmds) - self.assertEqual(cmd[1], tl.NOOPT) - self.assertEqual(len(''.join(data[:-1])), len(txt + cmd)) + self.assertIn(cmd[:1], self.cmds) + self.assertEqual(cmd[1:2], tl.NOOPT) + self.assertEqual(data_len, len(txt + cmd)) nego.sb_getter = None # break the nego => telnet cycle - self.tearDown() def test_IAC_commands(self): - # reset our setup - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - self.tearDown() - for cmd in self.cmds: - self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil]) - self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil]) - self._test_command([tl.IAC + cmd, EOF_sigil]) + self._test_command([tl.IAC, cmd]) + self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100]) + self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10]) # all at once - self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil]) - self.assertEqual('', telnet.read_sb_data()) + self._test_command([tl.IAC + cmd for (cmd) in self.cmds]) def test_SB_commands(self): # RFC 855, subnegotiations portion send = [tl.IAC + tl.SB + tl.IAC + tl.SE, tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE, - tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE, - tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, - tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE, - EOF_sigil, + tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE, + tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, + tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE, ] - self.dataq.put(send) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + telnet = test_telnet(send) nego = nego_collector(telnet.read_sb_data) telnet.set_option_negotiation_callback(nego.do_nego) txt = telnet.read_all() - self.assertEqual(txt, '') - want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd' + self.assertEqual(txt, b'') + want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd' self.assertEqual(nego.sb_seen, want_sb_data) - self.assertEqual('', telnet.read_sb_data()) + self.assertEqual(b'', telnet.read_sb_data()) nego.sb_getter = None # break the nego => telnet cycle + def test_debuglevel_reads(self): + # test all the various places that self.msg(...) is called + given_a_expect_b = [ + # Telnet.fill_rawq + (b'a', ": recv b''\n"), + # Telnet.process_rawq + (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"), + (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"), + (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"), + (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"), + (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), + ] + for a, b in given_a_expect_b: + telnet = test_telnet([a]) + telnet.set_debuglevel(1) + txt = telnet.read_all() + self.assertIn(b, telnet._messages) + return -class ExpectTests(TestCase): - def setUp(self): - self.evt = threading.Event() - self.dataq = Queue.Queue() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.settimeout(10) - self.port = test_support.bind_port(self.sock) - self.thread = threading.Thread(target=server, args=(self.evt,self.sock, - self.dataq)) - self.thread.start() - self.evt.wait() - - def tearDown(self): - self.thread.join() - - # use a similar approach to testing timeouts as test_timeout.py - # these will never pass 100% but make the fuzz big enough that it is rare - block_long = 0.6 - block_short = 0.3 - def test_expect_A(self): + def test_debuglevel_write(self): + telnet = test_telnet() + telnet.set_debuglevel(1) + telnet.write(b'xxx') + expected = "send b'xxx'\n" + self.assertIn(expected, telnet._messages) + + def test_debug_accepts_str_port(self): + # Issue 10695 + with test_socket([]): + telnet = TelnetAlike('dummy', '0') + telnet._messages = '' + telnet.set_debuglevel(1) + telnet.msg('test') + self.assertRegex(telnet._messages, r'0.*test') + + +class ExpectTests(ExpectAndReadTestCase): + def test_expect(self): """ expect(expected, [timeout]) Read until the expected string has been seen, or a timeout is hit (default is no timeout); may block. """ - want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - (_,_,data) = telnet.expect(['match']) - self.assertEqual(data, ''.join(want[:-2])) - - def test_expect_B(self): - # test the timeout - it does NOT raise socket.timeout - want = ['hello', self.block_long, 'not seen', EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - (_,_,data) = telnet.expect(['not seen'], self.block_short) - self.assertEqual(data, want[0]) - self.assertEqual(telnet.read_all(), 'not seen') + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want) + (_,_,data) = telnet.expect([b'match']) + self.assertEqual(data, b''.join(want[:-1])) def test_expect_with_poll(self): """Use select.poll() to implement telnet.expect().""" - want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - if not telnet._has_poll: - raise unittest.SkipTest('select.poll() is required') - telnet._has_poll = True - self.dataq.join() - (_,_,data) = telnet.expect(['match']) - self.assertEqual(data, ''.join(want[:-2])) + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want, use_poll=True) + select.select = lambda *_: self.fail('unexpected select() call.') + (_,_,data) = telnet.expect([b'match']) + self.assertEqual(data, b''.join(want[:-1])) def test_expect_with_select(self): """Use select.select() to implement telnet.expect().""" - want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - telnet._has_poll = False - self.dataq.join() - (_,_,data) = telnet.expect(['match']) - self.assertEqual(data, ''.join(want[:-2])) + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want, use_poll=False) + if self.old_poll: + select.poll = lambda *_: self.fail('unexpected poll() call.') + (_,_,data) = telnet.expect([b'match']) + self.assertEqual(data, b''.join(want[:-1])) def test_main(verbose=None): - test_support.run_unittest(GeneralTests, ReadTests, OptionTests, - ExpectTests) + support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests, + ExpectTests) if __name__ == '__main__': test_main() |