diff options
Diffstat (limited to 'Lib/test/test_smtpd.py')
-rw-r--r-- | Lib/test/test_smtpd.py | 397 |
1 files changed, 383 insertions, 14 deletions
diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py index 93f14c4562b..6eb47f1deec 100644 --- a/Lib/test/test_smtpd.py +++ b/Lib/test/test_smtpd.py @@ -1,4 +1,5 @@ import unittest +import textwrap from test import support, mock_socket import socket import io @@ -7,15 +8,22 @@ import asyncore class DummyServer(smtpd.SMTPServer): - def __init__(self, localaddr, remoteaddr): - smtpd.SMTPServer.__init__(self, localaddr, remoteaddr) + def __init__(self, *args, **kwargs): + smtpd.SMTPServer.__init__(self, *args, **kwargs) self.messages = [] + if self._decode_data: + self.return_status = 'return status' + else: + self.return_status = b'return status' def process_message(self, peer, mailfrom, rcpttos, data): self.messages.append((peer, mailfrom, rcpttos, data)) - if data == 'return status': + if data == self.return_status: return '250 Okish' + def process_smtputf8_message(self, *args, **kwargs): + return '250 SMTPUTF8 message okish' + class DummyDispatcherBroken(Exception): pass @@ -31,9 +39,10 @@ class SMTPDServerTest(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket def test_process_message_unimplemented(self): - server = smtpd.SMTPServer('a', 'b') + server = smtpd.SMTPServer((support.HOST, 0), ('b', 0), + decode_data=True) conn, addr = server.accept() - channel = smtpd.SMTPChannel(server, conn, addr) + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) def write_line(line): channel.socket.queue_recv(line) @@ -45,19 +54,163 @@ class SMTPDServerTest(unittest.TestCase): write_line(b'DATA') self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') + def test_process_smtputf8_message_unimplemented(self): + server = smtpd.SMTPServer((support.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + + def write_line(line): + channel.socket.queue_recv(line) + channel.handle_read() + + write_line(b'EHLO example') + write_line(b'MAIL From: <eggs@example> BODY=8BITMIME SMTPUTF8') + write_line(b'RCPT To: <spam@example>') + write_line(b'DATA') + self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') + + def test_decode_data_default_warns(self): + with self.assertWarns(DeprecationWarning): + smtpd.SMTPServer((support.HOST, 0), ('b', 0)) + + def test_decode_data_and_enable_SMTPUTF8_raises(self): + self.assertRaises( + ValueError, + smtpd.SMTPServer, + (support.HOST, 0), + ('b', 0), + enable_SMTPUTF8=True, + decode_data=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + + +class DebuggingServerTest(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + + def send_data(self, channel, data, enable_SMTPUTF8=False): + def write_line(line): + channel.socket.queue_recv(line) + channel.handle_read() + write_line(b'EHLO example') + if enable_SMTPUTF8: + write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8') + else: + write_line(b'MAIL From:eggs@example') + write_line(b'RCPT To:spam@example') + write_line(b'DATA') + write_line(data) + write_line(b'.') + + def test_process_message_with_decode_data_true(self): + server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + decode_data=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nhello\n') + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + From: test + X-Peer: peer-address + + hello + ------------ END MESSAGE ------------ + """)) + + def test_process_message_with_decode_data_false(self): + server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + decode_data=False) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + b'From: test' + b'X-Peer: peer-address' + b'' + b'h\\xc3\\xa9llo\\xff' + ------------ END MESSAGE ------------ + """)) + + def test_process_message_with_enable_SMTPUTF8_true(self): + server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + b'From: test' + b'X-Peer: peer-address' + b'' + b'h\\xc3\\xa9llo\\xff' + ------------ END MESSAGE ------------ + """)) + + def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self): + server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n', + enable_SMTPUTF8=True) + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ----- SMTPUTF8 MESSAGE FOLLOWS ------ + b'From: test' + b'X-Peer: peer-address' + b'' + b'h\\xc3\\xa9llo\\xff' + ------------ END MESSAGE ------------ + """)) + def tearDown(self): asyncore.close_all() asyncore.socket = smtpd.socket = socket +class TestFamilyDetection(unittest.TestCase): + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + + @unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") + def test_socket_uses_IPv6(self): + server = smtpd.SMTPServer((support.HOSTv6, 0), (support.HOST, 0), + decode_data=False) + self.assertEqual(server.socket.family, socket.AF_INET6) + + def test_socket_uses_IPv4(self): + server = smtpd.SMTPServer((support.HOST, 0), (support.HOSTv6, 0), + decode_data=False) + self.assertEqual(server.socket.family, socket.AF_INET) + + class SMTPDChannelTest(unittest.TestCase): def setUp(self): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer('a', 'b') + self.server = DummyServer((support.HOST, 0), ('b', 0), + decode_data=True) conn, addr = self.server.accept() - self.channel = smtpd.SMTPChannel(self.server, conn, addr) + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=True) def tearDown(self): asyncore.close_all() @@ -69,7 +222,9 @@ class SMTPDChannelTest(unittest.TestCase): self.channel.handle_read() def test_broken_connect(self): - self.assertRaises(DummyDispatcherBroken, BrokenDummyServer, 'a', 'b') + self.assertRaises( + DummyDispatcherBroken, BrokenDummyServer, + (support.HOST, 0), ('b', 0), decode_data=True) def test_server_accept(self): self.server.handle_accept() @@ -214,6 +369,12 @@ class SMTPDChannelTest(unittest.TestCase): self.assertEqual(self.channel.socket.last, b'500 Error: line too long\r\n') + def test_MAIL_command_rejects_SMTPUTF8_by_default(self): + self.write_line(b'EHLO example') + self.write_line( + b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8') + self.assertEqual(self.channel.socket.last[0:1], b'5') + def test_data_longer_than_default_data_size_limit(self): # Hack the default so we don't have to generate so much data. self.channel.data_size_limit = 1048 @@ -387,7 +548,10 @@ class SMTPDChannelTest(unittest.TestCase): self.write_line(b'data\r\nmore\r\n.') self.assertEqual(self.channel.socket.last, b'250 OK\r\n') self.assertEqual(self.server.messages, - [('peer', 'eggs@example', ['spam@example'], 'data\nmore')]) + [(('peer-address', 'peer-port'), + 'eggs@example', + ['spam@example'], + 'data\nmore')]) def test_DATA_syntax(self): self.write_line(b'HELO example') @@ -417,7 +581,10 @@ class SMTPDChannelTest(unittest.TestCase): self.write_line(b'DATA') self.write_line(b'data\r\n.') self.assertEqual(self.server.messages, - [('peer', 'eggs@example', ['spam@example','ham@example'], 'data')]) + [(('peer-address', 'peer-port'), + 'eggs@example', + ['spam@example','ham@example'], + 'data')]) def test_manual_status(self): # checks that the Channel is able to return a custom status message @@ -439,7 +606,10 @@ class SMTPDChannelTest(unittest.TestCase): self.write_line(b'DATA') self.write_line(b'data\r\n.') self.assertEqual(self.server.messages, - [('peer', 'foo@example', ['eggs@example'], 'data')]) + [(('peer-address', 'peer-port'), + 'foo@example', + ['eggs@example'], + 'data')]) def test_HELO_RSET(self): self.write_line(b'HELO example') @@ -502,6 +672,24 @@ class SMTPDChannelTest(unittest.TestCase): with support.check_warnings(('', DeprecationWarning)): self.channel._SMTPChannel__addr = 'spam' + def test_decode_data_default_warning(self): + with self.assertWarns(DeprecationWarning): + server = DummyServer((support.HOST, 0), ('b', 0)) + conn, addr = self.server.accept() + with self.assertWarns(DeprecationWarning): + smtpd.SMTPChannel(server, conn, addr) + +@unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") +class SMTPDChannelIPv6Test(SMTPDChannelTest): + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((support.HOSTv6, 0), ('b', 0), + decode_data=True) + conn, addr = self.server.accept() + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=True) class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): @@ -509,10 +697,12 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer('a', 'b') + self.server = DummyServer((support.HOST, 0), ('b', 0), + decode_data=True) conn, addr = self.server.accept() # Set DATA size limit to 32 bytes for easy testing - self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32) + self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32, + decode_data=True) def tearDown(self): asyncore.close_all() @@ -536,7 +726,10 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): self.write_line(b'data\r\nmore\r\n.') self.assertEqual(self.channel.socket.last, b'250 OK\r\n') self.assertEqual(self.server.messages, - [('peer', 'eggs@example', ['spam@example'], 'data\nmore')]) + [(('peer-address', 'peer-port'), + 'eggs@example', + ['spam@example'], + 'data\nmore')]) def test_data_limit_dialog_too_much_data(self): self.write_line(b'HELO example') @@ -553,5 +746,181 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): b'552 Error: Too much mail data\r\n') +class SMTPDChannelWithDecodeDataFalse(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((support.HOST, 0), ('b', 0), + decode_data=False) + conn, addr = self.server.accept() + # Set decode_data to False + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=False) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_ascii_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'plain ascii text') + self.write_line(b'.') + self.assertEqual(self.channel.received_data, b'plain ascii text') + + def test_utf8_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + self.write_line(b'and some plain ascii') + self.write_line(b'.') + self.assertEqual( + self.channel.received_data, + b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n' + b'and some plain ascii') + + +class SMTPDChannelWithDecodeDataTrue(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((support.HOST, 0), ('b', 0), + decode_data=True) + conn, addr = self.server.accept() + # Set decode_data to True + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_ascii_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'plain ascii text') + self.write_line(b'.') + self.assertEqual(self.channel.received_data, 'plain ascii text') + + def test_utf8_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + self.write_line(b'and some plain ascii') + self.write_line(b'.') + self.assertEqual( + self.channel.received_data, + 'utf8 enriched text: żźć\nand some plain ascii') + + +class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase): + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((support.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = self.server.accept() + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + enable_SMTPUTF8=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_MAIL_command_accepts_SMTPUTF8_when_announced(self): + self.write_line(b'EHLO example') + self.write_line( + 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode( + 'utf-8') + ) + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_process_smtputf8_message(self): + self.write_line(b'EHLO example') + for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']: + self.write_line(b'MAIL from: <a@example> ' + mail_parameters) + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'rcpt to:<b@example.com>') + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'data') + self.assertEqual(self.channel.socket.last[0:3], b'354') + self.write_line(b'c\r\n.') + if mail_parameters == b'': + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + else: + self.assertEqual(self.channel.socket.last, + b'250 SMTPUTF8 message okish\r\n') + + def test_utf8_data(self): + self.write_line(b'EHLO example') + self.write_line( + 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8')) + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line('RCPT To:späm@examplé'.encode('utf-8')) + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'DATA') + self.assertEqual(self.channel.socket.last[0:3], b'354') + self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + self.write_line(b'.') + self.assertEqual( + self.channel.received_data, + b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + + def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self): + self.write_line(b'ehlo example') + fill_len = (512 + 26 + 10) - len('mail from:<@example>') + self.write_line(b'MAIL from:<' + + b'a' * (fill_len + 1) + + b'@example>') + self.assertEqual(self.channel.socket.last, + b'500 Error: line too long\r\n') + self.write_line(b'MAIL from:<' + + b'a' * fill_len + + b'@example>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_multiple_emails_with_extended_command_length(self): + self.write_line(b'ehlo example') + fill_len = (512 + 26 + 10) - len('mail from:<@example>') + for char in [b'a', b'b', b'c']: + self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>') + self.assertEqual(self.channel.socket.last[0:3], b'500') + self.write_line(b'MAIL from:<' + char * fill_len + b'@example>') + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'rcpt to:<hans@example.com>') + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'data') + self.assertEqual(self.channel.socket.last[0:3], b'354') + self.write_line(b'test\r\n.') + self.assertEqual(self.channel.socket.last[0:3], b'250') + if __name__ == "__main__": unittest.main() |