diff options
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r-- | Lib/test/test_ssl.py | 1393 |
1 files changed, 859 insertions, 534 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 9f5138719e8..4f254a98f32 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -2,8 +2,7 @@ import sys import unittest -from test import test_support -import asyncore +from test import support import socket import select import time @@ -11,66 +10,74 @@ import gc import os import errno import pprint -import urllib, urlparse +import tempfile +import urllib.request import traceback +import asyncore import weakref -import functools import platform +import functools + +ssl = support.import_module("ssl") + +PROTOCOLS = [ + ssl.PROTOCOL_SSLv3, + ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1 +] +if hasattr(ssl, 'PROTOCOL_SSLv2'): + PROTOCOLS.append(ssl.PROTOCOL_SSLv2) + +HOST = support.HOST + +data_file = lambda name: os.path.join(os.path.dirname(__file__), name) -from BaseHTTPServer import HTTPServer -from SimpleHTTPServer import SimpleHTTPRequestHandler +# The custom key and certificate files used in test_ssl are generated +# using Lib/test/make_ssl_certs.py. +# Other certificates are simply fetched from the Internet servers they +# are meant to authenticate. -ssl = test_support.import_module("ssl") +CERTFILE = data_file("keycert.pem") +BYTES_CERTFILE = os.fsencode(CERTFILE) +ONLYCERT = data_file("ssl_cert.pem") +ONLYKEY = data_file("ssl_key.pem") +BYTES_ONLYCERT = os.fsencode(ONLYCERT) +BYTES_ONLYKEY = os.fsencode(ONLYKEY) +CAPATH = data_file("capath") +BYTES_CAPATH = os.fsencode(CAPATH) + +SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem") + +EMPTYCERT = data_file("nullcert.pem") +BADCERT = data_file("badcert.pem") +WRONGCERT = data_file("XXXnonexisting.pem") +BADKEY = data_file("badkey.pem") +NOKIACERT = data_file("nokia.pem") -HOST = test_support.HOST -CERTFILE = None -SVN_PYTHON_ORG_ROOT_CERT = None def handle_error(prefix): exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) - if test_support.verbose: + if support.verbose: sys.stdout.write(prefix + exc_format) +def can_clear_options(): + # 0.9.8m or higher + return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15) -class BasicTests(unittest.TestCase): +def no_sslv2_implies_sslv3_hello(): + # 0.9.7h or higher + return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15) - def test_sslwrap_simple(self): - # A crude test for the legacy API - try: - ssl.sslwrap_simple(socket.socket(socket.AF_INET)) - except IOError, e: - if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that - pass - else: - raise - try: - ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock) - except IOError, e: - if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that - pass - else: - raise # Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 def skip_if_broken_ubuntu_ssl(func): if hasattr(ssl, 'PROTOCOL_SSLv2'): - # We need to access the lower-level wrapper in order to create an - # implicit SSL context without trying to connect or listen. - try: - import _ssl - except ImportError: - # The returned function won't get executed, just ignore the error - pass @functools.wraps(func) def f(*args, **kwargs): try: - s = socket.socket(socket.AF_INET) - _ssl.sslwrap(s._sock, 0, None, None, - ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None) - except ssl.SSLError as e: + ssl.SSLContext(ssl.PROTOCOL_SSLv2) + except ssl.SSLError: if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and - platform.linux_distribution() == ('debian', 'squeeze/sid', '') - and 'Invalid SSL protocol variant specified' in str(e)): + platform.linux_distribution() == ('debian', 'squeeze/sid', '')): raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") return func(*args, **kwargs) return f @@ -88,10 +95,11 @@ class BasicSocketTests(unittest.TestCase): ssl.CERT_NONE ssl.CERT_OPTIONAL ssl.CERT_REQUIRED + self.assertIn(ssl.HAS_SNI, {True, False}) def test_random(self): v = ssl.RAND_status() - if test_support.verbose: + if support.verbose: sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, (v and "sufficient randomness") or "insufficient randomness")) @@ -103,21 +111,29 @@ class BasicSocketTests(unittest.TestCase): # note that this uses an 'unofficial' function in _ssl.c, # provided solely for this test, to exercise the certificate # parsing code - p = ssl._ssl._test_decode_cert(CERTFILE, False) - if test_support.verbose: + p = ssl._ssl._test_decode_cert(CERTFILE) + if support.verbose: sys.stdout.write("\n" + pprint.pformat(p) + "\n") + self.assertEqual(p['issuer'], + ((('countryName', 'XY'),), + (('localityName', 'Castle Anthrax'),), + (('organizationName', 'Python Software Foundation'),), + (('commonName', 'localhost'),)) + ) + self.assertEqual(p['notAfter'], 'Oct 5 23:01:56 2020 GMT') + self.assertEqual(p['notBefore'], 'Oct 8 23:01:56 2010 GMT') + self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E') self.assertEqual(p['subject'], - ((('countryName', u'US'),), - (('stateOrProvinceName', u'Delaware'),), - (('localityName', u'Wilmington'),), - (('organizationName', u'Python Software Foundation'),), - (('organizationalUnitName', u'SSL'),), - (('commonName', u'somemachine.python.org'),)), + ((('countryName', 'XY'),), + (('localityName', 'Castle Anthrax'),), + (('organizationName', 'Python Software Foundation'),), + (('commonName', 'localhost'),)) ) + self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) # Issue #13034: the subjectAltName in some certificates # (notably projects.developer.nokia.com:443) wasn't parsed p = ssl._ssl._test_decode_cert(NOKIACERT) - if test_support.verbose: + if support.verbose: sys.stdout.write("\n" + pprint.pformat(p) + "\n") self.assertEqual(p['subjectAltName'], (('DNS', 'projects.developer.nokia.com'), @@ -140,7 +156,7 @@ class BasicSocketTests(unittest.TestCase): n = ssl.OPENSSL_VERSION_NUMBER t = ssl.OPENSSL_VERSION_INFO s = ssl.OPENSSL_VERSION - self.assertIsInstance(n, (int, long)) + self.assertIsInstance(n, int) self.assertIsInstance(t, tuple) self.assertIsInstance(s, str) # Some sanity checks follow @@ -163,25 +179,7 @@ class BasicSocketTests(unittest.TestCase): self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), (s, t)) - def test_ciphers(self): - if not test_support.is_resource_enabled('network'): - return - remote = ("svn.python.org", 443) - with test_support.transient_internet(remote[0]): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="ALL") - s.connect(remote) - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") - s.connect(remote) - # Error checking occurs when connecting, because the SSL context - # isn't created before. - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") - with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): - s.connect(remote) - - @test_support.cpython_only + @support.cpython_only def test_refcycle(self): # Issue #7943: an SSL object doesn't create reference cycles with # itself. @@ -192,9 +190,8 @@ class BasicSocketTests(unittest.TestCase): self.assertEqual(wr(), None) def test_wrapped_unconnected(self): - # The _delegate_methods in socket.py are correctly delegated to by an - # unconnected SSLSocket, so they will raise a socket.error rather than - # something unexpected like TypeError. + # Methods on an unconnected SSLSocket propagate the original + # socket.error raise by the underlying socket object. s = socket.socket(socket.AF_INET) ss = ssl.wrap_socket(s) self.assertRaises(socket.error, ss.recv, 1) @@ -204,41 +201,305 @@ class BasicSocketTests(unittest.TestCase): self.assertRaises(socket.error, ss.send, b'x') self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) + def test_timeout(self): + # Issue #8524: when creating an SSL socket, the timeout of the + # original socket should be retained. + for timeout in (None, 0.0, 5.0): + s = socket.socket(socket.AF_INET) + s.settimeout(timeout) + ss = ssl.wrap_socket(s) + self.assertEqual(timeout, ss.gettimeout()) + + def test_errors(self): + sock = socket.socket() + self.assertRaisesRegex(ValueError, + "certfile must be specified", + ssl.wrap_socket, sock, keyfile=CERTFILE) + self.assertRaisesRegex(ValueError, + "certfile must be specified for server-side operations", + ssl.wrap_socket, sock, server_side=True) + self.assertRaisesRegex(ValueError, + "certfile must be specified for server-side operations", + ssl.wrap_socket, sock, server_side=True, certfile="") + s = ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) + self.assertRaisesRegex(ValueError, "can't connect in server-side mode", + s.connect, (HOST, 8080)) + with self.assertRaises(IOError) as cm: + with socket.socket() as sock: + ssl.wrap_socket(sock, certfile=WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaises(IOError) as cm: + with socket.socket() as sock: + ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaises(IOError) as cm: + with socket.socket() as sock: + ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + + def test_match_hostname(self): + def ok(cert, hostname): + ssl.match_hostname(cert, hostname) + def fail(cert, hostname): + self.assertRaises(ssl.CertificateError, + ssl.match_hostname, cert, hostname) + + cert = {'subject': ((('commonName', 'example.com'),),)} + ok(cert, 'example.com') + ok(cert, 'ExAmple.cOm') + fail(cert, 'www.example.com') + fail(cert, '.example.com') + fail(cert, 'example.org') + fail(cert, 'exampleXcom') + + cert = {'subject': ((('commonName', '*.a.com'),),)} + ok(cert, 'foo.a.com') + fail(cert, 'bar.foo.a.com') + fail(cert, 'a.com') + fail(cert, 'Xa.com') + fail(cert, '.a.com') + + cert = {'subject': ((('commonName', 'a.*.com'),),)} + ok(cert, 'a.foo.com') + fail(cert, 'a..com') + fail(cert, 'a.com') + + cert = {'subject': ((('commonName', 'f*.com'),),)} + ok(cert, 'foo.com') + ok(cert, 'f.com') + fail(cert, 'bar.com') + fail(cert, 'foo.a.com') + fail(cert, 'bar.foo.com') + + # Slightly fake real-world example + cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT', + 'subject': ((('commonName', 'linuxfrz.org'),),), + 'subjectAltName': (('DNS', 'linuxfr.org'), + ('DNS', 'linuxfr.com'), + ('othername', '<unsupported>'))} + ok(cert, 'linuxfr.org') + ok(cert, 'linuxfr.com') + # Not a "DNS" entry + fail(cert, '<unsupported>') + # When there is a subjectAltName, commonName isn't used + fail(cert, 'linuxfrz.org') + + # A pristine real-world example + cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('organizationName', 'Google Inc'),), + (('commonName', 'mail.google.com'),))} + ok(cert, 'mail.google.com') + fail(cert, 'gmail.com') + # Only commonName is considered + fail(cert, 'California') + + # Neither commonName nor subjectAltName + cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('organizationName', 'Google Inc'),))} + fail(cert, 'mail.google.com') + + # No DNS entry in subjectAltName but a commonName + cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('commonName', 'mail.google.com'),)), + 'subjectAltName': (('othername', 'blabla'), )} + ok(cert, 'mail.google.com') + + # No DNS entry subjectAltName and no commonName + cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('organizationName', 'Google Inc'),)), + 'subjectAltName': (('othername', 'blabla'),)} + fail(cert, 'google.com') + + # Empty cert / no cert + self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com') + self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com') + + def test_server_side(self): + # server_hostname doesn't work for server sockets + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with socket.socket() as sock: + self.assertRaises(ValueError, ctx.wrap_socket, sock, True, + server_hostname="some.hostname") + +class ContextTests(unittest.TestCase): + + @skip_if_broken_ubuntu_ssl + def test_constructor(self): + if hasattr(ssl, 'PROTOCOL_SSLv2'): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv2) + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv3) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(TypeError, ssl.SSLContext) + self.assertRaises(ValueError, ssl.SSLContext, -1) + self.assertRaises(ValueError, ssl.SSLContext, 42) + + @skip_if_broken_ubuntu_ssl + def test_protocol(self): + for proto in PROTOCOLS: + ctx = ssl.SSLContext(proto) + self.assertEqual(ctx.protocol, proto) + + def test_ciphers(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_ciphers("ALL") + ctx.set_ciphers("DEFAULT") + with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): + ctx.set_ciphers("^$:,;?*'dorothyx") + + @skip_if_broken_ubuntu_ssl + def test_options(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + # OP_ALL is the default value + self.assertEqual(ssl.OP_ALL, ctx.options) + ctx.options |= ssl.OP_NO_SSLv2 + self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2, + ctx.options) + ctx.options |= ssl.OP_NO_SSLv3 + self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3, + ctx.options) + if can_clear_options(): + ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1 + self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3, + ctx.options) + ctx.options = 0 + self.assertEqual(0, ctx.options) + else: + with self.assertRaises(ValueError): + ctx.options = 0 + + def test_verify(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + # Default value + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + ctx.verify_mode = ssl.CERT_OPTIONAL + self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL) + ctx.verify_mode = ssl.CERT_REQUIRED + self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) + ctx.verify_mode = ssl.CERT_NONE + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + with self.assertRaises(TypeError): + ctx.verify_mode = None + with self.assertRaises(ValueError): + ctx.verify_mode = 42 + + def test_load_cert_chain(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + # Combined key and cert in a single file + ctx.load_cert_chain(CERTFILE) + ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) + self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) + with self.assertRaises(IOError) as cm: + ctx.load_cert_chain(WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(BADCERT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(EMPTYCERT) + # Separate key and cert + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_cert_chain(ONLYCERT, ONLYKEY) + ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY) + ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(ONLYCERT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(ONLYKEY) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT) + # Mismatching key and cert + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"): + ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY) + + def test_load_verify_locations(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_verify_locations(CERTFILE) + ctx.load_verify_locations(cafile=CERTFILE, capath=None) + ctx.load_verify_locations(BYTES_CERTFILE) + ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) + self.assertRaises(TypeError, ctx.load_verify_locations) + self.assertRaises(TypeError, ctx.load_verify_locations, None, None) + with self.assertRaises(IOError) as cm: + ctx.load_verify_locations(WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_verify_locations(BADCERT) + ctx.load_verify_locations(CERTFILE, CAPATH) + ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH) + + # Issue #10989: crash if the second argument type is invalid + self.assertRaises(TypeError, ctx.load_verify_locations, None, True) + + @skip_if_broken_ubuntu_ssl + def test_session_stats(self): + for proto in PROTOCOLS: + ctx = ssl.SSLContext(proto) + self.assertEqual(ctx.session_stats(), { + 'number': 0, + 'connect': 0, + 'connect_good': 0, + 'connect_renegotiate': 0, + 'accept': 0, + 'accept_good': 0, + 'accept_renegotiate': 0, + 'hits': 0, + 'misses': 0, + 'timeouts': 0, + 'cache_full': 0, + }) + + def test_set_default_verify_paths(self): + # There's not much we can do to test that it acts as expected, + # so just check it doesn't crash or raise an exception. + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_default_verify_paths() + class NetworkedTests(unittest.TestCase): def test_connect(self): - with test_support.transient_internet("svn.python.org"): + with support.transient_internet("svn.python.org"): s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_NONE) - s.connect(("svn.python.org", 443)) - c = s.getpeercert() - if c: - self.fail("Peer cert %s shouldn't be here!") - s.close() - - # this should fail because we have no verification certs - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED) try: s.connect(("svn.python.org", 443)) - except ssl.SSLError: - pass + self.assertEqual({}, s.getpeercert()) finally: s.close() + # this should fail because we have no verification certs + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, ("svn.python.org", 443)) + s.close() + # this should succeed because we specify the root cert s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, ca_certs=SVN_PYTHON_ORG_ROOT_CERT) try: s.connect(("svn.python.org", 443)) + self.assertTrue(s.getpeercert()) finally: s.close() def test_connect_ex(self): # Issue #11326: check connect_ex() implementation - with test_support.transient_internet("svn.python.org"): + with support.transient_internet("svn.python.org"): s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, ca_certs=SVN_PYTHON_ORG_ROOT_CERT) @@ -251,7 +512,7 @@ class NetworkedTests(unittest.TestCase): def test_non_blocking_connect_ex(self): # Issue #11326: non-blocking connect_ex() should allow handshake # to proceed after the socket gets ready. - with test_support.transient_internet("svn.python.org"): + with support.transient_internet("svn.python.org"): s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, ca_certs=SVN_PYTHON_ORG_ROOT_CERT, @@ -283,7 +544,7 @@ class NetworkedTests(unittest.TestCase): def test_timeout_connect_ex(self): # Issue #12065: on a timeout, connect_ex() should return the original # errno (mimicking the behaviour of non-SSL sockets). - with test_support.transient_internet("svn.python.org"): + with support.transient_internet("svn.python.org"): s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, ca_certs=SVN_PYTHON_ORG_ROOT_CERT, @@ -298,7 +559,7 @@ class NetworkedTests(unittest.TestCase): s.close() def test_connect_ex_error(self): - with test_support.transient_internet("svn.python.org"): + with support.transient_internet("svn.python.org"): s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, ca_certs=SVN_PYTHON_ORG_ROOT_CERT) @@ -308,12 +569,75 @@ class NetworkedTests(unittest.TestCase): finally: s.close() + def test_connect_with_context(self): + with support.transient_internet("svn.python.org"): + # Same as test_connect, but with a separately created context + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + self.assertEqual({}, s.getpeercert()) + finally: + s.close() + # Same with a server hostname + s = ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname="svn.python.org") + if ssl.HAS_SNI: + s.connect(("svn.python.org", 443)) + s.close() + else: + self.assertRaises(ValueError, s.connect, ("svn.python.org", 443)) + # This should fail because we have no verification certs + ctx.verify_mode = ssl.CERT_REQUIRED + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, ("svn.python.org", 443)) + s.close() + # This should succeed because we specify the root cert + ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + + def test_connect_capath(self): + # Verify server certificates using the `capath` argument + # NOTE: the subject hashing algorithm has been changed between + # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must + # contain both versions of each certificate (same content, different + # filename) for this test to be portable across OpenSSL releases. + with support.transient_internet("svn.python.org"): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + # Same with a bytes `capath` argument + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=BYTES_CAPATH) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") def test_makefile_close(self): # Issue #5238: creating a file-like object with makefile() shouldn't # delay closing the underlying "real socket" (here tested with its # file descriptor, hence skipping the test under Windows). - with test_support.transient_internet("svn.python.org"): + with support.transient_internet("svn.python.org"): ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) ss.connect(("svn.python.org", 443)) fd = ss.fileno() @@ -329,7 +653,7 @@ class NetworkedTests(unittest.TestCase): self.assertEqual(e.exception.errno, errno.EBADF) def test_non_blocking_handshake(self): - with test_support.transient_internet("svn.python.org"): + with support.transient_internet("svn.python.org"): s = socket.socket(socket.AF_INET) s.connect(("svn.python.org", 443)) s.setblocking(False) @@ -342,7 +666,7 @@ class NetworkedTests(unittest.TestCase): count += 1 s.do_handshake() break - except ssl.SSLError, err: + except ssl.SSLError as err: if err.args[0] == ssl.SSL_ERROR_WANT_READ: select.select([s], [], []) elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: @@ -350,46 +674,67 @@ class NetworkedTests(unittest.TestCase): else: raise s.close() - if test_support.verbose: + if support.verbose: sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) def test_get_server_certificate(self): - with test_support.transient_internet("svn.python.org"): + with support.transient_internet("svn.python.org"): pem = ssl.get_server_certificate(("svn.python.org", 443)) if not pem: self.fail("No server certificate on svn.python.org:443!") try: pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) - except ssl.SSLError: + except ssl.SSLError as x: #should fail - pass + if support.verbose: + sys.stdout.write("%s\n" % x) else: self.fail("Got server certificate %s for svn.python.org!" % pem) pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) if not pem: self.fail("No server certificate on svn.python.org:443!") - if test_support.verbose: + if support.verbose: sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) + def test_ciphers(self): + remote = ("svn.python.org", 443) + with support.transient_internet(remote[0]): + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="ALL") + s.connect(remote) + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") + s.connect(remote) + # Error checking can happen at instantiation or when connecting + with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): + with socket.socket(socket.AF_INET) as sock: + s = ssl.wrap_socket(sock, + cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") + s.connect(remote) + def test_algorithms(self): # Issue #8484: all algorithms should be available when verifying a # certificate. # SHA256 was added in OpenSSL 0.9.8 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) - self.skipTest("remote host needs SNI, only available on Python 3.2+") - # NOTE: https://sha2.hboeck.de is another possible test host + # sha256.tbs-internet.com needs SNI to use the correct certificate + if not ssl.HAS_SNI: + self.skipTest("SNI needed for this test") + # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host) remote = ("sha256.tbs-internet.com", 443) sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") - with test_support.transient_internet("sha256.tbs-internet.com"): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=sha256_cert,) + with support.transient_internet("sha256.tbs-internet.com"): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(sha256_cert) + s = ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname="sha256.tbs-internet.com") try: s.connect(remote) - if test_support.verbose: + if support.verbose: sys.stdout.write("\nCipher with %r is %r\n" % (remote, s.cipher())) sys.stdout.write("Certificate is:\n%s\n" % @@ -405,6 +750,8 @@ except ImportError: else: _have_threads = True + from test.ssl_servers import make_https_server + class ThreadedEchoServer(threading.Thread): class ConnectionHandler(threading.Thread): @@ -413,48 +760,42 @@ else: with and without the SSL wrapper around the socket connection, so that we can test the STARTTLS functionality.""" - def __init__(self, server, connsock): + def __init__(self, server, connsock, addr): self.server = server self.running = False self.sock = connsock + self.addr = addr self.sock.setblocking(1) self.sslconn = None threading.Thread.__init__(self) self.daemon = True - def show_conn_details(self): - if self.server.certreqs == ssl.CERT_REQUIRED: - cert = self.sslconn.getpeercert() - if test_support.verbose and self.server.chatty: - sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") - cert_binary = self.sslconn.getpeercert(True) - if test_support.verbose and self.server.chatty: - sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") - cipher = self.sslconn.cipher() - if test_support.verbose and self.server.chatty: - sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") - def wrap_conn(self): try: - self.sslconn = ssl.wrap_socket(self.sock, server_side=True, - certfile=self.server.certificate, - ssl_version=self.server.protocol, - ca_certs=self.server.cacerts, - cert_reqs=self.server.certreqs, - ciphers=self.server.ciphers) + self.sslconn = self.server.context.wrap_socket( + self.sock, server_side=True) except ssl.SSLError as e: # XXX Various errors can have happened here, for example # a mismatching protocol version, an invalid certificate, # or a low-level bug. This should be made more discriminating. self.server.conn_errors.append(e) if self.server.chatty: - handle_error("\n server: bad connection attempt from " + - str(self.sock.getpeername()) + ":\n") - self.close() + handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n") self.running = False self.server.stop() + self.close() return False else: + if self.server.context.verify_mode == ssl.CERT_REQUIRED: + cert = self.sslconn.getpeercert() + if support.verbose and self.server.chatty: + sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") + cert_binary = self.sslconn.getpeercert(True) + if support.verbose and self.server.chatty: + sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") + cipher = self.sslconn.cipher() + if support.verbose and self.server.chatty: + sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") return True def read(self): @@ -473,50 +814,50 @@ else: if self.sslconn: self.sslconn.close() else: - self.sock._sock.close() + self.sock.close() def run(self): self.running = True if not self.server.starttls_server: - if isinstance(self.sock, ssl.SSLSocket): - self.sslconn = self.sock - elif not self.wrap_conn(): + if not self.wrap_conn(): return - self.show_conn_details() while self.running: try: msg = self.read() - if not msg: + stripped = msg.strip() + if not stripped: # eof, so quit this handler self.running = False self.close() - elif msg.strip() == 'over': - if test_support.verbose and self.server.connectionchatty: + elif stripped == b'over': + if support.verbose and self.server.connectionchatty: sys.stdout.write(" server: client closed connection\n") self.close() return - elif self.server.starttls_server and msg.strip() == 'STARTTLS': - if test_support.verbose and self.server.connectionchatty: + elif (self.server.starttls_server and + stripped == b'STARTTLS'): + if support.verbose and self.server.connectionchatty: sys.stdout.write(" server: read STARTTLS from client, sending OK...\n") - self.write("OK\n") + self.write(b"OK\n") if not self.wrap_conn(): return - elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS': - if test_support.verbose and self.server.connectionchatty: + elif (self.server.starttls_server and self.sslconn + and stripped == b'ENDTLS'): + if support.verbose and self.server.connectionchatty: sys.stdout.write(" server: read ENDTLS from client, sending OK...\n") - self.write("OK\n") - self.sslconn.unwrap() + self.write(b"OK\n") + self.sock = self.sslconn.unwrap() self.sslconn = None - if test_support.verbose and self.server.connectionchatty: + if support.verbose and self.server.connectionchatty: sys.stdout.write(" server: connection is now unencrypted...\n") else: - if (test_support.verbose and + if (support.verbose and self.server.connectionchatty): ctype = (self.sslconn and "encrypted") or "unencrypted" - sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n" - % (repr(msg), ctype, repr(msg.lower()), ctype)) + sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n" + % (msg, ctype, msg.lower(), ctype)) self.write(msg.lower()) - except ssl.SSLError: + except socket.error: if self.server.chatty: handle_error("Test server failure:\n") self.close() @@ -525,35 +866,30 @@ else: # harness, we want to stop the server self.server.stop() - def __init__(self, certificate, ssl_version=None, + def __init__(self, certificate=None, ssl_version=None, certreqs=None, cacerts=None, chatty=True, connectionchatty=False, starttls_server=False, - wrap_accepting_socket=False, ciphers=None): - - if ssl_version is None: - ssl_version = ssl.PROTOCOL_TLSv1 - if certreqs is None: - certreqs = ssl.CERT_NONE - self.certificate = certificate - self.protocol = ssl_version - self.certreqs = certreqs - self.cacerts = cacerts - self.ciphers = ciphers + ciphers=None, context=None): + if context: + self.context = context + else: + self.context = ssl.SSLContext(ssl_version + if ssl_version is not None + else ssl.PROTOCOL_TLSv1) + self.context.verify_mode = (certreqs if certreqs is not None + else ssl.CERT_NONE) + if cacerts: + self.context.load_verify_locations(cacerts) + if certificate: + self.context.load_cert_chain(certificate) + if ciphers: + self.context.set_ciphers(ciphers) self.chatty = chatty self.connectionchatty = connectionchatty self.starttls_server = starttls_server self.sock = socket.socket() + self.port = support.bind_port(self.sock) self.flag = None - if wrap_accepting_socket: - self.sock = ssl.wrap_socket(self.sock, server_side=True, - certfile=self.certificate, - cert_reqs = self.certreqs, - ca_certs = self.cacerts, - ssl_version = self.protocol, - ciphers = self.ciphers) - if test_support.verbose and self.chatty: - sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock)) - self.port = test_support.bind_port(self.sock) self.active = False self.conn_errors = [] threading.Thread.__init__(self) @@ -582,10 +918,10 @@ else: while self.active: try: newconn, connaddr = self.sock.accept() - if test_support.verbose and self.chatty: + if support.verbose and self.chatty: sys.stdout.write(' server: new connection from ' - + str(connaddr) + '\n') - handler = self.ConnectionHandler(self, newconn) + + repr(connaddr) + '\n') + handler = self.ConnectionHandler(self, newconn, connaddr) handler.start() handler.join() except socket.timeout: @@ -599,16 +935,19 @@ else: class AsyncoreEchoServer(threading.Thread): - class EchoServer(asyncore.dispatcher): + # this one's based on asyncore.dispatcher - class ConnectionHandler(asyncore.dispatcher_with_send): + class EchoServer (asyncore.dispatcher): + + class ConnectionHandler (asyncore.dispatcher_with_send): def __init__(self, conn, certfile): - asyncore.dispatcher_with_send.__init__(self, conn) self.socket = ssl.wrap_socket(conn, server_side=True, certfile=certfile, do_handshake_on_connect=False) + asyncore.dispatcher_with_send.__init__(self, self.socket) self._ssl_accepting = True + self._do_ssl_handshake() def readable(self): if isinstance(self.socket, ssl.SSLSocket): @@ -619,14 +958,14 @@ else: def _do_ssl_handshake(self): try: self.socket.do_handshake() - except ssl.SSLError, err: + except ssl.SSLError as err: if err.args[0] in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE): return elif err.args[0] == ssl.SSL_ERROR_EOF: return self.handle_close() raise - except socket.error, err: + except socket.error as err: if err.args[0] == errno.ECONNABORTED: return self.handle_close() else: @@ -637,12 +976,16 @@ else: self._do_ssl_handshake() else: data = self.recv(1024) - if data and data.strip() != 'over': + if support.verbose: + sys.stdout.write(" server: read %s from client\n" % repr(data)) + if not data: + self.close() + else: self.send(data.lower()) def handle_close(self): self.close() - if test_support.verbose: + if support.verbose: sys.stdout.write(" server: closed connection %s\n" % self.socket) def handle_error(self): @@ -650,14 +993,13 @@ else: def __init__(self, certfile): self.certfile = certfile - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = test_support.bind_port(self.socket) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.port = support.bind_port(sock, '') + asyncore.dispatcher.__init__(self, sock) self.listen(5) - def handle_accept(self): - sock_obj, addr = self.accept() - if test_support.verbose: + def handle_accepted(self, sock_obj, addr): + if support.verbose: sys.stdout.write(" server: new connection from %s:%s\n" %addr) self.ConnectionHandler(sock_obj, self.certfile) @@ -681,16 +1023,16 @@ else: return self def __exit__(self, *args): - if test_support.verbose: + if support.verbose: sys.stdout.write(" cleanup: stopping server.\n") self.stop() - if test_support.verbose: + if support.verbose: sys.stdout.write(" cleanup: joining server thread.\n") self.join() - if test_support.verbose: + if support.verbose: sys.stdout.write(" cleanup: successfully joined.\n") - def start(self, flag=None): + def start (self, flag=None): self.flag = flag threading.Thread.start(self) @@ -699,103 +1041,15 @@ else: if self.flag: self.flag.set() while self.active: - asyncore.loop(0.05) + try: + asyncore.loop(1) + except: + pass def stop(self): self.active = False self.server.close() - class SocketServerHTTPSServer(threading.Thread): - - class HTTPSServer(HTTPServer): - - def __init__(self, server_address, RequestHandlerClass, certfile): - HTTPServer.__init__(self, server_address, RequestHandlerClass) - # we assume the certfile contains both private key and certificate - self.certfile = certfile - self.allow_reuse_address = True - - def __str__(self): - return ('<%s %s:%s>' % - (self.__class__.__name__, - self.server_name, - self.server_port)) - - def get_request(self): - # override this to wrap socket with SSL - sock, addr = self.socket.accept() - sslconn = ssl.wrap_socket(sock, server_side=True, - certfile=self.certfile) - return sslconn, addr - - class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): - # need to override translate_path to get a known root, - # instead of using os.curdir, since the test could be - # run from anywhere - - server_version = "TestHTTPS/1.0" - - root = None - - def translate_path(self, path): - """Translate a /-separated PATH to the local filename syntax. - - Components that mean special things to the local file system - (e.g. drive or directory names) are ignored. (XXX They should - probably be diagnosed.) - - """ - # abandon query parameters - path = urlparse.urlparse(path)[2] - path = os.path.normpath(urllib.unquote(path)) - words = path.split('/') - words = filter(None, words) - path = self.root - for word in words: - drive, word = os.path.splitdrive(word) - head, word = os.path.split(word) - if word in self.root: continue - path = os.path.join(path, word) - return path - - def log_message(self, format, *args): - - # we override this to suppress logging unless "verbose" - - if test_support.verbose: - sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" % - (self.server.server_address, - self.server.server_port, - self.request.cipher(), - self.log_date_time_string(), - format%args)) - - - def __init__(self, certfile): - self.flag = None - self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] - self.server = self.HTTPSServer( - (HOST, 0), self.RootedHTTPRequestHandler, certfile) - self.port = self.server.server_port - threading.Thread.__init__(self) - self.daemon = True - - def __str__(self): - return "<%s %s>" % (self.__class__.__name__, self.server) - - def start(self, flag=None): - self.flag = flag - threading.Thread.start(self) - - def run(self): - if self.flag: - self.flag.set() - self.server.serve_forever(0.05) - - def stop(self): - self.server.shutdown() - - def bad_cert_test(certfile): """ Launch a server with CERT_REQUIRED, and check that trying to @@ -803,74 +1057,64 @@ else: """ server = ThreadedEchoServer(CERTFILE, certreqs=ssl.CERT_REQUIRED, - cacerts=CERTFILE, chatty=False) + cacerts=CERTFILE, chatty=False, + connectionchatty=False) with server: try: - s = ssl.wrap_socket(socket.socket(), - certfile=certfile, - ssl_version=ssl.PROTOCOL_TLSv1) - s.connect((HOST, server.port)) - except ssl.SSLError, x: - if test_support.verbose: - sys.stdout.write("\nSSLError is %s\n" % x[1]) - except socket.error, x: - if test_support.verbose: - sys.stdout.write("\nsocket.error is %s\n" % x[1]) + with socket.socket() as sock: + s = ssl.wrap_socket(sock, + certfile=certfile, + ssl_version=ssl.PROTOCOL_TLSv1) + s.connect((HOST, server.port)) + except ssl.SSLError as x: + if support.verbose: + sys.stdout.write("\nSSLError is %s\n" % x.args[1]) + except socket.error as x: + if support.verbose: + sys.stdout.write("\nsocket.error is %s\n" % x.args[1]) + except IOError as x: + if x.errno != errno.ENOENT: + raise + if support.verbose: + sys.stdout.write("\IOError is %s\n" % str(x)) else: raise AssertionError("Use of invalid cert should have failed!") - def server_params_test(certfile, protocol, certreqs, cacertsfile, - client_certfile, client_protocol=None, indata="FOO\n", - ciphers=None, chatty=True, connectionchatty=False, - wrap_accepting_socket=False): + def server_params_test(client_context, server_context, indata=b"FOO\n", + chatty=True, connectionchatty=False): """ Launch a server, connect a client to it and try various reads and writes. """ - server = ThreadedEchoServer(certfile, - certreqs=certreqs, - ssl_version=protocol, - cacerts=cacertsfile, - ciphers=ciphers, + server = ThreadedEchoServer(context=server_context, chatty=chatty, - connectionchatty=connectionchatty, - wrap_accepting_socket=wrap_accepting_socket) + connectionchatty=False) with server: - # try to connect - if client_protocol is None: - client_protocol = protocol - s = ssl.wrap_socket(socket.socket(), - certfile=client_certfile, - ca_certs=cacertsfile, - ciphers=ciphers, - cert_reqs=certreqs, - ssl_version=client_protocol) - s.connect((HOST, server.port)) - for arg in [indata, bytearray(indata), memoryview(indata)]: - if connectionchatty: - if test_support.verbose: - sys.stdout.write( - " client: sending %s...\n" % (repr(arg))) - s.write(arg) - outdata = s.read() + with client_context.wrap_socket(socket.socket()) as s: + s.connect((HOST, server.port)) + for arg in [indata, bytearray(indata), memoryview(indata)]: + if connectionchatty: + if support.verbose: + sys.stdout.write( + " client: sending %r...\n" % indata) + s.write(arg) + outdata = s.read() + if connectionchatty: + if support.verbose: + sys.stdout.write(" client: read %r\n" % outdata) + if outdata != indata.lower(): + raise AssertionError( + "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n" + % (outdata[:20], len(outdata), + indata[:20].lower(), len(indata))) + s.write(b"over\n") if connectionchatty: - if test_support.verbose: - sys.stdout.write(" client: read %s\n" % repr(outdata)) - if outdata != indata.lower(): - raise AssertionError( - "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" - % (outdata[:min(len(outdata),20)], len(outdata), - indata[:min(len(indata),20)].lower(), len(indata))) - s.write("over\n") - if connectionchatty: - if test_support.verbose: - sys.stdout.write(" client: closing connection.\n") - s.close() + if support.verbose: + sys.stdout.write(" client: closing connection.\n") + s.close() - def try_protocol_combo(server_protocol, - client_protocol, - expect_success, - certsreqs=None): + def try_protocol_combo(server_protocol, client_protocol, expect_success, + certsreqs=None, server_options=0, client_options=0): if certsreqs is None: certsreqs = ssl.CERT_NONE certtype = { @@ -878,19 +1122,27 @@ else: ssl.CERT_OPTIONAL: "CERT_OPTIONAL", ssl.CERT_REQUIRED: "CERT_REQUIRED", }[certsreqs] - if test_support.verbose: + if support.verbose: formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" sys.stdout.write(formatstr % (ssl.get_protocol_name(client_protocol), ssl.get_protocol_name(server_protocol), certtype)) - try: + client_context = ssl.SSLContext(client_protocol) + client_context.options = ssl.OP_ALL | client_options + server_context = ssl.SSLContext(server_protocol) + server_context.options = ssl.OP_ALL | server_options + for ctx in (client_context, server_context): + ctx.verify_mode = certsreqs # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client # will send an SSLv3 hello (rather than SSLv2) starting from # OpenSSL 1.0.0 (see issue #8322). - server_params_test(CERTFILE, server_protocol, certsreqs, - CERTFILE, CERTFILE, client_protocol, - ciphers="ALL", chatty=False) + ctx.set_ciphers("ALL") + ctx.load_cert_chain(CERTFILE) + ctx.load_verify_locations(CERTFILE) + try: + server_params_test(client_context, server_context, + chatty=False, connectionchatty=False) # Protocol mismatch can result in either an SSLError, or a # "Connection reset by peer" error. except ssl.SSLError: @@ -909,75 +1161,32 @@ else: class ThreadedTests(unittest.TestCase): - def test_rude_shutdown(self): - """A brutal shutdown of an SSL server should raise an IOError - in the client when attempting handshake. - """ - listener_ready = threading.Event() - listener_gone = threading.Event() - - s = socket.socket() - port = test_support.bind_port(s, HOST) - - # `listener` runs in a thread. It sits in an accept() until - # the main thread connects. Then it rudely closes the socket, - # and sets Event `listener_gone` to let the main thread know - # the socket is gone. - def listener(): - s.listen(5) - listener_ready.set() - s.accept() - s.close() - listener_gone.set() - - def connector(): - listener_ready.wait() - c = socket.socket() - c.connect((HOST, port)) - listener_gone.wait() - try: - ssl_sock = ssl.wrap_socket(c) - except IOError: - pass - else: - self.fail('connecting to closed SSL socket should have failed') - - t = threading.Thread(target=listener) - t.start() - try: - connector() - finally: - t.join() - @skip_if_broken_ubuntu_ssl def test_echo(self): """Basic test of an SSL client connecting to a server""" - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") - server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, - CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, - chatty=True, connectionchatty=True) + for protocol in PROTOCOLS: + context = ssl.SSLContext(protocol) + context.load_cert_chain(CERTFILE) + server_params_test(context, context, + chatty=True, connectionchatty=True) def test_getpeercert(self): - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") - s2 = socket.socket() - server = ThreadedEchoServer(CERTFILE, - certreqs=ssl.CERT_NONE, - ssl_version=ssl.PROTOCOL_SSLv23, - cacerts=CERTFILE, - chatty=False) + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations(CERTFILE) + context.load_cert_chain(CERTFILE) + server = ThreadedEchoServer(context=context, chatty=False) with server: - s = ssl.wrap_socket(socket.socket(), - certfile=CERTFILE, - ca_certs=CERTFILE, - cert_reqs=ssl.CERT_REQUIRED, - ssl_version=ssl.PROTOCOL_SSLv23) + s = context.wrap_socket(socket.socket()) s.connect((HOST, server.port)) cert = s.getpeercert() self.assertTrue(cert, "Can't get peer certificate.") cipher = s.cipher() - if test_support.verbose: + if support.verbose: sys.stdout.write(pprint.pformat(cert) + '\n') sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') if 'subject' not in cert: @@ -988,6 +1197,11 @@ else: self.fail( "Missing or invalid 'organizationName' field in certificate subject; " "should be 'Python Software Foundation'.") + self.assertIn('notBefore', cert) + self.assertIn('notAfter', cert) + before = ssl.cert_time_to_seconds(cert['notBefore']) + after = ssl.cert_time_to_seconds(cert['notAfter']) + self.assertLess(before, after) s.close() def test_empty_cert(self): @@ -1007,25 +1221,83 @@ else: bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, "badkey.pem")) + def test_rude_shutdown(self): + """A brutal shutdown of an SSL server should raise an IOError + in the client when attempting handshake. + """ + listener_ready = threading.Event() + listener_gone = threading.Event() + + s = socket.socket() + port = support.bind_port(s, HOST) + + # `listener` runs in a thread. It sits in an accept() until + # the main thread connects. Then it rudely closes the socket, + # and sets Event `listener_gone` to let the main thread know + # the socket is gone. + def listener(): + s.listen(5) + listener_ready.set() + newsock, addr = s.accept() + newsock.close() + s.close() + listener_gone.set() + + def connector(): + listener_ready.wait() + with socket.socket() as c: + c.connect((HOST, port)) + listener_gone.wait() + try: + ssl_sock = ssl.wrap_socket(c) + except IOError: + pass + else: + self.fail('connecting to closed SSL socket should have failed') + + t = threading.Thread(target=listener) + t.start() + try: + connector() + finally: + t.join() + @skip_if_broken_ubuntu_ssl + @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'), "need SSLv2") def test_protocol_sslv2(self): """Connecting to an SSLv2 server with various client options""" - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") - if not hasattr(ssl, 'PROTOCOL_SSLv2'): - self.skipTest("PROTOCOL_SSLv2 needed") try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) + # SSLv23 client with specific SSL options + if no_sslv2_implies_sslv3_hello(): + # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False, + client_options=ssl.OP_NO_SSLv2) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, + client_options=ssl.OP_NO_SSLv3) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, + client_options=ssl.OP_NO_TLSv1) @skip_if_broken_ubuntu_ssl def test_protocol_sslv23(self): """Connecting to an SSLv23 server with various client options""" - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") + if hasattr(ssl, 'PROTOCOL_SSLv2'): + try: + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) + except (ssl.SSLError, socket.error) as x: + # this fails on some older versions of OpenSSL (0.9.7l, for instance) + if support.verbose: + sys.stdout.write( + " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n" + % str(x)) try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) @@ -1038,22 +1310,38 @@ else: try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) + # Server with specific SSL options + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, + server_options=ssl.OP_NO_SSLv3) + # Will choose TLSv1 + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, + server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False, + server_options=ssl.OP_NO_TLSv1) + + @skip_if_broken_ubuntu_ssl def test_protocol_sslv3(self): """Connecting to an SSLv3 server with various client options""" - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) if hasattr(ssl, 'PROTOCOL_SSLv2'): try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False, + client_options=ssl.OP_NO_SSLv3) try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) + if no_sslv2_implies_sslv3_hello(): + # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, True, + client_options=ssl.OP_NO_SSLv2) @skip_if_broken_ubuntu_ssl def test_protocol_tlsv1(self): """Connecting to a TLSv1 server with various client options""" - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) @@ -1061,10 +1349,12 @@ else: if hasattr(ssl, 'PROTOCOL_SSLv2'): try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False, + client_options=ssl.OP_NO_TLSv1) def test_starttls(self): """Switching from clear text to encrypted and back again.""" - msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6") + msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6") server = ThreadedEchoServer(CERTFILE, ssl_version=ssl.PROTOCOL_TLSv1, @@ -1076,119 +1366,109 @@ else: s = socket.socket() s.setblocking(1) s.connect((HOST, server.port)) - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") for indata in msgs: - if test_support.verbose: + if support.verbose: sys.stdout.write( - " client: sending %s...\n" % repr(indata)) + " client: sending %r...\n" % indata) if wrapped: conn.write(indata) outdata = conn.read() else: s.send(indata) outdata = s.recv(1024) - if (indata == "STARTTLS" and - outdata.strip().lower().startswith("ok")): + msg = outdata.strip().lower() + if indata == b"STARTTLS" and msg.startswith(b"ok"): # STARTTLS ok, switch to secure mode - if test_support.verbose: + if support.verbose: sys.stdout.write( - " client: read %s from server, starting TLS...\n" - % repr(outdata)) + " client: read %r from server, starting TLS...\n" + % msg) conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) wrapped = True - elif (indata == "ENDTLS" and - outdata.strip().lower().startswith("ok")): + elif indata == b"ENDTLS" and msg.startswith(b"ok"): # ENDTLS ok, switch back to clear text - if test_support.verbose: + if support.verbose: sys.stdout.write( - " client: read %s from server, ending TLS...\n" - % repr(outdata)) + " client: read %r from server, ending TLS...\n" + % msg) s = conn.unwrap() wrapped = False else: - if test_support.verbose: + if support.verbose: sys.stdout.write( - " client: read %s from server\n" % repr(outdata)) - if test_support.verbose: + " client: read %r from server\n" % msg) + if support.verbose: sys.stdout.write(" client: closing connection.\n") if wrapped: - conn.write("over\n") + conn.write(b"over\n") else: - s.send("over\n") - s.close() + s.send(b"over\n") + if wrapped: + conn.close() + else: + s.close() def test_socketserver(self): """Using a SocketServer to create and manage SSL connections.""" - server = SocketServerHTTPSServer(CERTFILE) - flag = threading.Event() - server.start(flag) - # wait for it to start - flag.wait() + server = make_https_server(self, CERTFILE) # try to connect + if support.verbose: + sys.stdout.write('\n') + with open(CERTFILE, 'rb') as f: + d1 = f.read() + d2 = '' + # now fetch the same data from the HTTPS server + url = 'https://%s:%d/%s' % ( + HOST, server.port, os.path.split(CERTFILE)[1]) + f = urllib.request.urlopen(url) try: - if test_support.verbose: - sys.stdout.write('\n') - with open(CERTFILE, 'rb') as f: - d1 = f.read() - d2 = '' - # now fetch the same data from the HTTPS server - url = 'https://127.0.0.1:%d/%s' % ( - server.port, os.path.split(CERTFILE)[1]) - with test_support.check_py3k_warnings(): - f = urllib.urlopen(url) - dlen = f.info().getheader("content-length") + dlen = f.info().get("content-length") if dlen and (int(dlen) > 0): d2 = f.read(int(dlen)) - if test_support.verbose: + if support.verbose: sys.stdout.write( " client: read %d bytes from remote server '%s'\n" % (len(d2), server)) - f.close() - self.assertEqual(d1, d2) finally: - server.stop() - server.join() - - def test_wrapped_accept(self): - """Check the accept() method on SSL sockets.""" - if test_support.verbose: - sys.stdout.write("\n") - server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED, - CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23, - chatty=True, connectionchatty=True, - wrap_accepting_socket=True) + f.close() + self.assertEqual(d1, d2) def test_asyncore_server(self): """Check the example asyncore integration.""" indata = "TEST MESSAGE of mixed case\n" - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") + + indata = b"FOO\n" server = AsyncoreEchoServer(CERTFILE) with server: s = ssl.wrap_socket(socket.socket()) s.connect(('127.0.0.1', server.port)) - if test_support.verbose: + if support.verbose: sys.stdout.write( - " client: sending %s...\n" % (repr(indata))) + " client: sending %r...\n" % indata) s.write(indata) outdata = s.read() - if test_support.verbose: - sys.stdout.write(" client: read %s\n" % repr(outdata)) + if support.verbose: + sys.stdout.write(" client: read %r\n" % outdata) if outdata != indata.lower(): self.fail( - "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" - % (outdata[:min(len(outdata),20)], len(outdata), - indata[:min(len(indata),20)].lower(), len(indata))) - s.write("over\n") - if test_support.verbose: + "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n" + % (outdata[:20], len(outdata), + indata[:20].lower(), len(indata))) + s.write(b"over\n") + if support.verbose: sys.stdout.write(" client: closing connection.\n") s.close() + if support.verbose: + sys.stdout.write(" client: connection closed.\n") def test_recv_send(self): """Test recv(), send() and friends.""" - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") server = ThreadedEchoServer(CERTFILE, @@ -1207,12 +1487,12 @@ else: s.connect((HOST, server.port)) # helper methods for standardising recv* method signatures def _recv_into(): - b = bytearray("\0"*100) + b = bytearray(b"\0"*100) count = s.recv_into(b) return b[:count] def _recvfrom_into(): - b = bytearray("\0"*100) + b = bytearray(b"\0"*100) count, addr = s.recvfrom_into(b) return b[:count] @@ -1228,76 +1508,75 @@ else: ('recv_into', _recv_into, True, []), ('recvfrom_into', _recvfrom_into, False, []), ] - data_prefix = u"PREFIX_" + data_prefix = "PREFIX_" for meth_name, send_meth, expect_success, args in send_methods: - indata = data_prefix + meth_name + indata = (data_prefix + meth_name).encode('ascii') try: - send_meth(indata.encode('ASCII', 'strict'), *args) + send_meth(indata, *args) outdata = s.read() - outdata = outdata.decode('ASCII', 'strict') if outdata != indata.lower(): self.fail( - "While sending with <<%s>> bad data " - "<<%r>> (%d) received; " - "expected <<%r>> (%d)\n" % ( - meth_name, outdata[:20], len(outdata), - indata[:20], len(indata) + "While sending with <<{name:s}>> bad data " + "<<{outdata:r}>> ({nout:d}) received; " + "expected <<{indata:r}>> ({nin:d})\n".format( + name=meth_name, outdata=outdata[:20], + nout=len(outdata), + indata=indata[:20], nin=len(indata) ) ) except ValueError as e: if expect_success: self.fail( - "Failed to send with method <<%s>>; " - "expected to succeed.\n" % (meth_name,) + "Failed to send with method <<{name:s}>>; " + "expected to succeed.\n".format(name=meth_name) ) if not str(e).startswith(meth_name): self.fail( - "Method <<%s>> failed with unexpected " - "exception message: %s\n" % ( - meth_name, e + "Method <<{name:s}>> failed with unexpected " + "exception message: {exp:s}\n".format( + name=meth_name, exp=e ) ) for meth_name, recv_meth, expect_success, args in recv_methods: - indata = data_prefix + meth_name + indata = (data_prefix + meth_name).encode('ascii') try: - s.send(indata.encode('ASCII', 'strict')) + s.send(indata) outdata = recv_meth(*args) - outdata = outdata.decode('ASCII', 'strict') if outdata != indata.lower(): self.fail( - "While receiving with <<%s>> bad data " - "<<%r>> (%d) received; " - "expected <<%r>> (%d)\n" % ( - meth_name, outdata[:20], len(outdata), - indata[:20], len(indata) + "While receiving with <<{name:s}>> bad data " + "<<{outdata:r}>> ({nout:d}) received; " + "expected <<{indata:r}>> ({nin:d})\n".format( + name=meth_name, outdata=outdata[:20], + nout=len(outdata), + indata=indata[:20], nin=len(indata) ) ) except ValueError as e: if expect_success: self.fail( - "Failed to receive with method <<%s>>; " - "expected to succeed.\n" % (meth_name,) + "Failed to receive with method <<{name:s}>>; " + "expected to succeed.\n".format(name=meth_name) ) if not str(e).startswith(meth_name): self.fail( - "Method <<%s>> failed with unexpected " - "exception message: %s\n" % ( - meth_name, e + "Method <<{name:s}>> failed with unexpected " + "exception message: {exp:s}\n".format( + name=meth_name, exp=e ) ) # consume data s.read() - - s.write("over\n".encode("ASCII", "strict")) + s.write(b"over\n") s.close() def test_handshake_timeout(self): # Issue #5103: SSL handshake must respect the socket timeout server = socket.socket(socket.AF_INET) host = "127.0.0.1" - port = test_support.bind_port(server) + port = support.bind_port(server) started = threading.Event() finish = False @@ -1311,6 +1590,8 @@ else: # Let the socket hang around rather than having # it closed by garbage collection. conns.append(server.accept()[0]) + for sock in conns: + sock.close() t = threading.Thread(target=serve) t.start() @@ -1322,17 +1603,17 @@ else: c.settimeout(0.2) c.connect((host, port)) # Will attempt handshake and time out - self.assertRaisesRegexp(ssl.SSLError, "timed out", - ssl.wrap_socket, c) + self.assertRaisesRegex(socket.timeout, "timed out", + ssl.wrap_socket, c) finally: c.close() try: c = socket.socket(socket.AF_INET) - c.settimeout(0.2) c = ssl.wrap_socket(c) + c.settimeout(0.2) # Will attempt handshake and time out - self.assertRaisesRegexp(ssl.SSLError, "timed out", - c.connect, (host, port)) + self.assertRaisesRegex(socket.timeout, "timed out", + c.connect, (host, port)) finally: c.close() finally: @@ -1340,56 +1621,100 @@ else: t.join() server.close() + def test_server_accept(self): + # Issue #16357: accept() on a SSLSocket created through + # SSLContext.wrap_socket(). + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations(CERTFILE) + context.load_cert_chain(CERTFILE) + server = socket.socket(socket.AF_INET) + host = "127.0.0.1" + port = support.bind_port(server) + server = context.wrap_socket(server, server_side=True) + + evt = threading.Event() + remote = None + peer = None + def serve(): + nonlocal remote, peer + server.listen(5) + # Block on the accept and wait on the connection to close. + evt.set() + remote, peer = server.accept() + remote.recv(1) + + t = threading.Thread(target=serve) + t.start() + # Client wait until server setup and perform a connect. + evt.wait() + client = context.wrap_socket(socket.socket()) + client.connect((host, port)) + client_addr = client.getsockname() + client.close() + t.join() + # Sanity checks. + self.assertIsInstance(remote, ssl.SSLSocket) + self.assertEqual(peer, client_addr) + def test_default_ciphers(self): + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + try: + # Force a set of weak ciphers on our client context + context.set_ciphers("DES") + except ssl.SSLError: + self.skipTest("no DES cipher available") with ThreadedEchoServer(CERTFILE, ssl_version=ssl.PROTOCOL_SSLv23, chatty=False) as server: - sock = socket.socket() - try: - # Force a set of weak ciphers on our client socket - try: - s = ssl.wrap_socket(sock, - ssl_version=ssl.PROTOCOL_SSLv23, - ciphers="DES") - except ssl.SSLError: - self.skipTest("no DES cipher available") + with socket.socket() as sock: + s = context.wrap_socket(sock) with self.assertRaises((OSError, ssl.SSLError)): s.connect((HOST, server.port)) - finally: - sock.close() self.assertIn("no shared cipher", str(server.conn_errors[0])) def test_main(verbose=False): - global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT - CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "keycert.pem") - SVN_PYTHON_ORG_ROOT_CERT = os.path.join( - os.path.dirname(__file__) or os.curdir, - "https_svn_python_org_root.pem") - NOKIACERT = os.path.join(os.path.dirname(__file__) or os.curdir, - "nokia.pem") - - if (not os.path.exists(CERTFILE) or - not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT) or - not os.path.exists(NOKIACERT)): - raise test_support.TestFailed("Can't read certificate files!") - - tests = [BasicTests, BasicSocketTests] - - if test_support.is_resource_enabled('network'): + if support.verbose: + plats = { + 'Linux': platform.linux_distribution, + 'Mac': platform.mac_ver, + 'Windows': platform.win32_ver, + } + for name, func in plats.items(): + plat = func() + if plat and plat[0]: + plat = '%s %r' % (name, plat) + break + else: + plat = repr(platform.platform()) + print("test_ssl: testing with %r %r" % + (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)) + print(" under %s" % plat) + print(" HAS_SNI = %r" % ssl.HAS_SNI) + + for filename in [ + CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE, + ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY, + BADCERT, BADKEY, EMPTYCERT]: + if not os.path.exists(filename): + raise support.TestFailed("Can't read certificate file %r" % filename) + + tests = [ContextTests, BasicSocketTests] + + if support.is_resource_enabled('network'): tests.append(NetworkedTests) if _have_threads: - thread_info = test_support.threading_setup() - if thread_info and test_support.is_resource_enabled('network'): + thread_info = support.threading_setup() + if thread_info and support.is_resource_enabled('network'): tests.append(ThreadedTests) try: - test_support.run_unittest(*tests) + support.run_unittest(*tests) finally: if _have_threads: - test_support.threading_cleanup(*thread_info) + support.threading_cleanup(*thread_info) if __name__ == "__main__": test_main() |