Diffstat (limited to 'Lib/test/test_hashlib.py')
-rw-r--r-- | Lib/test/test_hashlib.py | 253
1 file changed, 201 insertions, 52 deletions
diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py
index 5e3356a02f3..5bad483ae9d 100644
--- a/Lib/test/test_hashlib.py
+++ b/Lib/test/test_hashlib.py
@@ -12,12 +12,12 @@ import io
 import itertools
 import logging
 import os
+import re
 import sys
 import sysconfig
 import tempfile
 import threading
 import unittest
-import warnings
 from test import support
 from test.support import _4G, bigmemtest
 from test.support import hashlib_helper
@@ -98,6 +98,14 @@ def read_vectors(hash_name):
             yield parts
 
 
+DEPRECATED_STRING_PARAMETER = re.escape(
+    "the 'string' keyword parameter is deprecated since "
+    "Python 3.15 and slated for removal in Python 3.19; "
+    "use the 'data' keyword parameter or pass the data "
+    "to hash as a positional argument instead"
+)
+
+
 class HashLibTestCase(unittest.TestCase):
     supported_hash_names = ( 'md5', 'MD5', 'sha1', 'SHA1',
                              'sha224', 'SHA224', 'sha256', 'SHA256',
@@ -141,19 +149,18 @@ class HashLibTestCase(unittest.TestCase):
         # of hashlib.new given the algorithm name.
         for algorithm, constructors in self.constructors_to_test.items():
             constructors.add(getattr(hashlib, algorithm))
-            def _test_algorithm_via_hashlib_new(data=None, _alg=algorithm, **kwargs):
-                if data is None:
-                    return hashlib.new(_alg, **kwargs)
-                return hashlib.new(_alg, data, **kwargs)
-            constructors.add(_test_algorithm_via_hashlib_new)
+            def c(*args, __algorithm_name=algorithm, **kwargs):
+                return hashlib.new(__algorithm_name, *args, **kwargs)
+            c.__name__ = f'do_test_algorithm_via_hashlib_new_{algorithm}'
+            constructors.add(c)
 
         _hashlib = self._conditional_import_module('_hashlib')
         self._hashlib = _hashlib
         if _hashlib:
             # These algorithms should always be present when this module
             # is compiled. If not, something was compiled wrong.
-            self.assertTrue(hasattr(_hashlib, 'openssl_md5'))
-            self.assertTrue(hasattr(_hashlib, 'openssl_sha1'))
+            self.assertHasAttr(_hashlib, 'openssl_md5')
+            self.assertHasAttr(_hashlib, 'openssl_sha1')
             for algorithm, constructors in self.constructors_to_test.items():
                 constructor = getattr(_hashlib, 'openssl_'+algorithm, None)
                 if constructor:
@@ -201,6 +208,11 @@ class HashLibTestCase(unittest.TestCase):
         return itertools.chain.from_iterable(constructors)
 
     @property
+    def shake_constructors(self):
+        for shake_name in self.shakes:
+            yield from self.constructors_to_test.get(shake_name, ())
+
+    @property
     def is_fips_mode(self):
         return get_fips_mode()
 
@@ -250,6 +262,85 @@ class HashLibTestCase(unittest.TestCase):
             self._hashlib.new("md5", usedforsecurity=False)
             self._hashlib.openssl_md5(usedforsecurity=False)
 
+    @unittest.skipIf(get_fips_mode(), "skip in FIPS mode")
+    def test_clinic_signature(self):
+        for constructor in self.hash_constructors:
+            with self.subTest(constructor.__name__):
+                constructor(b'')
+                constructor(data=b'')
+                with self.assertWarnsRegex(DeprecationWarning,
+                                           DEPRECATED_STRING_PARAMETER):
+                    constructor(string=b'')
+
+            digest_name = constructor(b'').name
+            with self.subTest(digest_name):
+                hashlib.new(digest_name, b'')
+                hashlib.new(digest_name, data=b'')
+                with self.assertWarnsRegex(DeprecationWarning,
+                                           DEPRECATED_STRING_PARAMETER):
+                    hashlib.new(digest_name, string=b'')
+                # Make sure that _hashlib contains the constructor
+                # to test when using a combination of libcrypto and
+                # interned hash implementations.
+                if self._hashlib and digest_name in self._hashlib._constructors:
+                    self._hashlib.new(digest_name, b'')
+                    self._hashlib.new(digest_name, data=b'')
+                    with self.assertWarnsRegex(DeprecationWarning,
+                                               DEPRECATED_STRING_PARAMETER):
+                        self._hashlib.new(digest_name, string=b'')
+
+    @unittest.skipIf(get_fips_mode(), "skip in FIPS mode")
+    def test_clinic_signature_errors(self):
+        nomsg = b''
+        mymsg = b'msg'
+        conflicting_call = re.escape(
+            "'data' and 'string' are mutually exclusive "
+            "and support for 'string' keyword parameter "
+            "is slated for removal in a future version."
+        )
+        duplicated_param = re.escape("given by name ('data') and position")
+        unexpected_param = re.escape("got an unexpected keyword argument '_'")
+        for args, kwds, errmsg in [
+            # Reject duplicated arguments before unknown keyword arguments.
+            ((nomsg,), dict(data=nomsg, _=nomsg), duplicated_param),
+            ((mymsg,), dict(data=nomsg, _=nomsg), duplicated_param),
+            # Reject duplicated arguments before conflicting ones.
+            *itertools.product(
+                [[nomsg], [mymsg]],
+                [dict(data=nomsg), dict(data=nomsg, string=nomsg)],
+                [duplicated_param]
+            ),
+            # Reject unknown keyword arguments before conflicting ones.
+            *itertools.product(
+                [()],
+                [
+                    dict(_=None),
+                    dict(data=nomsg, _=None),
+                    dict(string=nomsg, _=None),
+                    dict(string=nomsg, data=nomsg, _=None),
+                ],
+                [unexpected_param]
+            ),
+            ((nomsg,), dict(_=None), unexpected_param),
+            ((mymsg,), dict(_=None), unexpected_param),
+            # Reject conflicting arguments.
+            [(nomsg,), dict(string=nomsg), conflicting_call],
+            [(mymsg,), dict(string=nomsg), conflicting_call],
+            [(), dict(data=nomsg, string=nomsg), conflicting_call],
+        ]:
+            for constructor in self.hash_constructors:
+                digest_name = constructor(b'').name
+                with self.subTest(constructor.__name__, args=args, kwds=kwds):
+                    with self.assertRaisesRegex(TypeError, errmsg):
+                        constructor(*args, **kwds)
+                with self.subTest(digest_name, args=args, kwds=kwds):
+                    with self.assertRaisesRegex(TypeError, errmsg):
+                        hashlib.new(digest_name, *args, **kwds)
+                    if (self._hashlib and
+                            digest_name in self._hashlib._constructors):
+                        with self.assertRaisesRegex(TypeError, errmsg):
+                            self._hashlib.new(digest_name, *args, **kwds)
+
     def test_unknown_hash(self):
         self.assertRaises(ValueError, hashlib.new, 'spam spam spam spam spam')
         self.assertRaises(TypeError, hashlib.new, 1)
@@ -284,6 +375,16 @@ class HashLibTestCase(unittest.TestCase):
             self.assertIs(constructor, _md5.md5)
         self.assertEqual(sorted(builtin_constructor_cache), ['MD5', 'md5'])
 
+    def test_copy(self):
+        for cons in self.hash_constructors:
+            h1 = cons(os.urandom(16), usedforsecurity=False)
+            h2 = h1.copy()
+            self.assertIs(type(h1), type(h2))
+            self.assertEqual(h1.name, h2.name)
+            size = (16,) if h1.name in self.shakes else ()
+            self.assertEqual(h1.digest(*size), h2.digest(*size))
+            self.assertEqual(h1.hexdigest(*size), h2.hexdigest(*size))
+
     def test_hexdigest(self):
         for cons in self.hash_constructors:
             h = cons(usedforsecurity=False)
@@ -294,21 +395,50 @@ class HashLibTestCase(unittest.TestCase):
             self.assertIsInstance(h.digest(), bytes)
             self.assertEqual(hexstr(h.digest()), h.hexdigest())
 
-    def test_digest_length_overflow(self):
-        # See issue #34922
-        large_sizes = (2**29, 2**32-10, 2**32+10, 2**61, 2**64-10, 2**64+10)
-        for cons in self.hash_constructors:
-            h = cons(usedforsecurity=False)
-            if h.name not in self.shakes:
-                continue
-            if HASH is not None and isinstance(h, HASH):
-                # _hashopenssl's take a size_t
-                continue
-            for digest in h.digest, h.hexdigest:
-                self.assertRaises(ValueError, digest, -10)
-                for length in large_sizes:
-                    with self.assertRaises((ValueError, OverflowError)):
-                        digest(length)
+    def test_shakes_zero_digest_length(self):
+        for constructor in self.shake_constructors:
+            with self.subTest(constructor=constructor):
+                h = constructor(b'abcdef', usedforsecurity=False)
+                self.assertEqual(h.digest(0), b'')
+                self.assertEqual(h.hexdigest(0), '')
+
+    def test_shakes_invalid_digest_length(self):
+        # See https://github.com/python/cpython/issues/79103.
+        for constructor in self.shake_constructors:
+            with self.subTest(constructor=constructor):
+                h = constructor(usedforsecurity=False)
+                # Note: digest() and hexdigest() take a signed input and
+                # raise if it is negative; the rationale is that we use
+                # internally PyBytes_FromStringAndSize() and _Py_strhex()
+                # which both take a Py_ssize_t.
+                for negative_size in (-1, -10, -(1 << 31), -sys.maxsize):
+                    self.assertRaises(ValueError, h.digest, negative_size)
+                    self.assertRaises(ValueError, h.hexdigest, negative_size)
+
+    def test_shakes_overflow_digest_length(self):
+        # See https://github.com/python/cpython/issues/135759.
+
+        exc_types = (OverflowError, ValueError)
+        # HACL* accepts an 'uint32_t' while OpenSSL accepts a 'size_t'.
+        openssl_overflown_sizes = (sys.maxsize + 1, 2 * sys.maxsize)
+        # https://github.com/python/cpython/issues/79103 restricts
+        # the accepted built-in lengths to 2 ** 29, even if OpenSSL
+        # accepts such lengths.
+        builtin_overflown_sizes = openssl_overflown_sizes + (
+            2 ** 29, 2 ** 32 - 10, 2 ** 32, 2 ** 32 + 10,
+            2 ** 61, 2 ** 64 - 10, 2 ** 64, 2 ** 64 + 10,
+        )
+
+        for constructor in self.shake_constructors:
+            with self.subTest(constructor=constructor):
+                h = constructor(usedforsecurity=False)
+                if HASH is not None and isinstance(h, HASH):
+                    overflown_sizes = openssl_overflown_sizes
+                else:
+                    overflown_sizes = builtin_overflown_sizes
+                for invalid_size in overflown_sizes:
+                    self.assertRaises(exc_types, h.digest, invalid_size)
+                    self.assertRaises(exc_types, h.hexdigest, invalid_size)
 
     def test_name_attribute(self):
         for cons in self.hash_constructors:
@@ -719,8 +849,6 @@ class HashLibTestCase(unittest.TestCase):
         self.assertRaises(ValueError, constructor, node_offset=-1)
         self.assertRaises(OverflowError, constructor, node_offset=max_offset+1)
 
-        self.assertRaises(TypeError, constructor, data=b'')
-        self.assertRaises(TypeError, constructor, string=b'')
         self.assertRaises(TypeError, constructor, '')
 
         constructor(
@@ -929,49 +1057,67 @@ class HashLibTestCase(unittest.TestCase):
     def test_sha256_gil(self):
         gil_minsize = hashlib_helper.find_gil_minsize(['_sha2', '_hashlib'])
+        data = b'1' + b'#' * gil_minsize + b'1'
+        expected = hashlib.sha256(data).hexdigest()
+
         m = hashlib.sha256()
         m.update(b'1')
         m.update(b'#' * gil_minsize)
         m.update(b'1')
-        self.assertEqual(
-            m.hexdigest(),
-            '1cfceca95989f51f658e3f3ffe7f1cd43726c9e088c13ee10b46f57cef135b94'
-        )
+        self.assertEqual(m.hexdigest(), expected)
 
-        m = hashlib.sha256(b'1' + b'#' * gil_minsize + b'1')
-        self.assertEqual(
-            m.hexdigest(),
-            '1cfceca95989f51f658e3f3ffe7f1cd43726c9e088c13ee10b46f57cef135b94'
-        )
+    @threading_helper.reap_threads
+    @threading_helper.requires_working_threading()
+    def test_threaded_hashing_fast(self):
+        # Same as test_threaded_hashing_slow() but only tests some functions
+        # since otherwise test_hashlib.py becomes too slow during development.
+        for name in ['md5', 'sha1', 'sha256', 'sha3_256', 'blake2s']:
+            if constructor := getattr(hashlib, name, None):
+                with self.subTest(name):
+                    self.do_test_threaded_hashing(constructor, is_shake=False)
+        if shake_128 := getattr(hashlib, 'shake_128', None):
+            self.do_test_threaded_hashing(shake_128, is_shake=True)
 
+    @requires_resource('cpu')
     @threading_helper.reap_threads
     @threading_helper.requires_working_threading()
-    def test_threaded_hashing(self):
+    def test_threaded_hashing_slow(self):
+        for algorithm, constructors in self.constructors_to_test.items():
+            is_shake = algorithm in self.shakes
+            for constructor in constructors:
+                with self.subTest(constructor.__name__, is_shake=is_shake):
+                    self.do_test_threaded_hashing(constructor, is_shake)
+
+    def do_test_threaded_hashing(self, constructor, is_shake):
         # Updating the same hash object from several threads at once
         # using data chunk sizes containing the same byte sequences.
        #
         # If the internal locks are working to prevent multiple
         # updates on the same object from running at once, the resulting
         # hash will be the same as doing it single threaded upfront.
-        hasher = hashlib.sha1()
-        num_threads = 5
-        smallest_data = b'swineflu'
-        data = smallest_data * 200000
-        expected_hash = hashlib.sha1(data*num_threads).hexdigest()
-
-        def hash_in_chunks(chunk_size):
-            index = 0
-            while index < len(data):
-                hasher.update(data[index:index + chunk_size])
-                index += chunk_size
+
+        # The data to hash has length s|M|q^N and the chunk size for the i-th
+        # thread is s|M|q^(N-i), where N is the number of threads, M is a fixed
+        # message of small length, and s >= 1 and q >= 2 are small integers.
+        smallest_size, num_threads, s, q = 8, 5, 2, 10
+
+        smallest_data = os.urandom(smallest_size)
+        data = s * smallest_data * (q ** num_threads)
+
+        h1 = constructor(usedforsecurity=False)
+        h2 = constructor(data * num_threads, usedforsecurity=False)
+
+        def update(chunk_size):
+            for index in range(0, len(data), chunk_size):
+                h1.update(data[index:index + chunk_size])
 
         threads = []
-        for threadnum in range(num_threads):
-            chunk_size = len(data) // (10 ** threadnum)
+        for thread_num in range(num_threads):
+            # chunk_size = len(data) // (q ** thread_num)
+            chunk_size = s * smallest_size * q ** (num_threads - thread_num)
             self.assertGreater(chunk_size, 0)
-            self.assertEqual(chunk_size % len(smallest_data), 0)
-            thread = threading.Thread(target=hash_in_chunks,
-                                      args=(chunk_size,))
+            self.assertEqual(chunk_size % smallest_size, 0)
+            thread = threading.Thread(target=update, args=(chunk_size,))
             threads.append(thread)
 
         for thread in threads:
            thread.start()
@@ -979,7 +1125,10 @@ class HashLibTestCase(unittest.TestCase):
         for thread in threads:
             thread.join()
 
-        self.assertEqual(expected_hash, hasher.hexdigest())
+        if is_shake:
+            self.assertEqual(h1.hexdigest(16), h2.hexdigest(16))
+        else:
+            self.assertEqual(h1.hexdigest(), h2.hexdigest())
 
     def test_get_fips_mode(self):
         fips_mode = self.is_fips_mode
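
For reference, a minimal sketch of the calling conventions these tests pin down, assuming a CPython build that carries this patch (the 'data'/'string' handling lands in 3.15; on older versions the 'string' keyword raises TypeError instead of warning):

import hashlib
import warnings

hashlib.sha256(b'abc')               # positional data: accepted silently
hashlib.sha256(data=b'abc')          # 'data' keyword: accepted silently

# The deprecated 'string' spelling still works but emits the
# DeprecationWarning matched by DEPRECATED_STRING_PARAMETER above.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    hashlib.sha256(string=b'abc')
assert any(issubclass(w.category, DeprecationWarning) for w in caught)

# Passing both spellings at once is rejected outright, matching the
# conflicting_call message in test_clinic_signature_errors.
try:
    hashlib.sha256(b'abc', string=b'abc')
except TypeError as exc:
    print(exc)  # 'data' and 'string' are mutually exclusive ...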