aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_hashlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_hashlib.py')
-rw-r--r--Lib/test/test_hashlib.py253
1 files changed, 201 insertions, 52 deletions
diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py
index 5e3356a02f3..5bad483ae9d 100644
--- a/Lib/test/test_hashlib.py
+++ b/Lib/test/test_hashlib.py
@@ -12,12 +12,12 @@ import io
import itertools
import logging
import os
+import re
import sys
import sysconfig
import tempfile
import threading
import unittest
-import warnings
from test import support
from test.support import _4G, bigmemtest
from test.support import hashlib_helper
@@ -98,6 +98,14 @@ def read_vectors(hash_name):
yield parts
+DEPRECATED_STRING_PARAMETER = re.escape(
+ "the 'string' keyword parameter is deprecated since "
+ "Python 3.15 and slated for removal in Python 3.19; "
+ "use the 'data' keyword parameter or pass the data "
+ "to hash as a positional argument instead"
+)
+
+
class HashLibTestCase(unittest.TestCase):
supported_hash_names = ( 'md5', 'MD5', 'sha1', 'SHA1',
'sha224', 'SHA224', 'sha256', 'SHA256',
@@ -141,19 +149,18 @@ class HashLibTestCase(unittest.TestCase):
# of hashlib.new given the algorithm name.
for algorithm, constructors in self.constructors_to_test.items():
constructors.add(getattr(hashlib, algorithm))
- def _test_algorithm_via_hashlib_new(data=None, _alg=algorithm, **kwargs):
- if data is None:
- return hashlib.new(_alg, **kwargs)
- return hashlib.new(_alg, data, **kwargs)
- constructors.add(_test_algorithm_via_hashlib_new)
+ def c(*args, __algorithm_name=algorithm, **kwargs):
+ return hashlib.new(__algorithm_name, *args, **kwargs)
+ c.__name__ = f'do_test_algorithm_via_hashlib_new_{algorithm}'
+ constructors.add(c)
_hashlib = self._conditional_import_module('_hashlib')
self._hashlib = _hashlib
if _hashlib:
# These algorithms should always be present when this module
# is compiled. If not, something was compiled wrong.
- self.assertTrue(hasattr(_hashlib, 'openssl_md5'))
- self.assertTrue(hasattr(_hashlib, 'openssl_sha1'))
+ self.assertHasAttr(_hashlib, 'openssl_md5')
+ self.assertHasAttr(_hashlib, 'openssl_sha1')
for algorithm, constructors in self.constructors_to_test.items():
constructor = getattr(_hashlib, 'openssl_'+algorithm, None)
if constructor:
@@ -201,6 +208,11 @@ class HashLibTestCase(unittest.TestCase):
return itertools.chain.from_iterable(constructors)
@property
+ def shake_constructors(self):
+ for shake_name in self.shakes:
+ yield from self.constructors_to_test.get(shake_name, ())
+
+ @property
def is_fips_mode(self):
return get_fips_mode()
@@ -250,6 +262,85 @@ class HashLibTestCase(unittest.TestCase):
self._hashlib.new("md5", usedforsecurity=False)
self._hashlib.openssl_md5(usedforsecurity=False)
+ @unittest.skipIf(get_fips_mode(), "skip in FIPS mode")
+ def test_clinic_signature(self):
+ for constructor in self.hash_constructors:
+ with self.subTest(constructor.__name__):
+ constructor(b'')
+ constructor(data=b'')
+ with self.assertWarnsRegex(DeprecationWarning,
+ DEPRECATED_STRING_PARAMETER):
+ constructor(string=b'')
+
+ digest_name = constructor(b'').name
+ with self.subTest(digest_name):
+ hashlib.new(digest_name, b'')
+ hashlib.new(digest_name, data=b'')
+ with self.assertWarnsRegex(DeprecationWarning,
+ DEPRECATED_STRING_PARAMETER):
+ hashlib.new(digest_name, string=b'')
+ # Make sure that _hashlib contains the constructor
+ # to test when using a combination of libcrypto and
+ # interned hash implementations.
+ if self._hashlib and digest_name in self._hashlib._constructors:
+ self._hashlib.new(digest_name, b'')
+ self._hashlib.new(digest_name, data=b'')
+ with self.assertWarnsRegex(DeprecationWarning,
+ DEPRECATED_STRING_PARAMETER):
+ self._hashlib.new(digest_name, string=b'')
+
+ @unittest.skipIf(get_fips_mode(), "skip in FIPS mode")
+ def test_clinic_signature_errors(self):
+ nomsg = b''
+ mymsg = b'msg'
+ conflicting_call = re.escape(
+ "'data' and 'string' are mutually exclusive "
+ "and support for 'string' keyword parameter "
+ "is slated for removal in a future version."
+ )
+ duplicated_param = re.escape("given by name ('data') and position")
+ unexpected_param = re.escape("got an unexpected keyword argument '_'")
+ for args, kwds, errmsg in [
+ # Reject duplicated arguments before unknown keyword arguments.
+ ((nomsg,), dict(data=nomsg, _=nomsg), duplicated_param),
+ ((mymsg,), dict(data=nomsg, _=nomsg), duplicated_param),
+ # Reject duplicated arguments before conflicting ones.
+ *itertools.product(
+ [[nomsg], [mymsg]],
+ [dict(data=nomsg), dict(data=nomsg, string=nomsg)],
+ [duplicated_param]
+ ),
+ # Reject unknown keyword arguments before conflicting ones.
+ *itertools.product(
+ [()],
+ [
+ dict(_=None),
+ dict(data=nomsg, _=None),
+ dict(string=nomsg, _=None),
+ dict(string=nomsg, data=nomsg, _=None),
+ ],
+ [unexpected_param]
+ ),
+ ((nomsg,), dict(_=None), unexpected_param),
+ ((mymsg,), dict(_=None), unexpected_param),
+ # Reject conflicting arguments.
+ [(nomsg,), dict(string=nomsg), conflicting_call],
+ [(mymsg,), dict(string=nomsg), conflicting_call],
+ [(), dict(data=nomsg, string=nomsg), conflicting_call],
+ ]:
+ for constructor in self.hash_constructors:
+ digest_name = constructor(b'').name
+ with self.subTest(constructor.__name__, args=args, kwds=kwds):
+ with self.assertRaisesRegex(TypeError, errmsg):
+ constructor(*args, **kwds)
+ with self.subTest(digest_name, args=args, kwds=kwds):
+ with self.assertRaisesRegex(TypeError, errmsg):
+ hashlib.new(digest_name, *args, **kwds)
+ if (self._hashlib and
+ digest_name in self._hashlib._constructors):
+ with self.assertRaisesRegex(TypeError, errmsg):
+ self._hashlib.new(digest_name, *args, **kwds)
+
def test_unknown_hash(self):
self.assertRaises(ValueError, hashlib.new, 'spam spam spam spam spam')
self.assertRaises(TypeError, hashlib.new, 1)
@@ -284,6 +375,16 @@ class HashLibTestCase(unittest.TestCase):
self.assertIs(constructor, _md5.md5)
self.assertEqual(sorted(builtin_constructor_cache), ['MD5', 'md5'])
+ def test_copy(self):
+ for cons in self.hash_constructors:
+ h1 = cons(os.urandom(16), usedforsecurity=False)
+ h2 = h1.copy()
+ self.assertIs(type(h1), type(h2))
+ self.assertEqual(h1.name, h2.name)
+ size = (16,) if h1.name in self.shakes else ()
+ self.assertEqual(h1.digest(*size), h2.digest(*size))
+ self.assertEqual(h1.hexdigest(*size), h2.hexdigest(*size))
+
def test_hexdigest(self):
for cons in self.hash_constructors:
h = cons(usedforsecurity=False)
@@ -294,21 +395,50 @@ class HashLibTestCase(unittest.TestCase):
self.assertIsInstance(h.digest(), bytes)
self.assertEqual(hexstr(h.digest()), h.hexdigest())
- def test_digest_length_overflow(self):
- # See issue #34922
- large_sizes = (2**29, 2**32-10, 2**32+10, 2**61, 2**64-10, 2**64+10)
- for cons in self.hash_constructors:
- h = cons(usedforsecurity=False)
- if h.name not in self.shakes:
- continue
- if HASH is not None and isinstance(h, HASH):
- # _hashopenssl's take a size_t
- continue
- for digest in h.digest, h.hexdigest:
- self.assertRaises(ValueError, digest, -10)
- for length in large_sizes:
- with self.assertRaises((ValueError, OverflowError)):
- digest(length)
+ def test_shakes_zero_digest_length(self):
+ for constructor in self.shake_constructors:
+ with self.subTest(constructor=constructor):
+ h = constructor(b'abcdef', usedforsecurity=False)
+ self.assertEqual(h.digest(0), b'')
+ self.assertEqual(h.hexdigest(0), '')
+
+ def test_shakes_invalid_digest_length(self):
+ # See https://github.com/python/cpython/issues/79103.
+ for constructor in self.shake_constructors:
+ with self.subTest(constructor=constructor):
+ h = constructor(usedforsecurity=False)
+ # Note: digest() and hexdigest() take a signed input and
+ # raise if it is negative; the rationale is that we use
+ # internally PyBytes_FromStringAndSize() and _Py_strhex()
+ # which both take a Py_ssize_t.
+ for negative_size in (-1, -10, -(1 << 31), -sys.maxsize):
+ self.assertRaises(ValueError, h.digest, negative_size)
+ self.assertRaises(ValueError, h.hexdigest, negative_size)
+
+ def test_shakes_overflow_digest_length(self):
+ # See https://github.com/python/cpython/issues/135759.
+
+ exc_types = (OverflowError, ValueError)
+ # HACL* accepts an 'uint32_t' while OpenSSL accepts a 'size_t'.
+ openssl_overflown_sizes = (sys.maxsize + 1, 2 * sys.maxsize)
+ # https://github.com/python/cpython/issues/79103 restricts
+ # the accepted built-in lengths to 2 ** 29, even if OpenSSL
+ # accepts such lengths.
+ builtin_overflown_sizes = openssl_overflown_sizes + (
+ 2 ** 29, 2 ** 32 - 10, 2 ** 32, 2 ** 32 + 10,
+ 2 ** 61, 2 ** 64 - 10, 2 ** 64, 2 ** 64 + 10,
+ )
+
+ for constructor in self.shake_constructors:
+ with self.subTest(constructor=constructor):
+ h = constructor(usedforsecurity=False)
+ if HASH is not None and isinstance(h, HASH):
+ overflown_sizes = openssl_overflown_sizes
+ else:
+ overflown_sizes = builtin_overflown_sizes
+ for invalid_size in overflown_sizes:
+ self.assertRaises(exc_types, h.digest, invalid_size)
+ self.assertRaises(exc_types, h.hexdigest, invalid_size)
def test_name_attribute(self):
for cons in self.hash_constructors:
@@ -719,8 +849,6 @@ class HashLibTestCase(unittest.TestCase):
self.assertRaises(ValueError, constructor, node_offset=-1)
self.assertRaises(OverflowError, constructor, node_offset=max_offset+1)
- self.assertRaises(TypeError, constructor, data=b'')
- self.assertRaises(TypeError, constructor, string=b'')
self.assertRaises(TypeError, constructor, '')
constructor(
@@ -929,49 +1057,67 @@ class HashLibTestCase(unittest.TestCase):
def test_sha256_gil(self):
gil_minsize = hashlib_helper.find_gil_minsize(['_sha2', '_hashlib'])
+ data = b'1' + b'#' * gil_minsize + b'1'
+ expected = hashlib.sha256(data).hexdigest()
+
m = hashlib.sha256()
m.update(b'1')
m.update(b'#' * gil_minsize)
m.update(b'1')
- self.assertEqual(
- m.hexdigest(),
- '1cfceca95989f51f658e3f3ffe7f1cd43726c9e088c13ee10b46f57cef135b94'
- )
+ self.assertEqual(m.hexdigest(), expected)
- m = hashlib.sha256(b'1' + b'#' * gil_minsize + b'1')
- self.assertEqual(
- m.hexdigest(),
- '1cfceca95989f51f658e3f3ffe7f1cd43726c9e088c13ee10b46f57cef135b94'
- )
+ @threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
+ def test_threaded_hashing_fast(self):
+ # Same as test_threaded_hashing_slow() but only tests some functions
+ # since otherwise test_hashlib.py becomes too slow during development.
+ for name in ['md5', 'sha1', 'sha256', 'sha3_256', 'blake2s']:
+ if constructor := getattr(hashlib, name, None):
+ with self.subTest(name):
+ self.do_test_threaded_hashing(constructor, is_shake=False)
+ if shake_128 := getattr(hashlib, 'shake_128', None):
+ self.do_test_threaded_hashing(shake_128, is_shake=True)
+ @requires_resource('cpu')
@threading_helper.reap_threads
@threading_helper.requires_working_threading()
- def test_threaded_hashing(self):
+ def test_threaded_hashing_slow(self):
+ for algorithm, constructors in self.constructors_to_test.items():
+ is_shake = algorithm in self.shakes
+ for constructor in constructors:
+ with self.subTest(constructor.__name__, is_shake=is_shake):
+ self.do_test_threaded_hashing(constructor, is_shake)
+
+ def do_test_threaded_hashing(self, constructor, is_shake):
# Updating the same hash object from several threads at once
# using data chunk sizes containing the same byte sequences.
#
# If the internal locks are working to prevent multiple
# updates on the same object from running at once, the resulting
# hash will be the same as doing it single threaded upfront.
- hasher = hashlib.sha1()
- num_threads = 5
- smallest_data = b'swineflu'
- data = smallest_data * 200000
- expected_hash = hashlib.sha1(data*num_threads).hexdigest()
-
- def hash_in_chunks(chunk_size):
- index = 0
- while index < len(data):
- hasher.update(data[index:index + chunk_size])
- index += chunk_size
+
+ # The data to hash has length s|M|q^N and the chunk size for the i-th
+ # thread is s|M|q^(N-i), where N is the number of threads, M is a fixed
+ # message of small length, and s >= 1 and q >= 2 are small integers.
+ smallest_size, num_threads, s, q = 8, 5, 2, 10
+
+ smallest_data = os.urandom(smallest_size)
+ data = s * smallest_data * (q ** num_threads)
+
+ h1 = constructor(usedforsecurity=False)
+ h2 = constructor(data * num_threads, usedforsecurity=False)
+
+ def update(chunk_size):
+ for index in range(0, len(data), chunk_size):
+ h1.update(data[index:index + chunk_size])
threads = []
- for threadnum in range(num_threads):
- chunk_size = len(data) // (10 ** threadnum)
+ for thread_num in range(num_threads):
+ # chunk_size = len(data) // (q ** thread_num)
+ chunk_size = s * smallest_size * q ** (num_threads - thread_num)
self.assertGreater(chunk_size, 0)
- self.assertEqual(chunk_size % len(smallest_data), 0)
- thread = threading.Thread(target=hash_in_chunks,
- args=(chunk_size,))
+ self.assertEqual(chunk_size % smallest_size, 0)
+ thread = threading.Thread(target=update, args=(chunk_size,))
threads.append(thread)
for thread in threads:
@@ -979,7 +1125,10 @@ class HashLibTestCase(unittest.TestCase):
for thread in threads:
thread.join()
- self.assertEqual(expected_hash, hasher.hexdigest())
+ if is_shake:
+ self.assertEqual(h1.hexdigest(16), h2.hexdigest(16))
+ else:
+ self.assertEqual(h1.hexdigest(), h2.hexdigest())
def test_get_fips_mode(self):
fips_mode = self.is_fips_mode