diff options
Diffstat (limited to 'Lib/test')
48 files changed, 1934 insertions, 735 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 1b690cb88bf..6a20a1eb03e 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -513,9 +513,14 @@ class _TestProcess(BaseTestCase): time.sleep(100) @classmethod - def _sleep_no_int_handler(cls): + def _sleep_some_event(cls, event): + event.set() + time.sleep(100) + + @classmethod + def _sleep_no_int_handler(cls, event): signal.signal(signal.SIGINT, signal.SIG_DFL) - cls._sleep_some() + cls._sleep_some_event(event) @classmethod def _test_sleep(cls, delay): @@ -525,7 +530,10 @@ class _TestProcess(BaseTestCase): if self.TYPE == 'threads': self.skipTest('test not appropriate for {}'.format(self.TYPE)) - p = self.Process(target=target or self._sleep_some) + event = self.Event() + if not target: + target = self._sleep_some_event + p = self.Process(target=target, args=(event,)) p.daemon = True p.start() @@ -543,8 +551,11 @@ class _TestProcess(BaseTestCase): self.assertTimingAlmostEqual(join.elapsed, 0.0) self.assertEqual(p.is_alive(), True) - # XXX maybe terminating too soon causes the problems on Gentoo... - time.sleep(1) + timeout = support.SHORT_TIMEOUT + if not event.wait(timeout): + p.terminate() + p.join() + self.fail(f"event not signaled in {timeout} seconds") meth(p) diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py index d1882a310bb..345698cfb5f 100644 --- a/Lib/test/datetimetester.py +++ b/Lib/test/datetimetester.py @@ -3571,6 +3571,10 @@ class TestDateTime(TestDate): '2009-04-19T12:30:45.400 +02:30', # Space between ms and timezone (gh-130959) '2009-04-19T12:30:45.400 ', # Trailing space (gh-130959) '2009-04-19T12:30:45. 400', # Space before fraction (gh-130959) + '2009-04-19T12:30:45+00:90:00', # Time zone field out from range + '2009-04-19T12:30:45+00:00:90', # Time zone field out from range + '2009-04-19T12:30:45-00:90:00', # Time zone field out from range + '2009-04-19T12:30:45-00:00:90', # Time zone field out from range ] for bad_str in bad_strs: @@ -4795,6 +4799,11 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase): '12:30:45.400 +02:30', # Space between ms and timezone (gh-130959) '12:30:45.400 ', # Trailing space (gh-130959) '12:30:45. 400', # Space before fraction (gh-130959) + '24:00:00.000001', # Has non-zero microseconds on 24:00 + '24:00:01.000000', # Has non-zero seconds on 24:00 + '24:01:00.000000', # Has non-zero minutes on 24:00 + '12:30:45+00:90:00', # Time zone field out from range + '12:30:45+00:00:90', # Time zone field out from range ] for bad_str in bad_strs: diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py index 009e04e9c0b..d64b2b9fe28 100644 --- a/Lib/test/lock_tests.py +++ b/Lib/test/lock_tests.py @@ -124,6 +124,11 @@ class BaseLockTests(BaseTestCase): lock = self.locktype() del lock + def test_constructor_noargs(self): + self.assertRaises(TypeError, self.locktype, 1) + self.assertRaises(TypeError, self.locktype, x=1) + self.assertRaises(TypeError, self.locktype, 1, x=2) + def test_repr(self): lock = self.locktype() self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>") diff --git a/Lib/test/pythoninfo.py b/Lib/test/pythoninfo.py index 682815c3fdd..e1830f2e6eb 100644 --- a/Lib/test/pythoninfo.py +++ b/Lib/test/pythoninfo.py @@ -658,6 +658,16 @@ def collect_zlib(info_add): copy_attributes(info_add, zlib, 'zlib.%s', attributes) +def collect_zstd(info_add): + try: + import _zstd + except ImportError: + return + + attributes = ('zstd_version',) + copy_attributes(info_add, _zstd, 'zstd.%s', attributes) + + def collect_expat(info_add): try: from xml.parsers import expat @@ -1051,6 +1061,7 @@ def collect_info(info): collect_tkinter, collect_windows, collect_zlib, + collect_zstd, collect_libregrtest_utils, # Collecting from tests should be last as they have side effects. diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index c74c3a31909..9b6e80fdad9 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -696,9 +696,11 @@ def sortdict(dict): return "{%s}" % withcommas -def run_code(code: str) -> dict[str, object]: +def run_code(code: str, extra_names: dict[str, object] | None = None) -> dict[str, object]: """Run a piece of code after dedenting it, and return its global namespace.""" ns = {} + if extra_names: + ns.update(extra_names) exec(textwrap.dedent(code), ns) return ns diff --git a/Lib/test/support/hashlib_helper.py b/Lib/test/support/hashlib_helper.py index 5043f08dd93..7032257b068 100644 --- a/Lib/test/support/hashlib_helper.py +++ b/Lib/test/support/hashlib_helper.py @@ -23,6 +23,22 @@ def requires_builtin_hmac(): return unittest.skipIf(_hmac is None, "requires _hmac") +def _missing_hash(digestname, implementation=None, *, exc=None): + parts = ["missing", implementation, f"hash algorithm: {digestname!r}"] + msg = " ".join(filter(None, parts)) + raise unittest.SkipTest(msg) from exc + + +def _openssl_availabillity(digestname, *, usedforsecurity): + try: + _hashlib.new(digestname, usedforsecurity=usedforsecurity) + except AttributeError: + assert _hashlib is None + _missing_hash(digestname, "OpenSSL") + except ValueError as exc: + _missing_hash(digestname, "OpenSSL", exc=exc) + + def _decorate_func_or_class(func_or_class, decorator_func): if not isinstance(func_or_class, type): return decorator_func(func_or_class) @@ -71,8 +87,7 @@ def requires_hashdigest(digestname, openssl=None, usedforsecurity=True): try: test_availability() except ValueError as exc: - msg = f"missing hash algorithm: {digestname!r}" - raise unittest.SkipTest(msg) from exc + _missing_hash(digestname, exc=exc) return func(*args, **kwargs) return wrapper @@ -87,14 +102,44 @@ def requires_openssl_hashdigest(digestname, *, usedforsecurity=True): The hashing algorithm may be missing or blocked by a strict crypto policy. """ def decorator_func(func): - @requires_hashlib() + @requires_hashlib() # avoid checking at each call @functools.wraps(func) def wrapper(*args, **kwargs): + _openssl_availabillity(digestname, usedforsecurity=usedforsecurity) + return func(*args, **kwargs) + return wrapper + + def decorator(func_or_class): + return _decorate_func_or_class(func_or_class, decorator_func) + return decorator + + +def find_openssl_hashdigest_constructor(digestname, *, usedforsecurity=True): + """Find the OpenSSL hash function constructor by its name.""" + assert isinstance(digestname, str), digestname + _openssl_availabillity(digestname, usedforsecurity=usedforsecurity) + # This returns a function of the form _hashlib.openssl_<name> and + # not a lambda function as it is rejected by _hashlib.hmac_new(). + return getattr(_hashlib, f"openssl_{digestname}") + + +def requires_builtin_hashdigest( + module_name, digestname, *, usedforsecurity=True +): + """Decorator raising SkipTest if a HACL* hashing algorithm is missing. + + - The *module_name* is the C extension module name based on HACL*. + - The *digestname* is one of its member, e.g., 'md5'. + """ + def decorator_func(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + module = import_module(module_name) try: - _hashlib.new(digestname, usedforsecurity=usedforsecurity) - except ValueError: - msg = f"missing OpenSSL hash algorithm: {digestname!r}" - raise unittest.SkipTest(msg) + getattr(module, digestname) + except AttributeError: + fullname = f'{module_name}.{digestname}' + _missing_hash(fullname, implementation="HACL") return func(*args, **kwargs) return wrapper @@ -103,6 +148,168 @@ def requires_openssl_hashdigest(digestname, *, usedforsecurity=True): return decorator +def find_builtin_hashdigest_constructor( + module_name, digestname, *, usedforsecurity=True +): + """Find the HACL* hash function constructor. + + - The *module_name* is the C extension module name based on HACL*. + - The *digestname* is one of its member, e.g., 'md5'. + """ + module = import_module(module_name) + try: + constructor = getattr(module, digestname) + constructor(b'', usedforsecurity=usedforsecurity) + except (AttributeError, TypeError, ValueError): + _missing_hash(f'{module_name}.{digestname}', implementation="HACL") + return constructor + + +class HashFunctionsTrait: + """Mixin trait class containing hash functions. + + This class is assumed to have all unitest.TestCase methods but should + not directly inherit from it to prevent the test suite being run on it. + + Subclasses should implement the hash functions by returning an object + that can be recognized as a valid digestmod parameter for both hashlib + and HMAC. In particular, it cannot be a lambda function as it will not + be recognized by hashlib (it will still be accepted by the pure Python + implementation of HMAC). + """ + + ALGORITHMS = [ + 'md5', 'sha1', + 'sha224', 'sha256', 'sha384', 'sha512', + 'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512', + ] + + # Default 'usedforsecurity' to use when looking up a hash function. + usedforsecurity = True + + def _find_constructor(self, name): + # By default, a missing algorithm skips the test that uses it. + self.assertIn(name, self.ALGORITHMS) + self.skipTest(f"missing hash function: {name}") + + @property + def md5(self): + return self._find_constructor("md5") + + @property + def sha1(self): + return self._find_constructor("sha1") + + @property + def sha224(self): + return self._find_constructor("sha224") + + @property + def sha256(self): + return self._find_constructor("sha256") + + @property + def sha384(self): + return self._find_constructor("sha384") + + @property + def sha512(self): + return self._find_constructor("sha512") + + @property + def sha3_224(self): + return self._find_constructor("sha3_224") + + @property + def sha3_256(self): + return self._find_constructor("sha3_256") + + @property + def sha3_384(self): + return self._find_constructor("sha3_384") + + @property + def sha3_512(self): + return self._find_constructor("sha3_512") + + +class NamedHashFunctionsTrait(HashFunctionsTrait): + """Trait containing named hash functions. + + Hash functions are available if and only if they are available in hashlib. + """ + + def _find_constructor(self, name): + self.assertIn(name, self.ALGORITHMS) + return name + + +class OpenSSLHashFunctionsTrait(HashFunctionsTrait): + """Trait containing OpenSSL hash functions. + + Hash functions are available if and only if they are available in _hashlib. + """ + + def _find_constructor(self, name): + self.assertIn(name, self.ALGORITHMS) + return find_openssl_hashdigest_constructor( + name, usedforsecurity=self.usedforsecurity + ) + + +class BuiltinHashFunctionsTrait(HashFunctionsTrait): + """Trait containing HACL* hash functions. + + Hash functions are available if and only if they are available in C. + In particular, HACL* HMAC-MD5 may be available even though HACL* md5 + is not since the former is unconditionally built. + """ + + def _find_constructor_in(self, module, name): + self.assertIn(name, self.ALGORITHMS) + return find_builtin_hashdigest_constructor(module, name) + + @property + def md5(self): + return self._find_constructor_in("_md5", "md5") + + @property + def sha1(self): + return self._find_constructor_in("_sha1", "sha1") + + @property + def sha224(self): + return self._find_constructor_in("_sha2", "sha224") + + @property + def sha256(self): + return self._find_constructor_in("_sha2", "sha256") + + @property + def sha384(self): + return self._find_constructor_in("_sha2", "sha384") + + @property + def sha512(self): + return self._find_constructor_in("_sha2", "sha512") + + @property + def sha3_224(self): + return self._find_constructor_in("_sha3", "sha3_224") + + @property + def sha3_256(self): + return self._find_constructor_in("_sha3","sha3_256") + + @property + def sha3_384(self): + return self._find_constructor_in("_sha3","sha3_384") + + @property + def sha3_512(self): + return self._find_constructor_in("_sha3","sha3_512") + + def find_gil_minsize(modules_names, default=2048): """Get the largest GIL_MINSIZE value for the given cryptographic modules. diff --git a/Lib/test/support/strace_helper.py b/Lib/test/support/strace_helper.py index 1a9d2b520b7..cf95f7bdc7d 100644 --- a/Lib/test/support/strace_helper.py +++ b/Lib/test/support/strace_helper.py @@ -38,7 +38,7 @@ class StraceResult: This assumes the program under inspection doesn't print any non-utf8 strings which would mix into the strace output.""" - decoded_events = self.event_bytes.decode('utf-8') + decoded_events = self.event_bytes.decode('utf-8', 'surrogateescape') matches = [ _syscall_regex.match(event) for event in decoded_events.splitlines() diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py index 5a6be1180c1..58853ba4eb3 100644 --- a/Lib/test/test_argparse.py +++ b/Lib/test/test_argparse.py @@ -5469,11 +5469,60 @@ class TestHelpMetavarTypeFormatter(HelpTestCase): version = '' -class TestHelpUsageLongSubparserCommand(TestCase): - """Test that subparser commands are formatted correctly in help""" +class TestHelpCustomHelpFormatter(TestCase): maxDiff = None - def test_parent_help(self): + def test_custom_formatter_function(self): + def custom_formatter(prog): + return argparse.RawTextHelpFormatter(prog, indent_increment=5) + + parser = argparse.ArgumentParser( + prog='PROG', + prefix_chars='-+', + formatter_class=custom_formatter + ) + parser.add_argument('+f', '++foo', help="foo help") + parser.add_argument('spam', help="spam help") + + parser_help = parser.format_help() + self.assertEqual(parser_help, textwrap.dedent('''\ + usage: PROG [-h] [+f FOO] spam + + positional arguments: + spam spam help + + options: + -h, --help show this help message and exit + +f, ++foo FOO foo help + ''')) + + def test_custom_formatter_class(self): + class CustomFormatter(argparse.RawTextHelpFormatter): + def __init__(self, prog): + super().__init__(prog, indent_increment=5) + + parser = argparse.ArgumentParser( + prog='PROG', + prefix_chars='-+', + formatter_class=CustomFormatter + ) + parser.add_argument('+f', '++foo', help="foo help") + parser.add_argument('spam', help="spam help") + + parser_help = parser.format_help() + self.assertEqual(parser_help, textwrap.dedent('''\ + usage: PROG [-h] [+f FOO] spam + + positional arguments: + spam spam help + + options: + -h, --help show this help message and exit + +f, ++foo FOO foo help + ''')) + + def test_usage_long_subparser_command(self): + """Test that subparser commands are formatted correctly in help""" def custom_formatter(prog): return argparse.RawTextHelpFormatter(prog, max_help_position=50) @@ -6973,7 +7022,7 @@ class TestProgName(TestCase): def check_usage(self, expected, *args, **kwargs): res = script_helper.assert_python_ok('-Xutf8', *args, '-h', **kwargs) - self.assertEqual(res.out.splitlines()[0].decode(), + self.assertEqual(os.fsdecode(res.out.splitlines()[0]), f'usage: {expected} [-h]') def test_script(self, compiled=False): @@ -7053,6 +7102,7 @@ class TestTranslations(TestTranslationsBase): class TestColorized(TestCase): + maxDiff = None def setUp(self): super().setUp() @@ -7211,6 +7261,79 @@ class TestColorized(TestCase): ), ) + def test_custom_formatter_function(self): + def custom_formatter(prog): + return argparse.RawTextHelpFormatter(prog, indent_increment=5) + + parser = argparse.ArgumentParser( + prog="PROG", + prefix_chars="-+", + formatter_class=custom_formatter, + color=True, + ) + parser.add_argument('+f', '++foo', help="foo help") + parser.add_argument('spam', help="spam help") + + prog = self.theme.prog + heading = self.theme.heading + short = self.theme.summary_short_option + label = self.theme.summary_label + pos = self.theme.summary_action + long_b = self.theme.long_option + short_b = self.theme.short_option + label_b = self.theme.label + pos_b = self.theme.action + reset = self.theme.reset + + parser_help = parser.format_help() + self.assertEqual(parser_help, textwrap.dedent(f'''\ + {heading}usage: {reset}{prog}PROG{reset} [{short}-h{reset}] [{short}+f {label}FOO{reset}] {pos}spam{reset} + + {heading}positional arguments:{reset} + {pos_b}spam{reset} spam help + + {heading}options:{reset} + {short_b}-h{reset}, {long_b}--help{reset} show this help message and exit + {short_b}+f{reset}, {long_b}++foo{reset} {label_b}FOO{reset} foo help + ''')) + + def test_custom_formatter_class(self): + class CustomFormatter(argparse.RawTextHelpFormatter): + def __init__(self, prog): + super().__init__(prog, indent_increment=5) + + parser = argparse.ArgumentParser( + prog="PROG", + prefix_chars="-+", + formatter_class=CustomFormatter, + color=True, + ) + parser.add_argument('+f', '++foo', help="foo help") + parser.add_argument('spam', help="spam help") + + prog = self.theme.prog + heading = self.theme.heading + short = self.theme.summary_short_option + label = self.theme.summary_label + pos = self.theme.summary_action + long_b = self.theme.long_option + short_b = self.theme.short_option + label_b = self.theme.label + pos_b = self.theme.action + reset = self.theme.reset + + parser_help = parser.format_help() + self.assertEqual(parser_help, textwrap.dedent(f'''\ + {heading}usage: {reset}{prog}PROG{reset} [{short}-h{reset}] [{short}+f {label}FOO{reset}] {pos}spam{reset} + + {heading}positional arguments:{reset} + {pos_b}spam{reset} spam help + + {heading}options:{reset} + {short_b}-h{reset}, {long_b}--help{reset} show this help message and exit + {short_b}+f{reset}, {long_b}++foo{reset} {label_b}FOO{reset} foo help + ''')) + def tearDownModule(): # Remove global references to avoid looking like we have refleaks. diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py index 8b51522278a..39bef465bdb 100644 --- a/Lib/test/test_asyncio/test_futures.py +++ b/Lib/test/test_asyncio/test_futures.py @@ -413,7 +413,7 @@ class BaseFutureTests: def test_copy_state(self): from asyncio.futures import _copy_future_state - f = self._new_future(loop=self.loop) + f = concurrent.futures.Future() f.set_result(10) newf = self._new_future(loop=self.loop) @@ -421,7 +421,7 @@ class BaseFutureTests: self.assertTrue(newf.done()) self.assertEqual(newf.result(), 10) - f_exception = self._new_future(loop=self.loop) + f_exception = concurrent.futures.Future() f_exception.set_exception(RuntimeError()) newf_exception = self._new_future(loop=self.loop) @@ -429,7 +429,7 @@ class BaseFutureTests: self.assertTrue(newf_exception.done()) self.assertRaises(RuntimeError, newf_exception.result) - f_cancelled = self._new_future(loop=self.loop) + f_cancelled = concurrent.futures.Future() f_cancelled.cancel() newf_cancelled = self._new_future(loop=self.loop) @@ -441,7 +441,7 @@ class BaseFutureTests: except BaseException as e: f_exc = e - f_conexc = self._new_future(loop=self.loop) + f_conexc = concurrent.futures.Future() f_conexc.set_exception(f_exc) newf_conexc = self._new_future(loop=self.loop) @@ -454,6 +454,56 @@ class BaseFutureTests: newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__)) self.assertEqual(newf_tb.count('raise concurrent.futures.InvalidStateError'), 1) + def test_copy_state_from_concurrent_futures(self): + """Test _copy_future_state from concurrent.futures.Future. + + This tests the optimized path using _get_snapshot when available. + """ + from asyncio.futures import _copy_future_state + + # Test with a result + f_concurrent = concurrent.futures.Future() + f_concurrent.set_result(42) + f_asyncio = self._new_future(loop=self.loop) + _copy_future_state(f_concurrent, f_asyncio) + self.assertTrue(f_asyncio.done()) + self.assertEqual(f_asyncio.result(), 42) + + # Test with an exception + f_concurrent_exc = concurrent.futures.Future() + f_concurrent_exc.set_exception(ValueError("test exception")) + f_asyncio_exc = self._new_future(loop=self.loop) + _copy_future_state(f_concurrent_exc, f_asyncio_exc) + self.assertTrue(f_asyncio_exc.done()) + with self.assertRaises(ValueError) as cm: + f_asyncio_exc.result() + self.assertEqual(str(cm.exception), "test exception") + + # Test with cancelled state + f_concurrent_cancelled = concurrent.futures.Future() + f_concurrent_cancelled.cancel() + f_asyncio_cancelled = self._new_future(loop=self.loop) + _copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled) + self.assertTrue(f_asyncio_cancelled.cancelled()) + + # Test that destination already cancelled prevents copy + f_concurrent_result = concurrent.futures.Future() + f_concurrent_result.set_result(10) + f_asyncio_precancelled = self._new_future(loop=self.loop) + f_asyncio_precancelled.cancel() + _copy_future_state(f_concurrent_result, f_asyncio_precancelled) + self.assertTrue(f_asyncio_precancelled.cancelled()) + + # Test exception type conversion + f_concurrent_invalid = concurrent.futures.Future() + f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid")) + f_asyncio_invalid = self._new_future(loop=self.loop) + _copy_future_state(f_concurrent_invalid, f_asyncio_invalid) + self.assertTrue(f_asyncio_invalid.done()) + with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm: + f_asyncio_invalid.result() + self.assertEqual(str(cm.exception), "invalid") + def test_iter(self): fut = self._new_future(loop=self.loop) diff --git a/Lib/test/test_asyncio/test_tools.py b/Lib/test/test_asyncio/test_tools.py index 0413e236c27..ba36e759ccd 100644 --- a/Lib/test/test_asyncio/test_tools.py +++ b/Lib/test/test_asyncio/test_tools.py @@ -791,21 +791,21 @@ class TestAsyncioToolsBasic(unittest.TestCase): class TestAsyncioToolsEdgeCases(unittest.TestCase): def test_task_awaits_self(self): - """A task directly awaits itself – should raise a cycle.""" + """A task directly awaits itself - should raise a cycle.""" input_ = [(1, [(1, "Self-Awaiter", [[["loopback"], 1]])])] with self.assertRaises(tools.CycleFoundException) as ctx: tools.build_async_tree(input_) self.assertIn([1, 1], ctx.exception.cycles) def test_task_with_missing_awaiter_id(self): - """Awaiter ID not in task list – should not crash, just show 'Unknown'.""" + """Awaiter ID not in task list - should not crash, just show 'Unknown'.""" input_ = [(1, [(1, "Task-A", [[["coro"], 999]])])] # 999 not defined table = tools.build_task_table(input_) self.assertEqual(len(table), 1) self.assertEqual(table[0][4], "Unknown") def test_duplicate_coroutine_frames(self): - """Same coroutine frame repeated under a parent – should deduplicate.""" + """Same coroutine frame repeated under a parent - should deduplicate.""" input_ = [ ( 1, @@ -829,7 +829,7 @@ class TestAsyncioToolsEdgeCases(unittest.TestCase): self.assertIn("Task-1", flat) def test_task_with_no_name(self): - """Task with no name in id2name – should still render with fallback.""" + """Task with no name in id2name - should still render with fallback.""" input_ = [(1, [(1, "root", [[["f1"], 2]]), (2, None, [])])] # If name is None, fallback to string should not crash tree = tools.build_async_tree(input_) diff --git a/Lib/test/test_capi/test_opt.py b/Lib/test/test_capi/test_opt.py index 651148336f7..f7a4200e1ee 100644 --- a/Lib/test/test_capi/test_opt.py +++ b/Lib/test/test_capi/test_opt.py @@ -1955,9 +1955,143 @@ class TestUopsOptimization(unittest.TestCase): self.assertEqual(res, TIER2_THRESHOLD) self.assertIsNotNone(ex) uops = get_opnames(ex) - self.assertIn("_CALL_ISINSTANCE", uops) + self.assertNotIn("_CALL_ISINSTANCE", uops) self.assertNotIn("_GUARD_THIRD_NULL", uops) self.assertNotIn("_GUARD_CALLABLE_ISINSTANCE", uops) + self.assertIn("_POP_CALL_TWO_LOAD_CONST_INLINE_BORROW", uops) + + def test_call_list_append(self): + def testfunc(n): + a = [] + for i in range(n): + a.append(i) + return sum(a) + + res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) + self.assertEqual(res, sum(range(TIER2_THRESHOLD))) + uops = get_opnames(ex) + self.assertIn("_CALL_LIST_APPEND", uops) + # We should remove these in the future + self.assertIn("_GUARD_NOS_LIST", uops) + self.assertIn("_GUARD_CALLABLE_LIST_APPEND", uops) + + def test_call_isinstance_is_true(self): + def testfunc(n): + x = 0 + for _ in range(n): + y = isinstance(42, int) + if y: + x += 1 + return x + + res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) + self.assertEqual(res, TIER2_THRESHOLD) + self.assertIsNotNone(ex) + uops = get_opnames(ex) + self.assertNotIn("_CALL_ISINSTANCE", uops) + self.assertNotIn("_TO_BOOL_BOOL", uops) + self.assertNotIn("_GUARD_IS_TRUE_POP", uops) + self.assertIn("_POP_CALL_TWO_LOAD_CONST_INLINE_BORROW", uops) + + def test_call_isinstance_is_false(self): + def testfunc(n): + x = 0 + for _ in range(n): + y = isinstance(42, str) + if not y: + x += 1 + return x + + res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) + self.assertEqual(res, TIER2_THRESHOLD) + self.assertIsNotNone(ex) + uops = get_opnames(ex) + self.assertNotIn("_CALL_ISINSTANCE", uops) + self.assertNotIn("_TO_BOOL_BOOL", uops) + self.assertNotIn("_GUARD_IS_FALSE_POP", uops) + self.assertIn("_POP_CALL_TWO_LOAD_CONST_INLINE_BORROW", uops) + + def test_call_isinstance_subclass(self): + def testfunc(n): + x = 0 + for _ in range(n): + y = isinstance(True, int) + if y: + x += 1 + return x + + res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) + self.assertEqual(res, TIER2_THRESHOLD) + self.assertIsNotNone(ex) + uops = get_opnames(ex) + self.assertNotIn("_CALL_ISINSTANCE", uops) + self.assertNotIn("_TO_BOOL_BOOL", uops) + self.assertNotIn("_GUARD_IS_TRUE_POP", uops) + self.assertIn("_POP_CALL_TWO_LOAD_CONST_INLINE_BORROW", uops) + + def test_call_isinstance_unknown_object(self): + def testfunc(n): + x = 0 + for _ in range(n): + # The optimizer doesn't know the return type here: + bar = eval("42") + # This will only narrow to bool: + y = isinstance(bar, int) + if y: + x += 1 + return x + + res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) + self.assertEqual(res, TIER2_THRESHOLD) + self.assertIsNotNone(ex) + uops = get_opnames(ex) + self.assertIn("_CALL_ISINSTANCE", uops) + self.assertNotIn("_TO_BOOL_BOOL", uops) + self.assertIn("_GUARD_IS_TRUE_POP", uops) + + def test_call_isinstance_tuple_of_classes(self): + def testfunc(n): + x = 0 + for _ in range(n): + # A tuple of classes is currently not optimized, + # so this is only narrowed to bool: + y = isinstance(42, (int, str)) + if y: + x += 1 + return x + + res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) + self.assertEqual(res, TIER2_THRESHOLD) + self.assertIsNotNone(ex) + uops = get_opnames(ex) + self.assertIn("_CALL_ISINSTANCE", uops) + self.assertNotIn("_TO_BOOL_BOOL", uops) + self.assertIn("_GUARD_IS_TRUE_POP", uops) + + def test_call_isinstance_metaclass(self): + class EvenNumberMeta(type): + def __instancecheck__(self, number): + return number % 2 == 0 + + class EvenNumber(metaclass=EvenNumberMeta): + pass + + def testfunc(n): + x = 0 + for _ in range(n): + # Only narrowed to bool + y = isinstance(42, EvenNumber) + if y: + x += 1 + return x + + res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) + self.assertEqual(res, TIER2_THRESHOLD) + self.assertIsNotNone(ex) + uops = get_opnames(ex) + self.assertIn("_CALL_ISINSTANCE", uops) + self.assertNotIn("_TO_BOOL_BOOL", uops) + self.assertIn("_GUARD_IS_TRUE_POP", uops) def global_identity(x): diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py index 86e5e5c1474..a767f67a02c 100644 --- a/Lib/test/test_codeccallbacks.py +++ b/Lib/test/test_codeccallbacks.py @@ -2,6 +2,7 @@ from _codecs import _unregister_error as _codecs_unregister_error import codecs import html.entities import itertools +import re import sys import unicodedata import unittest @@ -1125,7 +1126,7 @@ class CodecCallbackTest(unittest.TestCase): text = 'abc<def>ghi'*n text.translate(charmap) - def test_mutatingdecodehandler(self): + def test_mutating_decode_handler(self): baddata = [ ("ascii", b"\xff"), ("utf-7", b"++"), @@ -1160,6 +1161,42 @@ class CodecCallbackTest(unittest.TestCase): for (encoding, data) in baddata: self.assertEqual(data.decode(encoding, "test.mutating"), "\u4242") + def test_mutating_decode_handler_unicode_escape(self): + decode = codecs.unicode_escape_decode + def mutating(exc): + if isinstance(exc, UnicodeDecodeError): + r = data.get(exc.object[:exc.end]) + if r is not None: + exc.object = r[0] + exc.object[exc.end:] + return ('\u0404', r[1]) + raise AssertionError("don't know how to handle %r" % exc) + + codecs.register_error('test.mutating2', mutating) + data = { + br'\x0': (b'\\', 0), + br'\x3': (b'xxx\\', 3), + br'\x5': (b'x\\', 1), + } + def check(input, expected, msg): + with self.assertWarns(DeprecationWarning) as cm: + self.assertEqual(decode(input, 'test.mutating2'), (expected, len(input))) + self.assertIn(msg, str(cm.warning)) + + check(br'\x0n\z', '\u0404\n\\z', r'"\z" is an invalid escape sequence') + check(br'\x0n\501', '\u0404\n\u0141', r'"\501" is an invalid octal escape sequence') + check(br'\x0z', '\u0404\\z', r'"\z" is an invalid escape sequence') + + check(br'\x3n\zr', '\u0404\n\\zr', r'"\z" is an invalid escape sequence') + check(br'\x3zr', '\u0404\\zr', r'"\z" is an invalid escape sequence') + check(br'\x3z5', '\u0404\\z5', r'"\z" is an invalid escape sequence') + check(memoryview(br'\x3z5x')[:-1], '\u0404\\z5', r'"\z" is an invalid escape sequence') + check(memoryview(br'\x3z5xy')[:-2], '\u0404\\z5', r'"\z" is an invalid escape sequence') + + check(br'\x5n\z', '\u0404\n\\z', r'"\z" is an invalid escape sequence') + check(br'\x5n\501', '\u0404\n\u0141', r'"\501" is an invalid octal escape sequence') + check(br'\x5z', '\u0404\\z', r'"\z" is an invalid escape sequence') + check(memoryview(br'\x5zy')[:-1], '\u0404\\z', r'"\z" is an invalid escape sequence') + # issue32583 def test_crashing_decode_handler(self): # better generating one more character to fill the extra space slot diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 94fcf98e757..d42270da15e 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1196,23 +1196,39 @@ class EscapeDecodeTest(unittest.TestCase): check(br"[\1010]", b"[A0]") check(br"[\x41]", b"[A]") check(br"[\x410]", b"[A0]") + + def test_warnings(self): + decode = codecs.escape_decode + check = coding_checker(self, decode) for i in range(97, 123): b = bytes([i]) if b not in b'abfnrtvx': - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\%c" is an invalid escape sequence' % i): check(b"\\" + b, b"\\" + b) - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\%c" is an invalid escape sequence' % (i-32)): check(b"\\" + b.upper(), b"\\" + b.upper()) - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\8" is an invalid escape sequence'): check(br"\8", b"\\8") with self.assertWarns(DeprecationWarning): check(br"\9", b"\\9") - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\\xfa" is an invalid escape sequence') as cm: check(b"\\\xfa", b"\\\xfa") for i in range(0o400, 0o1000): - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\%o" is an invalid octal escape sequence' % i): check(rb'\%o' % i, bytes([i & 0o377])) + with self.assertWarnsRegex(DeprecationWarning, + r'"\\z" is an invalid escape sequence'): + self.assertEqual(decode(br'\x\z', 'ignore'), (b'\\z', 4)) + with self.assertWarnsRegex(DeprecationWarning, + r'"\\501" is an invalid octal escape sequence'): + self.assertEqual(decode(br'\x\501', 'ignore'), (b'A', 6)) + def test_errors(self): decode = codecs.escape_decode self.assertRaises(ValueError, decode, br"\x") @@ -2661,24 +2677,40 @@ class UnicodeEscapeTest(ReadTest, unittest.TestCase): check(br"[\x410]", "[A0]") check(br"\u20ac", "\u20ac") check(br"\U0001d120", "\U0001d120") + + def test_decode_warnings(self): + decode = codecs.unicode_escape_decode + check = coding_checker(self, decode) for i in range(97, 123): b = bytes([i]) if b not in b'abfnrtuvx': - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\%c" is an invalid escape sequence' % i): check(b"\\" + b, "\\" + chr(i)) if b.upper() not in b'UN': - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\%c" is an invalid escape sequence' % (i-32)): check(b"\\" + b.upper(), "\\" + chr(i-32)) - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\8" is an invalid escape sequence'): check(br"\8", "\\8") with self.assertWarns(DeprecationWarning): check(br"\9", "\\9") - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\\xfa" is an invalid escape sequence') as cm: check(b"\\\xfa", "\\\xfa") for i in range(0o400, 0o1000): - with self.assertWarns(DeprecationWarning): + with self.assertWarnsRegex(DeprecationWarning, + r'"\\%o" is an invalid octal escape sequence' % i): check(rb'\%o' % i, chr(i)) + with self.assertWarnsRegex(DeprecationWarning, + r'"\\z" is an invalid escape sequence'): + self.assertEqual(decode(br'\x\z', 'ignore'), ('\\z', 4)) + with self.assertWarnsRegex(DeprecationWarning, + r'"\\501" is an invalid octal escape sequence'): + self.assertEqual(decode(br'\x\501', 'ignore'), ('\u0141', 6)) + def test_decode_errors(self): decode = codecs.unicode_escape_decode for c, d in (b'x', 2), (b'u', 4), (b'U', 4): diff --git a/Lib/test/test_concurrent_futures/test_future.py b/Lib/test/test_concurrent_futures/test_future.py index 4066ea1ee4b..06b11a3bacf 100644 --- a/Lib/test/test_concurrent_futures/test_future.py +++ b/Lib/test/test_concurrent_futures/test_future.py @@ -6,6 +6,7 @@ from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) from test import support +from test.support import threading_helper from .util import ( PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE, @@ -282,6 +283,62 @@ class FutureTests(BaseTestCase): self.assertEqual(f.exception(), e) + def test_get_snapshot(self): + """Test the _get_snapshot method for atomic state retrieval.""" + # Test with a pending future + f = Future() + done, cancelled, result, exception = f._get_snapshot() + self.assertFalse(done) + self.assertFalse(cancelled) + self.assertIsNone(result) + self.assertIsNone(exception) + + # Test with a finished future (successful result) + f = Future() + f.set_result(42) + done, cancelled, result, exception = f._get_snapshot() + self.assertTrue(done) + self.assertFalse(cancelled) + self.assertEqual(result, 42) + self.assertIsNone(exception) + + # Test with a finished future (exception) + f = Future() + exc = ValueError("test error") + f.set_exception(exc) + done, cancelled, result, exception = f._get_snapshot() + self.assertTrue(done) + self.assertFalse(cancelled) + self.assertIsNone(result) + self.assertIs(exception, exc) + + # Test with a cancelled future + f = Future() + f.cancel() + done, cancelled, result, exception = f._get_snapshot() + self.assertTrue(done) + self.assertTrue(cancelled) + self.assertIsNone(result) + self.assertIsNone(exception) + + # Test concurrent access (basic thread safety check) + f = Future() + f.set_result(100) + results = [] + + def get_snapshot(): + for _ in range(1000): + snapshot = f._get_snapshot() + results.append(snapshot) + + threads = [threading.Thread(target=get_snapshot) for _ in range(4)] + with threading_helper.start_threads(threads): + pass + # All snapshots should be identical for a finished future + expected = (True, False, 100, None) + for result in results: + self.assertEqual(result, expected) + def setUpModule(): setup_module() diff --git a/Lib/test/test_crossinterp.py b/Lib/test/test_crossinterp.py index b366a29645e..cddacbc9970 100644 --- a/Lib/test/test_crossinterp.py +++ b/Lib/test/test_crossinterp.py @@ -758,6 +758,40 @@ class CodeTests(_GetXIDataTests): ]) +class ShareableFuncTests(_GetXIDataTests): + + MODE = 'func' + + def test_stateless(self): + self.assert_roundtrip_not_equal([ + *defs.STATELESS_FUNCTIONS, + # Generators can be stateless too. + *defs.FUNCTION_LIKE, + ]) + + def test_not_stateless(self): + self.assert_not_shareable([ + *(f for f in defs.FUNCTIONS + if f not in defs.STATELESS_FUNCTIONS), + ]) + + def test_other_objects(self): + self.assert_not_shareable([ + None, + True, + False, + Ellipsis, + NotImplemented, + 9999, + 'spam', + b'spam', + (), + [], + {}, + object(), + ]) + + class PureShareableScriptTests(_GetXIDataTests): MODE = 'script-pure' diff --git a/Lib/test/test_curses.py b/Lib/test/test_curses.py index c307258e565..feca82681de 100644 --- a/Lib/test/test_curses.py +++ b/Lib/test/test_curses.py @@ -130,6 +130,9 @@ class TestCurses(unittest.TestCase): curses.use_env(False) curses.use_env(True) + def test_error(self): + self.assertIsSubclass(curses.error, Exception) + def test_create_windows(self): win = curses.newwin(5, 10) self.assertEqual(win.getbegyx(), (0, 0)) diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py index 371c63adce9..2fb963f52e5 100644 --- a/Lib/test/test_faulthandler.py +++ b/Lib/test/test_faulthandler.py @@ -166,29 +166,6 @@ class FaultHandlerTests(unittest.TestCase): fatal_error = 'Windows fatal exception: %s' % name_regex self.check_error(code, line_number, fatal_error, **kw) - @unittest.skipIf(sys.platform.startswith('aix'), - "the first page of memory is a mapped read-only on AIX") - def test_read_null(self): - if not MS_WINDOWS: - self.check_fatal_error(""" - import faulthandler - faulthandler.enable() - faulthandler._read_null() - """, - 3, - # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion - '(?:Segmentation fault' - '|Bus error' - '|Illegal instruction)') - else: - self.check_windows_exception(""" - import faulthandler - faulthandler.enable() - faulthandler._read_null() - """, - 3, - 'access violation') - @skip_segfault_on_android def test_sigsegv(self): self.check_fatal_error(""" diff --git a/Lib/test/test_fcntl.py b/Lib/test/test_fcntl.py index b84c98ef3a2..e0e6782258f 100644 --- a/Lib/test/test_fcntl.py +++ b/Lib/test/test_fcntl.py @@ -228,6 +228,52 @@ class TestFcntl(unittest.TestCase): os.close(test_pipe_r) os.close(test_pipe_w) + def _check_fcntl_not_mutate_len(self, nbytes=None): + self.f = open(TESTFN, 'wb') + buf = struct.pack('ii', fcntl.F_OWNER_PID, os.getpid()) + if nbytes is not None: + buf += b' ' * (nbytes - len(buf)) + else: + nbytes = len(buf) + save_buf = bytes(buf) + r = fcntl.fcntl(self.f, fcntl.F_SETOWN_EX, buf) + self.assertIsInstance(r, bytes) + self.assertEqual(len(r), len(save_buf)) + self.assertEqual(buf, save_buf) + type, pid = memoryview(r).cast('i')[:2] + self.assertEqual(type, fcntl.F_OWNER_PID) + self.assertEqual(pid, os.getpid()) + + buf = b' ' * nbytes + r = fcntl.fcntl(self.f, fcntl.F_GETOWN_EX, buf) + self.assertIsInstance(r, bytes) + self.assertEqual(len(r), len(save_buf)) + self.assertEqual(buf, b' ' * nbytes) + type, pid = memoryview(r).cast('i')[:2] + self.assertEqual(type, fcntl.F_OWNER_PID) + self.assertEqual(pid, os.getpid()) + + buf = memoryview(b' ' * nbytes) + r = fcntl.fcntl(self.f, fcntl.F_GETOWN_EX, buf) + self.assertIsInstance(r, bytes) + self.assertEqual(len(r), len(save_buf)) + self.assertEqual(bytes(buf), b' ' * nbytes) + type, pid = memoryview(r).cast('i')[:2] + self.assertEqual(type, fcntl.F_OWNER_PID) + self.assertEqual(pid, os.getpid()) + + @unittest.skipUnless( + hasattr(fcntl, "F_SETOWN_EX") and hasattr(fcntl, "F_GETOWN_EX"), + "requires F_SETOWN_EX and F_GETOWN_EX") + def test_fcntl_small_buffer(self): + self._check_fcntl_not_mutate_len() + + @unittest.skipUnless( + hasattr(fcntl, "F_SETOWN_EX") and hasattr(fcntl, "F_GETOWN_EX"), + "requires F_SETOWN_EX and F_GETOWN_EX") + def test_fcntl_large_buffer(self): + self._check_fcntl_not_mutate_len(2024) + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py index 84faa636064..96b3f305194 100644 --- a/Lib/test/test_fractions.py +++ b/Lib/test/test_fractions.py @@ -1,7 +1,7 @@ """Tests for Lib/fractions.py.""" from decimal import Decimal -from test.support import requires_IEEE_754 +from test.support import requires_IEEE_754, adjust_int_max_str_digits import math import numbers import operator @@ -395,12 +395,14 @@ class FractionTest(unittest.TestCase): def testFromString(self): self.assertEqual((5, 1), _components(F("5"))) + self.assertEqual((5, 1), _components(F("005"))) self.assertEqual((3, 2), _components(F("3/2"))) self.assertEqual((3, 2), _components(F("3 / 2"))) self.assertEqual((3, 2), _components(F(" \n +3/2"))) self.assertEqual((-3, 2), _components(F("-3/2 "))) - self.assertEqual((13, 2), _components(F(" 013/02 \n "))) + self.assertEqual((13, 2), _components(F(" 0013/002 \n "))) self.assertEqual((16, 5), _components(F(" 3.2 "))) + self.assertEqual((16, 5), _components(F("003.2"))) self.assertEqual((-16, 5), _components(F(" -3.2 "))) self.assertEqual((-3, 1), _components(F(" -3. "))) self.assertEqual((3, 5), _components(F(" .6 "))) @@ -419,116 +421,102 @@ class FractionTest(unittest.TestCase): self.assertRaisesMessage( ZeroDivisionError, "Fraction(3, 0)", F, "3/0") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '3/'", - F, "3/") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '/2'", - F, "/2") - self.assertRaisesMessage( - # Denominators don't need a sign. - ValueError, "Invalid literal for Fraction: '3/+2'", - F, "3/+2") - self.assertRaisesMessage( - # Imitate float's parsing. - ValueError, "Invalid literal for Fraction: '+ 3/2'", - F, "+ 3/2") - self.assertRaisesMessage( - # Avoid treating '.' as a regex special character. - ValueError, "Invalid literal for Fraction: '3a2'", - F, "3a2") - self.assertRaisesMessage( - # Don't accept combinations of decimals and rationals. - ValueError, "Invalid literal for Fraction: '3/7.2'", - F, "3/7.2") - self.assertRaisesMessage( - # Don't accept combinations of decimals and rationals. - ValueError, "Invalid literal for Fraction: '3.2/7'", - F, "3.2/7") - self.assertRaisesMessage( - # Allow 3. and .3, but not . - ValueError, "Invalid literal for Fraction: '.'", - F, ".") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '_'", - F, "_") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '_1'", - F, "_1") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1__2'", - F, "1__2") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '/_'", - F, "/_") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1_/'", - F, "1_/") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '_1/'", - F, "_1/") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1__2/'", - F, "1__2/") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1/_'", - F, "1/_") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1/_1'", - F, "1/_1") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1/1__2'", - F, "1/1__2") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1._111'", - F, "1._111") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1.1__1'", - F, "1.1__1") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1.1e+_1'", - F, "1.1e+_1") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1.1e+1__1'", - F, "1.1e+1__1") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '123.dd'", - F, "123.dd") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '123.5_dd'", - F, "123.5_dd") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: 'dd.5'", - F, "dd.5") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '7_dd'", - F, "7_dd") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1/dd'", - F, "1/dd") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1/123_dd'", - F, "1/123_dd") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '789edd'", - F, "789edd") - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '789e2_dd'", - F, "789e2_dd") + + def check_invalid(s): + msg = "Invalid literal for Fraction: " + repr(s) + self.assertRaisesMessage(ValueError, msg, F, s) + + check_invalid("3/") + check_invalid("/2") + # Denominators don't need a sign. + check_invalid("3/+2") + check_invalid("3/-2") + # Imitate float's parsing. + check_invalid("+ 3/2") + check_invalid("- 3/2") + # Avoid treating '.' as a regex special character. + check_invalid("3a2") + # Don't accept combinations of decimals and rationals. + check_invalid("3/7.2") + check_invalid("3.2/7") + # No space around dot. + check_invalid("3 .2") + check_invalid("3. 2") + # No space around e. + check_invalid("3.2 e1") + check_invalid("3.2e 1") + # Fractional part don't need a sign. + check_invalid("3.+2") + check_invalid("3.-2") + # Only accept base 10. + check_invalid("0x10") + check_invalid("0x10/1") + check_invalid("1/0x10") + check_invalid("0x10.") + check_invalid("0x10.1") + check_invalid("1.0x10") + check_invalid("1.0e0x10") + # Only accept decimal digits. + check_invalid("³") + check_invalid("³/2") + check_invalid("3/²") + check_invalid("³.2") + check_invalid("3.²") + check_invalid("3.2e²") + check_invalid("¼") + # Allow 3. and .3, but not . + check_invalid(".") + check_invalid("_") + check_invalid("_1") + check_invalid("1__2") + check_invalid("/_") + check_invalid("1_/") + check_invalid("_1/") + check_invalid("1__2/") + check_invalid("1/_") + check_invalid("1/_1") + check_invalid("1/1__2") + check_invalid("1._111") + check_invalid("1.1__1") + check_invalid("1.1e+_1") + check_invalid("1.1e+1__1") + check_invalid("123.dd") + check_invalid("123.5_dd") + check_invalid("dd.5") + check_invalid("7_dd") + check_invalid("1/dd") + check_invalid("1/123_dd") + check_invalid("789edd") + check_invalid("789e2_dd") # Test catastrophic backtracking. val = "9"*50 + "_" - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '" + val + "'", - F, val) - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1/" + val + "'", - F, "1/" + val) - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1." + val + "'", - F, "1." + val) - self.assertRaisesMessage( - ValueError, "Invalid literal for Fraction: '1.1+e" + val + "'", - F, "1.1+e" + val) + check_invalid(val) + check_invalid("1/" + val) + check_invalid("1." + val) + check_invalid("." + val) + check_invalid("1.1+e" + val) + check_invalid("1.1e" + val) + + def test_limit_int(self): + maxdigits = 5000 + with adjust_int_max_str_digits(maxdigits): + msg = 'Exceeds the limit' + val = '1' * maxdigits + num = (10**maxdigits - 1)//9 + self.assertEqual((num, 1), _components(F(val))) + self.assertRaisesRegex(ValueError, msg, F, val + '1') + self.assertEqual((num, 2), _components(F(val + '/2'))) + self.assertRaisesRegex(ValueError, msg, F, val + '1/2') + self.assertEqual((1, num), _components(F('1/' + val))) + self.assertRaisesRegex(ValueError, msg, F, '1/1' + val) + self.assertEqual(((10**(maxdigits+1) - 1)//9, 10**maxdigits), + _components(F('1.' + val))) + self.assertRaisesRegex(ValueError, msg, F, '1.1' + val) + self.assertEqual((num, 10**maxdigits), _components(F('.' + val))) + self.assertRaisesRegex(ValueError, msg, F, '.1' + val) + self.assertRaisesRegex(ValueError, msg, F, '1.1e1' + val) + self.assertEqual((11, 10), _components(F('1.1e' + '0' * maxdigits))) + self.assertRaisesRegex(ValueError, msg, F, '1.1e' + '0' * (maxdigits+1)) def testImmutable(self): r = F(7, 3) diff --git a/Lib/test/test_free_threading/test_functools.py b/Lib/test/test_free_threading/test_functools.py new file mode 100644 index 00000000000..a442fe056ce --- /dev/null +++ b/Lib/test/test_free_threading/test_functools.py @@ -0,0 +1,75 @@ +import random +import unittest + +from functools import lru_cache +from threading import Barrier, Thread + +from test.support import threading_helper + +@threading_helper.requires_working_threading() +class TestLRUCache(unittest.TestCase): + + def _test_concurrent_operations(self, maxsize): + num_threads = 10 + b = Barrier(num_threads) + @lru_cache(maxsize=maxsize) + def func(arg=0): + return object() + + + def thread_func(): + b.wait() + for i in range(1000): + r = random.randint(0, 1000) + if i < 800: + func(i) + elif i < 900: + func.cache_info() + else: + func.cache_clear() + + threads = [] + for i in range(num_threads): + t = Thread(target=thread_func) + threads.append(t) + + with threading_helper.start_threads(threads): + pass + + def test_concurrent_operations_unbounded(self): + self._test_concurrent_operations(maxsize=None) + + def test_concurrent_operations_bounded(self): + self._test_concurrent_operations(maxsize=128) + + def _test_reentrant_cache_clear(self, maxsize): + num_threads = 10 + b = Barrier(num_threads) + @lru_cache(maxsize=maxsize) + def func(arg=0): + func.cache_clear() + return object() + + + def thread_func(): + b.wait() + for i in range(1000): + func(random.randint(0, 10000)) + + threads = [] + for i in range(num_threads): + t = Thread(target=thread_func) + threads.append(t) + + with threading_helper.start_threads(threads): + pass + + def test_reentrant_cache_clear_unbounded(self): + self._test_reentrant_cache_clear(maxsize=None) + + def test_reentrant_cache_clear_bounded(self): + self._test_reentrant_cache_clear(maxsize=128) + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_genericalias.py b/Lib/test/test_genericalias.py index 8d21ded4501..ea0dc241e39 100644 --- a/Lib/test/test_genericalias.py +++ b/Lib/test/test_genericalias.py @@ -61,6 +61,7 @@ try: from tkinter import Event except ImportError: Event = None +from string.templatelib import Template, Interpolation from typing import TypeVar T = TypeVar('T') @@ -139,7 +140,10 @@ class BaseTest(unittest.TestCase): DictReader, DictWriter, array, staticmethod, - classmethod] + classmethod, + Template, + Interpolation, + ] if ctypes is not None: generic_types.extend((ctypes.Array, ctypes.LibraryLoader, ctypes.py_object)) if ValueProxy is not None: diff --git a/Lib/test/test_hmac.py b/Lib/test/test_hmac.py index 70c79437722..e898644dd8a 100644 --- a/Lib/test/test_hmac.py +++ b/Lib/test/test_hmac.py @@ -1,8 +1,27 @@ +"""Test suite for HMAC. + +Python provides three different implementations of HMAC: + +- OpenSSL HMAC using OpenSSL hash functions. +- HACL* HMAC using HACL* hash functions. +- Generic Python HMAC using user-defined hash functions. + +The generic Python HMAC implementation is able to use OpenSSL +callables or names, HACL* named hash functions or arbitrary +objects implementing PEP 247 interface. + +In the two first cases, Python HMAC wraps a C HMAC object (either OpenSSL +or HACL*-based). As a last resort, HMAC is re-implemented in pure Python. +It is however interesting to test the pure Python implementation against +the OpenSSL and HACL* hash functions. +""" + import binascii import functools import hmac import hashlib import random +import test.support import test.support.hashlib_helper as hashlib_helper import types import unittest @@ -10,6 +29,12 @@ import unittest.mock as mock import warnings from _operator import _compare_digest as operator_compare_digest from test.support import check_disallow_instantiation +from test.support.hashlib_helper import ( + BuiltinHashFunctionsTrait, + HashFunctionsTrait, + NamedHashFunctionsTrait, + OpenSSLHashFunctionsTrait, +) from test.support.import_helper import import_fresh_module, import_module try: @@ -382,50 +407,7 @@ class BuiltinAssertersMixin(ThroughBuiltinAPIMixin, AssertersMixin): pass -class HashFunctionsTrait: - """Trait class for 'hashfunc' in hmac_new() and hmac_digest().""" - - ALGORITHMS = [ - 'md5', 'sha1', - 'sha224', 'sha256', 'sha384', 'sha512', - 'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512', - ] - - # By default, a missing algorithm skips the test that uses it. - _ = property(lambda self: self.skipTest("missing hash function")) - md5 = sha1 = _ - sha224 = sha256 = sha384 = sha512 = _ - sha3_224 = sha3_256 = sha3_384 = sha3_512 = _ - del _ - - -class WithOpenSSLHashFunctions(HashFunctionsTrait): - """Test a HMAC implementation with an OpenSSL-based callable 'hashfunc'.""" - - @classmethod - def setUpClass(cls): - super().setUpClass() - - for name in cls.ALGORITHMS: - @property - @hashlib_helper.requires_openssl_hashdigest(name) - def func(self, *, __name=name): # __name needed to bind 'name' - return getattr(_hashlib, f'openssl_{__name}') - setattr(cls, name, func) - - -class WithNamedHashFunctions(HashFunctionsTrait): - """Test a HMAC implementation with a named 'hashfunc'.""" - - @classmethod - def setUpClass(cls): - super().setUpClass() - - for name in cls.ALGORITHMS: - setattr(cls, name, name) - - -class RFCTestCaseMixin(AssertersMixin, HashFunctionsTrait): +class RFCTestCaseMixin(HashFunctionsTrait, AssertersMixin): """Test HMAC implementations against RFC 2202/4231 and NIST test vectors. - Test vectors for MD5 and SHA-1 are taken from RFC 2202. @@ -739,26 +721,83 @@ class RFCTestCaseMixin(AssertersMixin, HashFunctionsTrait): ) -class PyRFCTestCase(ThroughObjectMixin, PyAssertersMixin, - WithOpenSSLHashFunctions, RFCTestCaseMixin, - unittest.TestCase): +class PurePythonInitHMAC(PyModuleMixin, HashFunctionsTrait): + + @classmethod + def setUpClass(cls): + super().setUpClass() + for meth in ['_init_openssl_hmac', '_init_builtin_hmac']: + fn = getattr(cls.hmac.HMAC, meth) + cm = mock.patch.object(cls.hmac.HMAC, meth, autospec=True, wraps=fn) + cls.enterClassContext(cm) + + @classmethod + def tearDownClass(cls): + cls.hmac.HMAC._init_openssl_hmac.assert_not_called() + cls.hmac.HMAC._init_builtin_hmac.assert_not_called() + # Do not assert that HMAC._init_old() has been called as it's tricky + # to determine whether a test for a specific hash function has been + # executed or not. On regular builds, it will be called but if a + # hash function is not available, it's hard to detect for which + # test we should checj HMAC._init_old() or not. + super().tearDownClass() + + +class PyRFCOpenSSLTestCase(ThroughObjectMixin, + PyAssertersMixin, + OpenSSLHashFunctionsTrait, + RFCTestCaseMixin, + PurePythonInitHMAC, + unittest.TestCase): """Python implementation of HMAC using hmac.HMAC(). - The underlying hash functions are OpenSSL-based. + The underlying hash functions are OpenSSL-based but + _init_old() is used instead of _init_openssl_hmac(). """ -class PyDotNewRFCTestCase(ThroughModuleAPIMixin, PyAssertersMixin, - WithOpenSSLHashFunctions, RFCTestCaseMixin, - unittest.TestCase): +class PyRFCBuiltinTestCase(ThroughObjectMixin, + PyAssertersMixin, + BuiltinHashFunctionsTrait, + RFCTestCaseMixin, + PurePythonInitHMAC, + unittest.TestCase): + """Python implementation of HMAC using hmac.HMAC(). + + The underlying hash functions are HACL*-based but + _init_old() is used instead of _init_builtin_hmac(). + """ + + +class PyDotNewOpenSSLRFCTestCase(ThroughModuleAPIMixin, + PyAssertersMixin, + OpenSSLHashFunctionsTrait, + RFCTestCaseMixin, + PurePythonInitHMAC, + unittest.TestCase): + """Python implementation of HMAC using hmac.new(). + + The underlying hash functions are OpenSSL-based but + _init_old() is used instead of _init_openssl_hmac(). + """ + + +class PyDotNewBuiltinRFCTestCase(ThroughModuleAPIMixin, + PyAssertersMixin, + BuiltinHashFunctionsTrait, + RFCTestCaseMixin, + PurePythonInitHMAC, + unittest.TestCase): """Python implementation of HMAC using hmac.new(). - The underlying hash functions are OpenSSL-based. + The underlying hash functions are HACL-based but + _init_old() is used instead of _init_openssl_hmac(). """ class OpenSSLRFCTestCase(OpenSSLAssertersMixin, - WithOpenSSLHashFunctions, RFCTestCaseMixin, + OpenSSLHashFunctionsTrait, + RFCTestCaseMixin, unittest.TestCase): """OpenSSL implementation of HMAC. @@ -767,7 +806,8 @@ class OpenSSLRFCTestCase(OpenSSLAssertersMixin, class BuiltinRFCTestCase(BuiltinAssertersMixin, - WithNamedHashFunctions, RFCTestCaseMixin, + NamedHashFunctionsTrait, + RFCTestCaseMixin, unittest.TestCase): """Built-in HACL* implementation of HMAC. @@ -784,12 +824,6 @@ class BuiltinRFCTestCase(BuiltinAssertersMixin, self.check_hmac_hexdigest(key, msg, hexdigest, digest_size, func) -# TODO(picnixz): once we have a HACL* HMAC, we should also test the Python -# implementation of HMAC with a HACL*-based hash function. For now, we only -# test it partially via the '_sha2' module, but for completeness we could -# also test the RFC test vectors against all possible implementations. - - class DigestModTestCaseMixin(CreatorMixin, DigestMixin): """Tests for the 'digestmod' parameter for hmac_new() and hmac_digest().""" diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py index 2cafa4e45a1..0af1c45ecb2 100644 --- a/Lib/test/test_httpservers.py +++ b/Lib/test/test_httpservers.py @@ -3,16 +3,16 @@ Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. """ -from collections import OrderedDict + from http.server import BaseHTTPRequestHandler, HTTPServer, HTTPSServer, \ - SimpleHTTPRequestHandler, CGIHTTPRequestHandler + SimpleHTTPRequestHandler from http import server, HTTPStatus +import contextlib import os import socket import sys import re -import base64 import ntpath import pathlib import shutil @@ -31,7 +31,7 @@ from io import BytesIO, StringIO import unittest from test import support from test.support import ( - is_apple, import_helper, os_helper, requires_subprocess, threading_helper + is_apple, import_helper, os_helper, threading_helper ) try: @@ -522,42 +522,120 @@ class SimpleHTTPServerTestCase(BaseTestCase): reader.close() return body + def check_list_dir_dirname(self, dirname, quotedname=None): + fullpath = os.path.join(self.tempdir, dirname) + try: + os.mkdir(os.path.join(self.tempdir, dirname)) + except (OSError, UnicodeEncodeError): + self.skipTest(f'Can not create directory {dirname!a} ' + f'on current file system') + + if quotedname is None: + quotedname = urllib.parse.quote(dirname, errors='surrogatepass') + response = self.request(self.base_url + '/' + quotedname + '/') + body = self.check_status_and_reason(response, HTTPStatus.OK) + displaypath = html.escape(f'{self.base_url}/{dirname}/', quote=False) + enc = sys.getfilesystemencoding() + prefix = f'listing for {displaypath}</'.encode(enc, 'surrogateescape') + self.assertIn(prefix + b'title>', body) + self.assertIn(prefix + b'h1>', body) + + def check_list_dir_filename(self, filename): + fullpath = os.path.join(self.tempdir, filename) + content = ascii(fullpath).encode() + (os_helper.TESTFN_UNDECODABLE or b'\xff') + try: + with open(fullpath, 'wb') as f: + f.write(content) + except OSError: + self.skipTest(f'Can not create file {filename!a} ' + f'on current file system') + + response = self.request(self.base_url + '/') + body = self.check_status_and_reason(response, HTTPStatus.OK) + quotedname = urllib.parse.quote(filename, errors='surrogatepass') + enc = response.headers.get_content_charset() + self.assertIsNotNone(enc) + self.assertIn((f'href="{quotedname}"').encode('ascii'), body) + displayname = html.escape(filename, quote=False) + self.assertIn(f'>{displayname}<'.encode(enc, 'surrogateescape'), body) + + response = self.request(self.base_url + '/' + quotedname) + self.check_status_and_reason(response, HTTPStatus.OK, data=content) + + @unittest.skipUnless(os_helper.TESTFN_NONASCII, + 'need os_helper.TESTFN_NONASCII') + def test_list_dir_nonascii_dirname(self): + dirname = os_helper.TESTFN_NONASCII + '.dir' + self.check_list_dir_dirname(dirname) + + @unittest.skipUnless(os_helper.TESTFN_NONASCII, + 'need os_helper.TESTFN_NONASCII') + def test_list_dir_nonascii_filename(self): + filename = os_helper.TESTFN_NONASCII + '.txt' + self.check_list_dir_filename(filename) + @unittest.skipIf(is_apple, 'undecodable name cannot always be decoded on Apple platforms') @unittest.skipIf(sys.platform == 'win32', 'undecodable name cannot be decoded on win32') @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, 'need os_helper.TESTFN_UNDECODABLE') - def test_undecodable_filename(self): - enc = sys.getfilesystemencoding() - filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt' - with open(os.path.join(self.tempdir, filename), 'wb') as f: - f.write(os_helper.TESTFN_UNDECODABLE) - response = self.request(self.base_url + '/') - if is_apple: - # On Apple platforms the HFS+ filesystem replaces bytes that - # aren't valid UTF-8 into a percent-encoded value. - for name in os.listdir(self.tempdir): - if name != 'test': # Ignore a filename created in setUp(). - filename = name - break - body = self.check_status_and_reason(response, HTTPStatus.OK) - quotedname = urllib.parse.quote(filename, errors='surrogatepass') - self.assertIn(('href="%s"' % quotedname) - .encode(enc, 'surrogateescape'), body) - self.assertIn(('>%s<' % html.escape(filename, quote=False)) - .encode(enc, 'surrogateescape'), body) - response = self.request(self.base_url + '/' + quotedname) - self.check_status_and_reason(response, HTTPStatus.OK, - data=os_helper.TESTFN_UNDECODABLE) + def test_list_dir_undecodable_dirname(self): + dirname = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.dir' + self.check_list_dir_dirname(dirname) - def test_undecodable_parameter(self): - # sanity check using a valid parameter + @unittest.skipIf(is_apple, + 'undecodable name cannot always be decoded on Apple platforms') + @unittest.skipIf(sys.platform == 'win32', + 'undecodable name cannot be decoded on win32') + @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, + 'need os_helper.TESTFN_UNDECODABLE') + def test_list_dir_undecodable_filename(self): + filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt' + self.check_list_dir_filename(filename) + + def test_list_dir_undecodable_dirname2(self): + dirname = '\ufffd.dir' + self.check_list_dir_dirname(dirname, quotedname='%ff.dir') + + @unittest.skipUnless(os_helper.TESTFN_UNENCODABLE, + 'need os_helper.TESTFN_UNENCODABLE') + def test_list_dir_unencodable_dirname(self): + dirname = os_helper.TESTFN_UNENCODABLE + '.dir' + self.check_list_dir_dirname(dirname) + + @unittest.skipUnless(os_helper.TESTFN_UNENCODABLE, + 'need os_helper.TESTFN_UNENCODABLE') + def test_list_dir_unencodable_filename(self): + filename = os_helper.TESTFN_UNENCODABLE + '.txt' + self.check_list_dir_filename(filename) + + def test_list_dir_escape_dirname(self): + # Characters that need special treating in URL or HTML. + for name in ('q?', 'f#', '&', '&', '<i>', '"dq"', "'sq'", + '%A4', '%E2%82%AC'): + with self.subTest(name=name): + dirname = name + '.dir' + self.check_list_dir_dirname(dirname, + quotedname=urllib.parse.quote(dirname, safe='&<>\'"')) + + def test_list_dir_escape_filename(self): + # Characters that need special treating in URL or HTML. + for name in ('q?', 'f#', '&', '&', '<i>', '"dq"', "'sq'", + '%A4', '%E2%82%AC'): + with self.subTest(name=name): + filename = name + '.txt' + self.check_list_dir_filename(filename) + os_helper.unlink(os.path.join(self.tempdir, filename)) + + def test_list_dir_with_query_and_fragment(self): + prefix = f'listing for {self.base_url}/</'.encode('latin1') + response = self.request(self.base_url + '/#123').read() + self.assertIn(prefix + b'title>', response) + self.assertIn(prefix + b'h1>', response) response = self.request(self.base_url + '/?x=123').read() - self.assertRegex(response, rf'listing for {self.base_url}/\?x=123'.encode('latin1')) - # now the bogus encoding - response = self.request(self.base_url + '/?x=%bb').read() - self.assertRegex(response, rf'listing for {self.base_url}/\?x=\xef\xbf\xbd'.encode('latin1')) + self.assertIn(prefix + b'title>', response) + self.assertIn(prefix + b'h1>', response) def test_get_dir_redirect_location_domain_injection_bug(self): """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location. @@ -615,10 +693,19 @@ class SimpleHTTPServerTestCase(BaseTestCase): # check for trailing "/" which should return 404. See Issue17324 response = self.request(self.base_url + '/test/') self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) + response = self.request(self.base_url + '/test%2f') + self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) + response = self.request(self.base_url + '/test%2F') + self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) response = self.request(self.base_url + '/') self.check_status_and_reason(response, HTTPStatus.OK) + response = self.request(self.base_url + '%2f') + self.check_status_and_reason(response, HTTPStatus.OK) + response = self.request(self.base_url + '%2F') + self.check_status_and_reason(response, HTTPStatus.OK) response = self.request(self.base_url) self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) + self.assertEqual(response.getheader("Location"), self.base_url + "/") self.assertEqual(response.getheader("Content-Length"), "0") response = self.request(self.base_url + '/?hi=2') self.check_status_and_reason(response, HTTPStatus.OK) @@ -724,6 +811,8 @@ class SimpleHTTPServerTestCase(BaseTestCase): self.check_status_and_reason(response, HTTPStatus.OK) response = self.request(self.tempdir_name) self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) + self.assertEqual(response.getheader("Location"), + self.tempdir_name + "/") response = self.request(self.tempdir_name + '/?hi=2') self.check_status_and_reason(response, HTTPStatus.OK) response = self.request(self.tempdir_name + '?hi=1') @@ -731,350 +820,6 @@ class SimpleHTTPServerTestCase(BaseTestCase): self.assertEqual(response.getheader("Location"), self.tempdir_name + "/?hi=1") - def test_html_escape_filename(self): - filename = '<test&>.txt' - fullpath = os.path.join(self.tempdir, filename) - - try: - open(fullpath, 'wb').close() - except OSError: - raise unittest.SkipTest('Can not create file %s on current file ' - 'system' % filename) - - try: - response = self.request(self.base_url + '/') - body = self.check_status_and_reason(response, HTTPStatus.OK) - enc = response.headers.get_content_charset() - finally: - os.unlink(fullpath) # avoid affecting test_undecodable_filename - - self.assertIsNotNone(enc) - html_text = '>%s<' % html.escape(filename, quote=False) - self.assertIn(html_text.encode(enc), body) - - -cgi_file1 = """\ -#!%s - -print("Content-type: text/html") -print() -print("Hello World") -""" - -cgi_file2 = """\ -#!%s -import os -import sys -import urllib.parse - -print("Content-type: text/html") -print() - -content_length = int(os.environ["CONTENT_LENGTH"]) -query_string = sys.stdin.buffer.read(content_length) -params = {key.decode("utf-8"): val.decode("utf-8") - for key, val in urllib.parse.parse_qsl(query_string)} - -print("%%s, %%s, %%s" %% (params["spam"], params["eggs"], params["bacon"])) -""" - -cgi_file4 = """\ -#!%s -import os - -print("Content-type: text/html") -print() - -print(os.environ["%s"]) -""" - -cgi_file6 = """\ -#!%s -import os - -print("X-ambv: was here") -print("Content-type: text/html") -print() -print("<pre>") -for k, v in os.environ.items(): - try: - k.encode('ascii') - v.encode('ascii') - except UnicodeEncodeError: - continue # see: BPO-44647 - print(f"{k}={v}") -print("</pre>") -""" - - -@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, - "This test can't be run reliably as root (issue #13308).") -@requires_subprocess() -class CGIHTTPServerTestCase(BaseTestCase): - class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): - _test_case_self = None # populated by each setUp() method call. - - def __init__(self, *args, **kwargs): - with self._test_case_self.assertWarnsRegex( - DeprecationWarning, - r'http\.server\.CGIHTTPRequestHandler'): - # This context also happens to catch and silence the - # threading DeprecationWarning from os.fork(). - super().__init__(*args, **kwargs) - - linesep = os.linesep.encode('ascii') - - def setUp(self): - self.request_handler._test_case_self = self # practical, but yuck. - BaseTestCase.setUp(self) - self.cwd = os.getcwd() - self.parent_dir = tempfile.mkdtemp() - self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') - self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir') - self.sub_dir_1 = os.path.join(self.parent_dir, 'sub') - self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir') - self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin') - os.mkdir(self.cgi_dir) - os.mkdir(self.cgi_child_dir) - os.mkdir(self.sub_dir_1) - os.mkdir(self.sub_dir_2) - os.mkdir(self.cgi_dir_in_sub_dir) - self.nocgi_path = None - self.file1_path = None - self.file2_path = None - self.file3_path = None - self.file4_path = None - self.file5_path = None - - # The shebang line should be pure ASCII: use symlink if possible. - # See issue #7668. - self._pythonexe_symlink = None - if os_helper.can_symlink(): - self.pythonexe = os.path.join(self.parent_dir, 'python') - self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__() - else: - self.pythonexe = sys.executable - - try: - # The python executable path is written as the first line of the - # CGI Python script. The encoding cookie cannot be used, and so the - # path should be encodable to the default script encoding (utf-8) - self.pythonexe.encode('utf-8') - except UnicodeEncodeError: - self.tearDown() - self.skipTest("Python executable path is not encodable to utf-8") - - self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py') - with open(self.nocgi_path, 'w', encoding='utf-8') as fp: - fp.write(cgi_file1 % self.pythonexe) - os.chmod(self.nocgi_path, 0o777) - - self.file1_path = os.path.join(self.cgi_dir, 'file1.py') - with open(self.file1_path, 'w', encoding='utf-8') as file1: - file1.write(cgi_file1 % self.pythonexe) - os.chmod(self.file1_path, 0o777) - - self.file2_path = os.path.join(self.cgi_dir, 'file2.py') - with open(self.file2_path, 'w', encoding='utf-8') as file2: - file2.write(cgi_file2 % self.pythonexe) - os.chmod(self.file2_path, 0o777) - - self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py') - with open(self.file3_path, 'w', encoding='utf-8') as file3: - file3.write(cgi_file1 % self.pythonexe) - os.chmod(self.file3_path, 0o777) - - self.file4_path = os.path.join(self.cgi_dir, 'file4.py') - with open(self.file4_path, 'w', encoding='utf-8') as file4: - file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING')) - os.chmod(self.file4_path, 0o777) - - self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py') - with open(self.file5_path, 'w', encoding='utf-8') as file5: - file5.write(cgi_file1 % self.pythonexe) - os.chmod(self.file5_path, 0o777) - - self.file6_path = os.path.join(self.cgi_dir, 'file6.py') - with open(self.file6_path, 'w', encoding='utf-8') as file6: - file6.write(cgi_file6 % self.pythonexe) - os.chmod(self.file6_path, 0o777) - - os.chdir(self.parent_dir) - - def tearDown(self): - self.request_handler._test_case_self = None - try: - os.chdir(self.cwd) - if self._pythonexe_symlink: - self._pythonexe_symlink.__exit__(None, None, None) - if self.nocgi_path: - os.remove(self.nocgi_path) - if self.file1_path: - os.remove(self.file1_path) - if self.file2_path: - os.remove(self.file2_path) - if self.file3_path: - os.remove(self.file3_path) - if self.file4_path: - os.remove(self.file4_path) - if self.file5_path: - os.remove(self.file5_path) - if self.file6_path: - os.remove(self.file6_path) - os.rmdir(self.cgi_child_dir) - os.rmdir(self.cgi_dir) - os.rmdir(self.cgi_dir_in_sub_dir) - os.rmdir(self.sub_dir_2) - os.rmdir(self.sub_dir_1) - # The 'gmon.out' file can be written in the current working - # directory if C-level code profiling with gprof is enabled. - os_helper.unlink(os.path.join(self.parent_dir, 'gmon.out')) - os.rmdir(self.parent_dir) - finally: - BaseTestCase.tearDown(self) - - def test_url_collapse_path(self): - # verify tail is the last portion and head is the rest on proper urls - test_vectors = { - '': '//', - '..': IndexError, - '/.//..': IndexError, - '/': '//', - '//': '//', - '/\\': '//\\', - '/.//': '//', - 'cgi-bin/file1.py': '/cgi-bin/file1.py', - '/cgi-bin/file1.py': '/cgi-bin/file1.py', - 'a': '//a', - '/a': '//a', - '//a': '//a', - './a': '//a', - './C:/': '/C:/', - '/a/b': '/a/b', - '/a/b/': '/a/b/', - '/a/b/.': '/a/b/', - '/a/b/c/..': '/a/b/', - '/a/b/c/../d': '/a/b/d', - '/a/b/c/../d/e/../f': '/a/b/d/f', - '/a/b/c/../d/e/../../f': '/a/b/f', - '/a/b/c/../d/e/.././././..//f': '/a/b/f', - '../a/b/c/../d/e/.././././..//f': IndexError, - '/a/b/c/../d/e/../../../f': '/a/f', - '/a/b/c/../d/e/../../../../f': '//f', - '/a/b/c/../d/e/../../../../../f': IndexError, - '/a/b/c/../d/e/../../../../f/..': '//', - '/a/b/c/../d/e/../../../../f/../.': '//', - } - for path, expected in test_vectors.items(): - if isinstance(expected, type) and issubclass(expected, Exception): - self.assertRaises(expected, - server._url_collapse_path, path) - else: - actual = server._url_collapse_path(path) - self.assertEqual(expected, actual, - msg='path = %r\nGot: %r\nWanted: %r' % - (path, actual, expected)) - - def test_headers_and_content(self): - res = self.request('/cgi-bin/file1.py') - self.assertEqual( - (res.read(), res.getheader('Content-type'), res.status), - (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)) - - def test_issue19435(self): - res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh') - self.assertEqual(res.status, HTTPStatus.NOT_FOUND) - - def test_post(self): - params = urllib.parse.urlencode( - {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) - headers = {'Content-type' : 'application/x-www-form-urlencoded'} - res = self.request('/cgi-bin/file2.py', 'POST', params, headers) - - self.assertEqual(res.read(), b'1, python, 123456' + self.linesep) - - def test_invaliduri(self): - res = self.request('/cgi-bin/invalid') - res.read() - self.assertEqual(res.status, HTTPStatus.NOT_FOUND) - - def test_authorization(self): - headers = {b'Authorization' : b'Basic ' + - base64.b64encode(b'username:pass')} - res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) - self.assertEqual( - (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), - (res.read(), res.getheader('Content-type'), res.status)) - - def test_no_leading_slash(self): - # http://bugs.python.org/issue2254 - res = self.request('cgi-bin/file1.py') - self.assertEqual( - (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), - (res.read(), res.getheader('Content-type'), res.status)) - - def test_os_environ_is_not_altered(self): - signature = "Test CGI Server" - os.environ['SERVER_SOFTWARE'] = signature - res = self.request('/cgi-bin/file1.py') - self.assertEqual( - (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), - (res.read(), res.getheader('Content-type'), res.status)) - self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) - - def test_urlquote_decoding_in_cgi_check(self): - res = self.request('/cgi-bin%2ffile1.py') - self.assertEqual( - (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), - (res.read(), res.getheader('Content-type'), res.status)) - - def test_nested_cgi_path_issue21323(self): - res = self.request('/cgi-bin/child-dir/file3.py') - self.assertEqual( - (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), - (res.read(), res.getheader('Content-type'), res.status)) - - def test_query_with_multiple_question_mark(self): - res = self.request('/cgi-bin/file4.py?a=b?c=d') - self.assertEqual( - (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK), - (res.read(), res.getheader('Content-type'), res.status)) - - def test_query_with_continuous_slashes(self): - res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//') - self.assertEqual( - (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep, - 'text/html', HTTPStatus.OK), - (res.read(), res.getheader('Content-type'), res.status)) - - def test_cgi_path_in_sub_directories(self): - try: - CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin') - res = self.request('/sub/dir/cgi-bin/file5.py') - self.assertEqual( - (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), - (res.read(), res.getheader('Content-type'), res.status)) - finally: - CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin') - - def test_accept(self): - browser_accept = \ - 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' - tests = ( - ((('Accept', browser_accept),), browser_accept), - ((), ''), - # Hack case to get two values for the one header - ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')), - 'text/html,text/plain'), - ) - for headers, expected in tests: - headers = OrderedDict(headers) - with self.subTest(headers): - res = self.request('/cgi-bin/file6.py', 'GET', headers=headers) - self.assertEqual(http.HTTPStatus.OK, res.status) - expected = f"HTTP_ACCEPT={expected}".encode('ascii') - self.assertIn(expected, res.read()) - class SocketlessRequestHandler(SimpleHTTPRequestHandler): def __init__(self, directory=None): @@ -1095,6 +840,7 @@ class SocketlessRequestHandler(SimpleHTTPRequestHandler): def log_message(self, format, *args): pass + class RejectingSocketlessRequestHandler(SocketlessRequestHandler): def handle_expect_100(self): self.send_error(HTTPStatus.EXPECTATION_FAILED) @@ -1536,6 +1282,176 @@ class ScriptTestCase(unittest.TestCase): self.assertEqual(mock_server.address_family, socket.AF_INET) +class CommandLineTestCase(unittest.TestCase): + default_port = 8000 + default_bind = None + default_protocol = 'HTTP/1.0' + default_handler = SimpleHTTPRequestHandler + default_server = unittest.mock.ANY + tls_cert = certdata_file('ssl_cert.pem') + tls_key = certdata_file('ssl_key.pem') + tls_password = 'somepass' + tls_cert_options = ['--tls-cert'] + tls_key_options = ['--tls-key'] + tls_password_options = ['--tls-password-file'] + args = { + 'HandlerClass': default_handler, + 'ServerClass': default_server, + 'protocol': default_protocol, + 'port': default_port, + 'bind': default_bind, + 'tls_cert': None, + 'tls_key': None, + 'tls_password': None, + } + + def setUp(self): + super().setUp() + self.tls_password_file = tempfile.mktemp() + with open(self.tls_password_file, 'wb') as f: + f.write(self.tls_password.encode()) + self.addCleanup(os_helper.unlink, self.tls_password_file) + + def invoke_httpd(self, *args, stdout=None, stderr=None): + stdout = StringIO() if stdout is None else stdout + stderr = StringIO() if stderr is None else stderr + with contextlib.redirect_stdout(stdout), \ + contextlib.redirect_stderr(stderr): + server._main(args) + return stdout.getvalue(), stderr.getvalue() + + @mock.patch('http.server.test') + def test_port_flag(self, mock_func): + ports = [8000, 65535] + for port in ports: + with self.subTest(port=port): + self.invoke_httpd(str(port)) + call_args = self.args | dict(port=port) + mock_func.assert_called_once_with(**call_args) + mock_func.reset_mock() + + @mock.patch('http.server.test') + def test_directory_flag(self, mock_func): + options = ['-d', '--directory'] + directories = ['.', '/foo', '\\bar', '/', + 'C:\\', 'C:\\foo', 'C:\\bar', + '/home/user', './foo/foo2', 'D:\\foo\\bar'] + for flag in options: + for directory in directories: + with self.subTest(flag=flag, directory=directory): + self.invoke_httpd(flag, directory) + mock_func.assert_called_once_with(**self.args) + mock_func.reset_mock() + + @mock.patch('http.server.test') + def test_bind_flag(self, mock_func): + options = ['-b', '--bind'] + bind_addresses = ['localhost', '127.0.0.1', '::1', + '0.0.0.0', '8.8.8.8'] + for flag in options: + for bind_address in bind_addresses: + with self.subTest(flag=flag, bind_address=bind_address): + self.invoke_httpd(flag, bind_address) + call_args = self.args | dict(bind=bind_address) + mock_func.assert_called_once_with(**call_args) + mock_func.reset_mock() + + @mock.patch('http.server.test') + def test_protocol_flag(self, mock_func): + options = ['-p', '--protocol'] + protocols = ['HTTP/1.0', 'HTTP/1.1', 'HTTP/2.0', 'HTTP/3.0'] + for flag in options: + for protocol in protocols: + with self.subTest(flag=flag, protocol=protocol): + self.invoke_httpd(flag, protocol) + call_args = self.args | dict(protocol=protocol) + mock_func.assert_called_once_with(**call_args) + mock_func.reset_mock() + + @unittest.skipIf(ssl is None, "requires ssl") + @mock.patch('http.server.test') + def test_tls_cert_and_key_flags(self, mock_func): + for tls_cert_option in self.tls_cert_options: + for tls_key_option in self.tls_key_options: + self.invoke_httpd(tls_cert_option, self.tls_cert, + tls_key_option, self.tls_key) + call_args = self.args | { + 'tls_cert': self.tls_cert, + 'tls_key': self.tls_key, + } + mock_func.assert_called_once_with(**call_args) + mock_func.reset_mock() + + @unittest.skipIf(ssl is None, "requires ssl") + @mock.patch('http.server.test') + def test_tls_cert_and_key_and_password_flags(self, mock_func): + for tls_cert_option in self.tls_cert_options: + for tls_key_option in self.tls_key_options: + for tls_password_option in self.tls_password_options: + self.invoke_httpd(tls_cert_option, + self.tls_cert, + tls_key_option, + self.tls_key, + tls_password_option, + self.tls_password_file) + call_args = self.args | { + 'tls_cert': self.tls_cert, + 'tls_key': self.tls_key, + 'tls_password': self.tls_password, + } + mock_func.assert_called_once_with(**call_args) + mock_func.reset_mock() + + @unittest.skipIf(ssl is None, "requires ssl") + @mock.patch('http.server.test') + def test_missing_tls_cert_flag(self, mock_func): + for tls_key_option in self.tls_key_options: + with self.assertRaises(SystemExit): + self.invoke_httpd(tls_key_option, self.tls_key) + mock_func.reset_mock() + + for tls_password_option in self.tls_password_options: + with self.assertRaises(SystemExit): + self.invoke_httpd(tls_password_option, self.tls_password) + mock_func.reset_mock() + + @unittest.skipIf(ssl is None, "requires ssl") + @mock.patch('http.server.test') + def test_invalid_password_file(self, mock_func): + non_existent_file = 'non_existent_file' + for tls_password_option in self.tls_password_options: + for tls_cert_option in self.tls_cert_options: + with self.assertRaises(SystemExit): + self.invoke_httpd(tls_cert_option, + self.tls_cert, + tls_password_option, + non_existent_file) + + @mock.patch('http.server.test') + def test_no_arguments(self, mock_func): + self.invoke_httpd() + mock_func.assert_called_once_with(**self.args) + mock_func.reset_mock() + + @mock.patch('http.server.test') + def test_help_flag(self, _): + options = ['-h', '--help'] + for option in options: + stdout, stderr = StringIO(), StringIO() + with self.assertRaises(SystemExit): + self.invoke_httpd(option, stdout=stdout, stderr=stderr) + self.assertIn('usage', stdout.getvalue()) + self.assertEqual(stderr.getvalue(), '') + + @mock.patch('http.server.test') + def test_unknown_flag(self, _): + stdout, stderr = StringIO(), StringIO() + with self.assertRaises(SystemExit): + self.invoke_httpd('--unknown-flag', stdout=stdout, stderr=stderr) + self.assertEqual(stdout.getvalue(), '') + self.assertIn('error', stderr.getvalue()) + + def setUpModule(): unittest.addModuleCleanup(os.chdir, os.getcwd()) diff --git a/Lib/test/test_importlib/import_/test_relative_imports.py b/Lib/test/test_importlib/import_/test_relative_imports.py index e535d119763..1549cbe96ce 100644 --- a/Lib/test/test_importlib/import_/test_relative_imports.py +++ b/Lib/test/test_importlib/import_/test_relative_imports.py @@ -223,6 +223,21 @@ class RelativeImports: self.__import__('sys', {'__package__': '', '__spec__': None}, level=1) + def test_malicious_relative_import(self): + # https://github.com/python/cpython/issues/134100 + # Test to make sure UAF bug with error msg doesn't come back to life + import sys + loooong = "".ljust(0x23000, "b") + name = f"a.{loooong}.c" + + with util.uncache(name): + sys.modules[name] = {} + with self.assertRaisesRegex( + KeyError, + r"'a\.b+' not in sys\.modules as expected" + ): + __import__(f"{loooong}.c", {"__package__": "a"}, level=1) + (Frozen_RelativeImports, Source_RelativeImports diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index 66c7afce88f..1e2d572b1cb 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -1452,6 +1452,14 @@ class LowLevelTests(TestBase): self.assertFalse( self.interp_exists(interpid)) + with self.subTest('basic C-API'): + interpid = _testinternalcapi.create_interpreter() + self.assertTrue( + self.interp_exists(interpid)) + _testinternalcapi.destroy_interpreter(interpid, basic=True) + self.assertFalse( + self.interp_exists(interpid)) + def test_get_config(self): # This test overlaps with # test.test_capi.test_misc.InterpreterConfigTests. diff --git a/Lib/test/test_ioctl.py b/Lib/test/test_ioctl.py index 7a986048bda..3c7a58aa2bc 100644 --- a/Lib/test/test_ioctl.py +++ b/Lib/test/test_ioctl.py @@ -127,9 +127,8 @@ class IoctlTestsTty(unittest.TestCase): self._check_ioctl_not_mutate_len(1024) def test_ioctl_mutate_2048(self): - # Test with a larger buffer, just for the record. self._check_ioctl_mutate_len(2048) - self.assertRaises(ValueError, self._check_ioctl_not_mutate_len, 2048) + self._check_ioctl_not_mutate_len(1024) @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 1e5adcc8db1..fa5b1e43816 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -61,7 +61,7 @@ import warnings import weakref from http.server import HTTPServer, BaseHTTPRequestHandler -from unittest.mock import patch +from unittest.mock import call, Mock, patch from urllib.parse import urlparse, parse_qs from socketserver import (ThreadingUDPServer, DatagramRequestHandler, ThreadingTCPServer, StreamRequestHandler) @@ -5655,12 +5655,19 @@ class BasicConfigTest(unittest.TestCase): assertRaises = self.assertRaises handlers = [logging.StreamHandler()] stream = sys.stderr + formatter = logging.Formatter() assertRaises(ValueError, logging.basicConfig, filename='test.log', stream=stream) assertRaises(ValueError, logging.basicConfig, filename='test.log', handlers=handlers) assertRaises(ValueError, logging.basicConfig, stream=stream, handlers=handlers) + assertRaises(ValueError, logging.basicConfig, formatter=formatter, + format='%(message)s') + assertRaises(ValueError, logging.basicConfig, formatter=formatter, + datefmt='%H:%M:%S') + assertRaises(ValueError, logging.basicConfig, formatter=formatter, + style='%') # Issue 23207: test for invalid kwargs assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO) # Should pop both filename and filemode even if filename is None @@ -5795,6 +5802,20 @@ class BasicConfigTest(unittest.TestCase): # didn't write anything due to the encoding error self.assertEqual(data, r'') + def test_formatter_given(self): + mock_formatter = Mock() + mock_handler = Mock(formatter=None) + with patch("logging.Formatter") as mock_formatter_init: + logging.basicConfig(formatter=mock_formatter, handlers=[mock_handler]) + self.assertEqual(mock_handler.setFormatter.call_args_list, [call(mock_formatter)]) + self.assertEqual(mock_formatter_init.call_count, 0) + + def test_formatter_not_given(self): + mock_handler = Mock(formatter=None) + with patch("logging.Formatter") as mock_formatter_init: + logging.basicConfig(handlers=[mock_handler]) + self.assertEqual(mock_formatter_init.call_count, 1) + @support.requires_working_socket() def test_log_taskName(self): async def log_record(): diff --git a/Lib/test/test_ntpath.py b/Lib/test/test_ntpath.py index c10387b58e3..f83ef225a6e 100644 --- a/Lib/test/test_ntpath.py +++ b/Lib/test/test_ntpath.py @@ -124,6 +124,22 @@ class TestNtpath(NtpathTestCase): tester('ntpath.splitdrive("//?/UNC/server/share/dir")', ("//?/UNC/server/share", "/dir")) + def test_splitdrive_invalid_paths(self): + splitdrive = ntpath.splitdrive + self.assertEqual(splitdrive('\\\\ser\x00ver\\sha\x00re\\di\x00r'), + ('\\\\ser\x00ver\\sha\x00re', '\\di\x00r')) + self.assertEqual(splitdrive(b'\\\\ser\x00ver\\sha\x00re\\di\x00r'), + (b'\\\\ser\x00ver\\sha\x00re', b'\\di\x00r')) + self.assertEqual(splitdrive("\\\\\udfff\\\udffe\\\udffd"), + ('\\\\\udfff\\\udffe', '\\\udffd')) + if sys.platform == 'win32': + self.assertRaises(UnicodeDecodeError, splitdrive, b'\\\\\xff\\share\\dir') + self.assertRaises(UnicodeDecodeError, splitdrive, b'\\\\server\\\xff\\dir') + self.assertRaises(UnicodeDecodeError, splitdrive, b'\\\\server\\share\\\xff') + else: + self.assertEqual(splitdrive(b'\\\\\xff\\\xfe\\\xfd'), + (b'\\\\\xff\\\xfe', b'\\\xfd')) + def test_splitroot(self): tester("ntpath.splitroot('')", ('', '', '')) tester("ntpath.splitroot('foo')", ('', '', 'foo')) @@ -214,6 +230,22 @@ class TestNtpath(NtpathTestCase): tester('ntpath.splitroot(" :/foo")', (" :", "/", "foo")) tester('ntpath.splitroot("/:/foo")', ("", "/", ":/foo")) + def test_splitroot_invalid_paths(self): + splitroot = ntpath.splitroot + self.assertEqual(splitroot('\\\\ser\x00ver\\sha\x00re\\di\x00r'), + ('\\\\ser\x00ver\\sha\x00re', '\\', 'di\x00r')) + self.assertEqual(splitroot(b'\\\\ser\x00ver\\sha\x00re\\di\x00r'), + (b'\\\\ser\x00ver\\sha\x00re', b'\\', b'di\x00r')) + self.assertEqual(splitroot("\\\\\udfff\\\udffe\\\udffd"), + ('\\\\\udfff\\\udffe', '\\', '\udffd')) + if sys.platform == 'win32': + self.assertRaises(UnicodeDecodeError, splitroot, b'\\\\\xff\\share\\dir') + self.assertRaises(UnicodeDecodeError, splitroot, b'\\\\server\\\xff\\dir') + self.assertRaises(UnicodeDecodeError, splitroot, b'\\\\server\\share\\\xff') + else: + self.assertEqual(splitroot(b'\\\\\xff\\\xfe\\\xfd'), + (b'\\\\\xff\\\xfe', b'\\', b'\xfd')) + def test_split(self): tester('ntpath.split("c:\\foo\\bar")', ('c:\\foo', 'bar')) tester('ntpath.split("\\\\conky\\mountpoint\\foo\\bar")', @@ -226,6 +258,21 @@ class TestNtpath(NtpathTestCase): tester('ntpath.split("c:/")', ('c:/', '')) tester('ntpath.split("//conky/mountpoint/")', ('//conky/mountpoint/', '')) + def test_split_invalid_paths(self): + split = ntpath.split + self.assertEqual(split('c:\\fo\x00o\\ba\x00r'), + ('c:\\fo\x00o', 'ba\x00r')) + self.assertEqual(split(b'c:\\fo\x00o\\ba\x00r'), + (b'c:\\fo\x00o', b'ba\x00r')) + self.assertEqual(split('c:\\\udfff\\\udffe'), + ('c:\\\udfff', '\udffe')) + if sys.platform == 'win32': + self.assertRaises(UnicodeDecodeError, split, b'c:\\\xff\\bar') + self.assertRaises(UnicodeDecodeError, split, b'c:\\foo\\\xff') + else: + self.assertEqual(split(b'c:\\\xff\\\xfe'), + (b'c:\\\xff', b'\xfe')) + def test_isabs(self): tester('ntpath.isabs("foo\\bar")', 0) tester('ntpath.isabs("foo/bar")', 0) @@ -333,6 +380,30 @@ class TestNtpath(NtpathTestCase): tester("ntpath.join('D:a', './c:b')", 'D:a\\.\\c:b') tester("ntpath.join('D:/a', './c:b')", 'D:\\a\\.\\c:b') + def test_normcase(self): + normcase = ntpath.normcase + self.assertEqual(normcase(''), '') + self.assertEqual(normcase(b''), b'') + self.assertEqual(normcase('ABC'), 'abc') + self.assertEqual(normcase(b'ABC'), b'abc') + self.assertEqual(normcase('\xc4\u0141\u03a8'), '\xe4\u0142\u03c8') + expected = '\u03c9\u2126' if sys.platform == 'win32' else '\u03c9\u03c9' + self.assertEqual(normcase('\u03a9\u2126'), expected) + if sys.platform == 'win32' or sys.getfilesystemencoding() == 'utf-8': + self.assertEqual(normcase('\xc4\u0141\u03a8'.encode()), + '\xe4\u0142\u03c8'.encode()) + self.assertEqual(normcase('\u03a9\u2126'.encode()), + expected.encode()) + + def test_normcase_invalid_paths(self): + normcase = ntpath.normcase + self.assertEqual(normcase('abc\x00def'), 'abc\x00def') + self.assertEqual(normcase(b'abc\x00def'), b'abc\x00def') + self.assertEqual(normcase('\udfff'), '\udfff') + if sys.platform == 'win32': + path = b'ABC' + bytes(range(128, 256)) + self.assertEqual(normcase(path), path.lower()) + def test_normpath(self): tester("ntpath.normpath('A//////././//.//B')", r'A\B') tester("ntpath.normpath('A/./B')", r'A\B') @@ -381,6 +452,21 @@ class TestNtpath(NtpathTestCase): tester("ntpath.normpath('\\\\')", '\\\\') tester("ntpath.normpath('//?/UNC/server/share/..')", '\\\\?\\UNC\\server\\share\\') + def test_normpath_invalid_paths(self): + normpath = ntpath.normpath + self.assertEqual(normpath('fo\x00o'), 'fo\x00o') + self.assertEqual(normpath(b'fo\x00o'), b'fo\x00o') + self.assertEqual(normpath('fo\x00o\\..\\bar'), 'bar') + self.assertEqual(normpath(b'fo\x00o\\..\\bar'), b'bar') + self.assertEqual(normpath('\udfff'), '\udfff') + self.assertEqual(normpath('\udfff\\..\\foo'), 'foo') + if sys.platform == 'win32': + self.assertRaises(UnicodeDecodeError, normpath, b'\xff') + self.assertRaises(UnicodeDecodeError, normpath, b'\xff\\..\\foo') + else: + self.assertEqual(normpath(b'\xff'), b'\xff') + self.assertEqual(normpath(b'\xff\\..\\foo'), b'foo') + def test_realpath_curdir(self): expected = ntpath.normpath(os.getcwd()) tester("ntpath.realpath('.')", expected) @@ -420,10 +506,6 @@ class TestNtpath(NtpathTestCase): d = drives.pop().encode() self.assertEqual(ntpath.realpath(d), d) - # gh-106242: Embedded nulls and non-strict fallback to abspath - self.assertEqual(ABSTFN + "\0spam", - ntpath.realpath(os_helper.TESTFN + "\0spam", strict=False)) - @os_helper.skip_unless_symlink @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') def test_realpath_strict(self): @@ -434,8 +516,51 @@ class TestNtpath(NtpathTestCase): self.addCleanup(os_helper.unlink, ABSTFN) self.assertRaises(FileNotFoundError, ntpath.realpath, ABSTFN, strict=True) self.assertRaises(FileNotFoundError, ntpath.realpath, ABSTFN + "2", strict=True) + + @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') + def test_realpath_invalid_paths(self): + realpath = ntpath.realpath + ABSTFN = ntpath.abspath(os_helper.TESTFN) + ABSTFNb = os.fsencode(ABSTFN) + path = ABSTFN + '\x00' + # gh-106242: Embedded nulls and non-strict fallback to abspath + self.assertEqual(realpath(path, strict=False), path) # gh-106242: Embedded nulls should raise OSError (not ValueError) - self.assertRaises(OSError, ntpath.realpath, ABSTFN + "\0spam", strict=True) + self.assertRaises(OSError, realpath, path, strict=True) + path = ABSTFNb + b'\x00' + self.assertEqual(realpath(path, strict=False), path) + self.assertRaises(OSError, realpath, path, strict=True) + path = ABSTFN + '\\nonexistent\\x\x00' + self.assertEqual(realpath(path, strict=False), path) + self.assertRaises(OSError, realpath, path, strict=True) + path = ABSTFNb + b'\\nonexistent\\x\x00' + self.assertEqual(realpath(path, strict=False), path) + self.assertRaises(OSError, realpath, path, strict=True) + path = ABSTFN + '\x00\\..' + self.assertEqual(realpath(path, strict=False), os.getcwd()) + self.assertEqual(realpath(path, strict=True), os.getcwd()) + path = ABSTFNb + b'\x00\\..' + self.assertEqual(realpath(path, strict=False), os.getcwdb()) + self.assertEqual(realpath(path, strict=True), os.getcwdb()) + path = ABSTFN + '\\nonexistent\\x\x00\\..' + self.assertEqual(realpath(path, strict=False), ABSTFN + '\\nonexistent') + self.assertRaises(OSError, realpath, path, strict=True) + path = ABSTFNb + b'\\nonexistent\\x\x00\\..' + self.assertEqual(realpath(path, strict=False), ABSTFNb + b'\\nonexistent') + self.assertRaises(OSError, realpath, path, strict=True) + + path = ABSTFNb + b'\xff' + self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + path = ABSTFNb + b'\\nonexistent\\\xff' + self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + path = ABSTFNb + b'\xff\\..' + self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + path = ABSTFNb + b'\\nonexistent\\\xff\\..' + self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) @os_helper.skip_unless_symlink @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') @@ -812,8 +937,6 @@ class TestNtpath(NtpathTestCase): tester('ntpath.abspath("C:/nul")', "\\\\.\\nul") tester('ntpath.abspath("C:\\nul")', "\\\\.\\nul") self.assertTrue(ntpath.isabs(ntpath.abspath("C:spam"))) - self.assertEqual(ntpath.abspath("C:\x00"), ntpath.join(ntpath.abspath("C:"), "\x00")) - self.assertEqual(ntpath.abspath("\x00:spam"), "\x00:\\spam") tester('ntpath.abspath("//..")', "\\\\") tester('ntpath.abspath("//../")', "\\\\..\\") tester('ntpath.abspath("//../..")', "\\\\..\\") @@ -847,6 +970,26 @@ class TestNtpath(NtpathTestCase): drive, _ = ntpath.splitdrive(cwd_dir) tester('ntpath.abspath("/abc/")', drive + "\\abc") + def test_abspath_invalid_paths(self): + abspath = ntpath.abspath + if sys.platform == 'win32': + self.assertEqual(abspath("C:\x00"), ntpath.join(abspath("C:"), "\x00")) + self.assertEqual(abspath(b"C:\x00"), ntpath.join(abspath(b"C:"), b"\x00")) + self.assertEqual(abspath("\x00:spam"), "\x00:\\spam") + self.assertEqual(abspath(b"\x00:spam"), b"\x00:\\spam") + self.assertEqual(abspath('c:\\fo\x00o'), 'c:\\fo\x00o') + self.assertEqual(abspath(b'c:\\fo\x00o'), b'c:\\fo\x00o') + self.assertEqual(abspath('c:\\fo\x00o\\..\\bar'), 'c:\\bar') + self.assertEqual(abspath(b'c:\\fo\x00o\\..\\bar'), b'c:\\bar') + self.assertEqual(abspath('c:\\\udfff'), 'c:\\\udfff') + self.assertEqual(abspath('c:\\\udfff\\..\\foo'), 'c:\\foo') + if sys.platform == 'win32': + self.assertRaises(UnicodeDecodeError, abspath, b'c:\\\xff') + self.assertRaises(UnicodeDecodeError, abspath, b'c:\\\xff\\..\\foo') + else: + self.assertEqual(abspath(b'c:\\\xff'), b'c:\\\xff') + self.assertEqual(abspath(b'c:\\\xff\\..\\foo'), b'c:\\foo') + def test_relpath(self): tester('ntpath.relpath("a")', 'a') tester('ntpath.relpath(ntpath.abspath("a"))', 'a') @@ -989,6 +1132,18 @@ class TestNtpath(NtpathTestCase): self.assertTrue(ntpath.ismount(b"\\\\localhost\\c$")) self.assertTrue(ntpath.ismount(b"\\\\localhost\\c$\\")) + def test_ismount_invalid_paths(self): + ismount = ntpath.ismount + self.assertFalse(ismount("c:\\\udfff")) + if sys.platform == 'win32': + self.assertRaises(ValueError, ismount, "c:\\\x00") + self.assertRaises(ValueError, ismount, b"c:\\\x00") + self.assertRaises(UnicodeDecodeError, ismount, b"c:\\\xff") + else: + self.assertFalse(ismount("c:\\\x00")) + self.assertFalse(ismount(b"c:\\\x00")) + self.assertFalse(ismount(b"c:\\\xff")) + def test_isreserved(self): self.assertFalse(ntpath.isreserved('')) self.assertFalse(ntpath.isreserved('.')) @@ -1095,6 +1250,13 @@ class TestNtpath(NtpathTestCase): self.assertFalse(ntpath.isjunction('tmpdir')) self.assertPathEqual(ntpath.realpath('testjunc'), ntpath.realpath('tmpdir')) + def test_isfile_invalid_paths(self): + isfile = ntpath.isfile + self.assertIs(isfile('/tmp\udfffabcds'), False) + self.assertIs(isfile(b'/tmp\xffabcds'), False) + self.assertIs(isfile('/tmp\x00abcds'), False) + self.assertIs(isfile(b'/tmp\x00abcds'), False) + @unittest.skipIf(sys.platform != 'win32', "drive letters are a windows concept") def test_isfile_driveletter(self): drive = os.environ.get('SystemDrive') @@ -1195,9 +1357,6 @@ class PathLikeTests(NtpathTestCase): def test_path_normcase(self): self._check_function(self.path.normcase) - if sys.platform == 'win32': - self.assertEqual(ntpath.normcase('\u03a9\u2126'), 'ωΩ') - self.assertEqual(ntpath.normcase('abc\x00def'), 'abc\x00def') def test_path_isabs(self): self._check_function(self.path.isabs) diff --git a/Lib/test/test_pathlib/support/lexical_path.py b/Lib/test/test_pathlib/support/lexical_path.py index f29a521af9b..fd7fbf283a6 100644 --- a/Lib/test/test_pathlib/support/lexical_path.py +++ b/Lib/test/test_pathlib/support/lexical_path.py @@ -9,9 +9,10 @@ import posixpath from . import is_pypi if is_pypi: - from pathlib_abc import _JoinablePath + from pathlib_abc import vfspath, _JoinablePath else: from pathlib.types import _JoinablePath + from pathlib._os import vfspath class LexicalPath(_JoinablePath): @@ -22,20 +23,20 @@ class LexicalPath(_JoinablePath): self._segments = pathsegments def __hash__(self): - return hash(str(self)) + return hash(vfspath(self)) def __eq__(self, other): if not isinstance(other, LexicalPath): return NotImplemented - return str(self) == str(other) + return vfspath(self) == vfspath(other) - def __str__(self): + def __vfspath__(self): if not self._segments: return '' return self.parser.join(*self._segments) def __repr__(self): - return f'{type(self).__name__}({str(self)!r})' + return f'{type(self).__name__}({vfspath(self)!r})' def with_segments(self, *pathsegments): return type(self)(*pathsegments) diff --git a/Lib/test/test_pathlib/support/local_path.py b/Lib/test/test_pathlib/support/local_path.py index d481fd45ead..c1423c545bf 100644 --- a/Lib/test/test_pathlib/support/local_path.py +++ b/Lib/test/test_pathlib/support/local_path.py @@ -97,7 +97,7 @@ class LocalPathInfo(PathInfo): __slots__ = ('_path', '_exists', '_is_dir', '_is_file', '_is_symlink') def __init__(self, path): - self._path = str(path) + self._path = os.fspath(path) self._exists = None self._is_dir = None self._is_file = None @@ -139,14 +139,12 @@ class ReadableLocalPath(_ReadablePath, LexicalPath): Simple implementation of a ReadablePath class for local filesystem paths. """ __slots__ = ('info',) + __fspath__ = LexicalPath.__vfspath__ def __init__(self, *pathsegments): super().__init__(*pathsegments) self.info = LocalPathInfo(self) - def __fspath__(self): - return str(self) - def __open_rb__(self, buffering=-1): return open(self, 'rb') @@ -163,9 +161,7 @@ class WritableLocalPath(_WritablePath, LexicalPath): """ __slots__ = () - - def __fspath__(self): - return str(self) + __fspath__ = LexicalPath.__vfspath__ def __open_wb__(self, buffering=-1): return open(self, 'wb') diff --git a/Lib/test/test_pathlib/support/zip_path.py b/Lib/test/test_pathlib/support/zip_path.py index 2905260c9df..21e1d07423a 100644 --- a/Lib/test/test_pathlib/support/zip_path.py +++ b/Lib/test/test_pathlib/support/zip_path.py @@ -16,9 +16,10 @@ from stat import S_IFMT, S_ISDIR, S_ISREG, S_ISLNK from . import is_pypi if is_pypi: - from pathlib_abc import PathInfo, _ReadablePath, _WritablePath + from pathlib_abc import vfspath, PathInfo, _ReadablePath, _WritablePath else: from pathlib.types import PathInfo, _ReadablePath, _WritablePath + from pathlib._os import vfspath class ZipPathGround: @@ -34,16 +35,16 @@ class ZipPathGround: root.zip_file.close() def create_file(self, path, data=b''): - path.zip_file.writestr(str(path), data) + path.zip_file.writestr(vfspath(path), data) def create_dir(self, path): - zip_info = zipfile.ZipInfo(str(path) + '/') + zip_info = zipfile.ZipInfo(vfspath(path) + '/') zip_info.external_attr |= stat.S_IFDIR << 16 zip_info.external_attr |= stat.FILE_ATTRIBUTE_DIRECTORY path.zip_file.writestr(zip_info, '') def create_symlink(self, path, target): - zip_info = zipfile.ZipInfo(str(path)) + zip_info = zipfile.ZipInfo(vfspath(path)) zip_info.external_attr = stat.S_IFLNK << 16 path.zip_file.writestr(zip_info, target.encode()) @@ -62,28 +63,28 @@ class ZipPathGround: self.create_symlink(p.joinpath('brokenLinkLoop'), 'brokenLinkLoop') def readtext(self, p): - with p.zip_file.open(str(p), 'r') as f: + with p.zip_file.open(vfspath(p), 'r') as f: f = io.TextIOWrapper(f, encoding='utf-8') return f.read() def readbytes(self, p): - with p.zip_file.open(str(p), 'r') as f: + with p.zip_file.open(vfspath(p), 'r') as f: return f.read() readlink = readtext def isdir(self, p): - path_str = str(p) + "/" + path_str = vfspath(p) + "/" return path_str in p.zip_file.NameToInfo def isfile(self, p): - info = p.zip_file.NameToInfo.get(str(p)) + info = p.zip_file.NameToInfo.get(vfspath(p)) if info is None: return False return not stat.S_ISLNK(info.external_attr >> 16) def islink(self, p): - info = p.zip_file.NameToInfo.get(str(p)) + info = p.zip_file.NameToInfo.get(vfspath(p)) if info is None: return False return stat.S_ISLNK(info.external_attr >> 16) @@ -240,20 +241,20 @@ class ReadableZipPath(_ReadablePath): zip_file.filelist = ZipFileList(zip_file) def __hash__(self): - return hash((str(self), self.zip_file)) + return hash((vfspath(self), self.zip_file)) def __eq__(self, other): if not isinstance(other, ReadableZipPath): return NotImplemented - return str(self) == str(other) and self.zip_file is other.zip_file + return vfspath(self) == vfspath(other) and self.zip_file is other.zip_file - def __str__(self): + def __vfspath__(self): if not self._segments: return '' return self.parser.join(*self._segments) def __repr__(self): - return f'{type(self).__name__}({str(self)!r}, zip_file={self.zip_file!r})' + return f'{type(self).__name__}({vfspath(self)!r}, zip_file={self.zip_file!r})' def with_segments(self, *pathsegments): return type(self)(*pathsegments, zip_file=self.zip_file) @@ -261,7 +262,7 @@ class ReadableZipPath(_ReadablePath): @property def info(self): tree = self.zip_file.filelist.tree - return tree.resolve(str(self), follow_symlinks=False) + return tree.resolve(vfspath(self), follow_symlinks=False) def __open_rb__(self, buffering=-1): info = self.info.resolve() @@ -301,36 +302,36 @@ class WritableZipPath(_WritablePath): self.zip_file = zip_file def __hash__(self): - return hash((str(self), self.zip_file)) + return hash((vfspath(self), self.zip_file)) def __eq__(self, other): if not isinstance(other, WritableZipPath): return NotImplemented - return str(self) == str(other) and self.zip_file is other.zip_file + return vfspath(self) == vfspath(other) and self.zip_file is other.zip_file - def __str__(self): + def __vfspath__(self): if not self._segments: return '' return self.parser.join(*self._segments) def __repr__(self): - return f'{type(self).__name__}({str(self)!r}, zip_file={self.zip_file!r})' + return f'{type(self).__name__}({vfspath(self)!r}, zip_file={self.zip_file!r})' def with_segments(self, *pathsegments): return type(self)(*pathsegments, zip_file=self.zip_file) def __open_wb__(self, buffering=-1): - return self.zip_file.open(str(self), 'w') + return self.zip_file.open(vfspath(self), 'w') def mkdir(self, mode=0o777): - zinfo = zipfile.ZipInfo(str(self) + '/') + zinfo = zipfile.ZipInfo(vfspath(self) + '/') zinfo.external_attr |= stat.S_IFDIR << 16 zinfo.external_attr |= stat.FILE_ATTRIBUTE_DIRECTORY self.zip_file.writestr(zinfo, '') def symlink_to(self, target, target_is_directory=False): - zinfo = zipfile.ZipInfo(str(self)) + zinfo = zipfile.ZipInfo(vfspath(self)) zinfo.external_attr = stat.S_IFLNK << 16 if target_is_directory: zinfo.external_attr |= 0x10 - self.zip_file.writestr(zinfo, str(target)) + self.zip_file.writestr(zinfo, vfspath(target)) diff --git a/Lib/test/test_pathlib/test_join_windows.py b/Lib/test/test_pathlib/test_join_windows.py index 2cc634f25ef..f30c80605f7 100644 --- a/Lib/test/test_pathlib/test_join_windows.py +++ b/Lib/test/test_pathlib/test_join_windows.py @@ -8,6 +8,11 @@ import unittest from .support import is_pypi from .support.lexical_path import LexicalWindowsPath +if is_pypi: + from pathlib_abc import vfspath +else: + from pathlib._os import vfspath + class JoinTestBase: def test_join(self): @@ -70,17 +75,17 @@ class JoinTestBase: self.assertEqual(p / './dd:s', P(r'C:/a/b\./dd:s')) self.assertEqual(p / 'E:d:s', P('E:d:s')) - def test_str(self): + def test_vfspath(self): p = self.cls(r'a\b\c') - self.assertEqual(str(p), 'a\\b\\c') + self.assertEqual(vfspath(p), 'a\\b\\c') p = self.cls(r'c:\a\b\c') - self.assertEqual(str(p), 'c:\\a\\b\\c') + self.assertEqual(vfspath(p), 'c:\\a\\b\\c') p = self.cls('\\\\a\\b\\') - self.assertEqual(str(p), '\\\\a\\b\\') + self.assertEqual(vfspath(p), '\\\\a\\b\\') p = self.cls(r'\\a\b\c') - self.assertEqual(str(p), '\\\\a\\b\\c') + self.assertEqual(vfspath(p), '\\\\a\\b\\c') p = self.cls(r'\\a\b\c\d') - self.assertEqual(str(p), '\\\\a\\b\\c\\d') + self.assertEqual(vfspath(p), '\\\\a\\b\\c\\d') def test_parts(self): P = self.cls diff --git a/Lib/test/test_pathlib/test_pathlib.py b/Lib/test/test_pathlib/test_pathlib.py index 8a313cc4292..37ef9fa1946 100644 --- a/Lib/test/test_pathlib/test_pathlib.py +++ b/Lib/test/test_pathlib/test_pathlib.py @@ -20,7 +20,7 @@ from test.support import cpython_only from test.support import is_emscripten, is_wasi from test.support import infinite_recursion from test.support import os_helper -from test.support.os_helper import TESTFN, FakePath +from test.support.os_helper import TESTFN, FS_NONASCII, FakePath try: import fcntl except ImportError: @@ -770,12 +770,16 @@ class PurePathTest(unittest.TestCase): self.assertEqual(self.make_uri(P('c:/')), 'file:///c:/') self.assertEqual(self.make_uri(P('c:/a/b.c')), 'file:///c:/a/b.c') self.assertEqual(self.make_uri(P('c:/a/b%#c')), 'file:///c:/a/b%25%23c') - self.assertEqual(self.make_uri(P('c:/a/b\xe9')), 'file:///c:/a/b%C3%A9') self.assertEqual(self.make_uri(P('//some/share/')), 'file://some/share/') self.assertEqual(self.make_uri(P('//some/share/a/b.c')), 'file://some/share/a/b.c') - self.assertEqual(self.make_uri(P('//some/share/a/b%#c\xe9')), - 'file://some/share/a/b%25%23c%C3%A9') + + from urllib.parse import quote_from_bytes + QUOTED_FS_NONASCII = quote_from_bytes(os.fsencode(FS_NONASCII)) + self.assertEqual(self.make_uri(P('c:/a/b' + FS_NONASCII)), + 'file:///c:/a/b' + QUOTED_FS_NONASCII) + self.assertEqual(self.make_uri(P('//some/share/a/b%#c' + FS_NONASCII)), + 'file://some/share/a/b%25%23c' + QUOTED_FS_NONASCII) @needs_windows def test_ordering_windows(self): diff --git a/Lib/test/test_platform.py b/Lib/test/test_platform.py index 818e807dd3a..3b673a47c8c 100644 --- a/Lib/test/test_platform.py +++ b/Lib/test/test_platform.py @@ -383,15 +383,6 @@ class PlatformTest(unittest.TestCase): finally: platform._uname_cache = None - def test_java_ver(self): - import re - msg = re.escape( - "'java_ver' is deprecated and slated for removal in Python 3.15" - ) - with self.assertWarnsRegex(DeprecationWarning, msg): - res = platform.java_ver() - self.assertEqual(len(res), 4) - @unittest.skipUnless(support.MS_WINDOWS, 'This test only makes sense on Windows') def test_win32_ver(self): release1, version1, csd1, ptype1 = 'a', 'b', 'c', 'd' diff --git a/Lib/test/test_posixpath.py b/Lib/test/test_posixpath.py index fa19d549c26..c70688ff5e9 100644 --- a/Lib/test/test_posixpath.py +++ b/Lib/test/test_posixpath.py @@ -229,6 +229,7 @@ class PosixPathTest(unittest.TestCase): finally: safe_rmdir(ABSTFN) + def test_ismount_invalid_paths(self): self.assertIs(posixpath.ismount('/\udfff'), False) self.assertIs(posixpath.ismount(b'/\xff'), False) self.assertIs(posixpath.ismount('/\x00'), False) @@ -489,6 +490,79 @@ class PosixPathTest(unittest.TestCase): finally: os_helper.unlink(ABSTFN) + def test_realpath_invalid_paths(self): + path = '/\x00' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(ValueError, realpath, path, strict=True) + path = b'/\x00' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(ValueError, realpath, path, strict=True) + path = '/nonexistent/x\x00' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + path = b'/nonexistent/x\x00' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + path = '/\x00/..' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(ValueError, realpath, path, strict=True) + path = b'/\x00/..' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(ValueError, realpath, path, strict=True) + path = '/nonexistent/x\x00/..' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + path = b'/nonexistent/x\x00/..' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + + path = '/\udfff' + if sys.platform == 'win32': + self.assertEqual(realpath(path, strict=False), path) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + else: + self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=True) + path = '/nonexistent/\udfff' + if sys.platform == 'win32': + self.assertEqual(realpath(path, strict=False), path) + else: + self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + path = '/\udfff/..' + if sys.platform == 'win32': + self.assertEqual(realpath(path, strict=False), '/') + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + else: + self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=True) + path = '/nonexistent/\udfff/..' + if sys.platform == 'win32': + self.assertEqual(realpath(path, strict=False), '/nonexistent') + else: + self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + + path = b'/\xff' + if sys.platform == 'win32': + self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + else: + self.assertEqual(realpath(path, strict=False), path) + if support.is_wasi: + self.assertRaises(OSError, realpath, path, strict=True) + else: + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + path = b'/nonexistent/\xff' + if sys.platform == 'win32': + self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) + else: + self.assertEqual(realpath(path, strict=False), path) + if support.is_wasi: + self.assertRaises(OSError, realpath, path, strict=True) + else: + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash def test_realpath_relative(self): diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py index fc8114891d1..59f5d1f893f 100644 --- a/Lib/test/test_pyrepl/test_pyrepl.py +++ b/Lib/test/test_pyrepl/test_pyrepl.py @@ -10,7 +10,7 @@ import sys import tempfile from unittest import TestCase, skipUnless, skipIf from unittest.mock import patch -from test.support import force_not_colorized, make_clean_env +from test.support import force_not_colorized, make_clean_env, Py_DEBUG from test.support import SHORT_TIMEOUT, STDLIB_DIR from test.support.import_helper import import_module from test.support.os_helper import EnvironmentVarGuard, unlink @@ -959,6 +959,26 @@ class TestPyReplModuleCompleter(TestCase): output = reader.readline() self.assertEqual(output, expected) + def test_builtin_completion_top_level(self): + import importlib + # Make iter_modules() search only the standard library. + # This makes the test more reliable in case there are + # other user packages/scripts on PYTHONPATH which can + # intefere with the completions. + lib_path = os.path.dirname(importlib.__path__[0]) + sys.path = [lib_path] + + cases = ( + ("import bui\t\n", "import builtins"), + ("from bui\t\n", "from builtins"), + ) + for code, expected in cases: + with self.subTest(code=code): + events = code_to_events(code) + reader = self.prepare_reader(events, namespace={}) + output = reader.readline() + self.assertEqual(output, expected) + def test_relative_import_completions(self): cases = ( ("from .readl\t\n", "from .readline"), @@ -1055,11 +1075,15 @@ class TestPyReplModuleCompleter(TestCase): self.assertEqual(actual, parsed) # The parser should not get tripped up by any # other preceding statements - code = f'import xyz\n{code}' - with self.subTest(code=code): + _code = f'import xyz\n{code}' + parser = ImportParser(_code) + actual = parser.parse() + with self.subTest(code=_code): self.assertEqual(actual, parsed) - code = f'import xyz;{code}' - with self.subTest(code=code): + _code = f'import xyz;{code}' + parser = ImportParser(_code) + actual = parser.parse() + with self.subTest(code=_code): self.assertEqual(actual, parsed) def test_parse_error(self): @@ -1610,3 +1634,16 @@ class TestMain(ReplTestCase): # Extra stuff (newline and `exit` rewrites) are necessary # because of how run_repl works. self.assertNotIn(">>> \n>>> >>>", cleaned_output) + + @skipUnless(Py_DEBUG, '-X showrefcount requires a Python debug build') + def test_showrefcount(self): + env = os.environ.copy() + env.pop("PYTHON_BASIC_REPL", "") + output, _ = self.run_repl("1\n1+2\nexit()\n", cmdline_args=['-Xshowrefcount'], env=env) + matches = re.findall(r'\[-?\d+ refs, \d+ blocks\]', output) + self.assertEqual(len(matches), 3) + + env["PYTHON_BASIC_REPL"] = "1" + output, _ = self.run_repl("1\n1+2\nexit()\n", cmdline_args=['-Xshowrefcount'], env=env) + matches = re.findall(r'\[-?\d+ refs, \d+ blocks\]', output) + self.assertEqual(len(matches), 3) diff --git a/Lib/test/test_pyrepl/test_reader.py b/Lib/test/test_pyrepl/test_reader.py index 57526f88f93..1f655264f1c 100644 --- a/Lib/test/test_pyrepl/test_reader.py +++ b/Lib/test/test_pyrepl/test_reader.py @@ -517,6 +517,37 @@ class TestReaderInColor(ScreenEqualMixin, TestCase): self.assert_screen_equal(reader, code, clean=True) self.assert_screen_equal(reader, expected) + def test_syntax_highlighting_literal_brace_in_fstring_or_tstring(self): + code = dedent( + """\ + f"{{" + f"}}" + f"a{{b" + f"a}}b" + f"a{{b}}c" + t"a{{b}}c" + f"{{{0}}}" + f"{ {0} }" + """ + ) + expected = dedent( + """\ + {s}f"{z}{s}<<{z}{s}"{z} + {s}f"{z}{s}>>{z}{s}"{z} + {s}f"{z}{s}a<<{z}{s}b{z}{s}"{z} + {s}f"{z}{s}a>>{z}{s}b{z}{s}"{z} + {s}f"{z}{s}a<<{z}{s}b>>{z}{s}c{z}{s}"{z} + {s}t"{z}{s}a<<{z}{s}b>>{z}{s}c{z}{s}"{z} + {s}f"{z}{s}<<{z}{o}<{z}{n}0{z}{o}>{z}{s}>>{z}{s}"{z} + {s}f"{z}{o}<{z} {o}<{z}{n}0{z}{o}>{z} {o}>{z}{s}"{z} + """ + ).format(**colors).replace("<", "{").replace(">", "}") + events = code_to_events(code) + reader, _ = handle_all_events(events) + self.assert_screen_equal(reader, code, clean=True) + self.maxDiff=None + self.assert_screen_equal(reader, expected) + def test_control_characters(self): code = 'flag = "🏳️🌈"' events = code_to_events(code) diff --git a/Lib/test/test_string/test_templatelib.py b/Lib/test/test_string/test_templatelib.py index 5b9490c2be6..85fcff486d6 100644 --- a/Lib/test/test_string/test_templatelib.py +++ b/Lib/test/test_string/test_templatelib.py @@ -148,6 +148,13 @@ class TemplateIterTests(unittest.TestCase): self.assertEqual(res[1].format_spec, '') self.assertEqual(res[2], ' yz') + def test_exhausted(self): + # See https://github.com/python/cpython/issues/134119. + template_iter = iter(t"{1}") + self.assertIsInstance(next(template_iter), Interpolation) + self.assertRaises(StopIteration, next, template_iter) + self.assertRaises(StopIteration, next, template_iter) + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py index 0eccf03a1a9..c7ac7914158 100644 --- a/Lib/test/test_syntax.py +++ b/Lib/test/test_syntax.py @@ -1431,6 +1431,23 @@ Better error message for using `except as` with not a name: Traceback (most recent call last): SyntaxError: cannot use except* statement with literal +Regression tests for gh-133999: + + >>> try: pass + ... except TypeError as name: raise from None + Traceback (most recent call last): + SyntaxError: invalid syntax + + >>> try: pass + ... except* TypeError as name: raise from None + Traceback (most recent call last): + SyntaxError: invalid syntax + + >>> match 1: + ... case 1 | 2 as abc: raise from None + Traceback (most recent call last): + SyntaxError: invalid syntax + Ensure that early = are not matched by the parser as invalid comparisons >>> f(2, 4, x=34); 1 $ 2 Traceback (most recent call last): diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 59ef5c99309..fb1c8492a64 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -1976,12 +1976,13 @@ class TestRemoteExec(unittest.TestCase): def tearDown(self): test.support.reap_children() - def _run_remote_exec_test(self, script_code, python_args=None, env=None, prologue=''): + def _run_remote_exec_test(self, script_code, python_args=None, env=None, + prologue='', + script_path=os_helper.TESTFN + '_remote.py'): # Create the script that will be remotely executed - script = os_helper.TESTFN + '_remote.py' - self.addCleanup(os_helper.unlink, script) + self.addCleanup(os_helper.unlink, script_path) - with open(script, 'w') as f: + with open(script_path, 'w') as f: f.write(script_code) # Create and run the target process @@ -2050,7 +2051,7 @@ sock.close() self.assertEqual(response, b"ready") # Try remote exec on the target process - sys.remote_exec(proc.pid, script) + sys.remote_exec(proc.pid, script_path) # Signal script to continue client_socket.sendall(b"continue") @@ -2073,14 +2074,32 @@ sock.close() def test_remote_exec(self): """Test basic remote exec functionality""" - script = ''' -print("Remote script executed successfully!") -''' + script = 'print("Remote script executed successfully!")' returncode, stdout, stderr = self._run_remote_exec_test(script) # self.assertEqual(returncode, 0) self.assertIn(b"Remote script executed successfully!", stdout) self.assertEqual(stderr, b"") + def test_remote_exec_bytes(self): + script = 'print("Remote script executed successfully!")' + script_path = os.fsencode(os_helper.TESTFN) + b'_bytes_remote.py' + returncode, stdout, stderr = self._run_remote_exec_test(script, + script_path=script_path) + self.assertIn(b"Remote script executed successfully!", stdout) + self.assertEqual(stderr, b"") + + @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, 'requires undecodable path') + @unittest.skipIf(sys.platform == 'darwin', + 'undecodable paths are not supported on macOS') + def test_remote_exec_undecodable(self): + script = 'print("Remote script executed successfully!")' + script_path = os_helper.TESTFN_UNDECODABLE + b'_undecodable_remote.py' + for script_path in [script_path, os.fsdecode(script_path)]: + returncode, stdout, stderr = self._run_remote_exec_test(script, + script_path=script_path) + self.assertIn(b"Remote script executed successfully!", stdout) + self.assertEqual(stderr, b"") + def test_remote_exec_with_self_process(self): """Test remote exec with the target process being the same as the test process""" @@ -2157,6 +2176,13 @@ raise Exception("Remote script exception") with self.assertRaises(OSError): sys.remote_exec(99999, "print('should not run')") + def test_remote_exec_invalid_script(self): + """Test remote exec with invalid script type""" + with self.assertRaises(TypeError): + sys.remote_exec(0, None) + with self.assertRaises(TypeError): + sys.remote_exec(0, 123) + def test_remote_exec_syntax_error(self): """Test remote exec with syntax error in script""" script = ''' diff --git a/Lib/test/test_sysconfig.py b/Lib/test/test_sysconfig.py index 53e55383bf9..963cf753ce6 100644 --- a/Lib/test/test_sysconfig.py +++ b/Lib/test/test_sysconfig.py @@ -531,13 +531,10 @@ class TestSysConfig(unittest.TestCase, VirtualEnvironmentMixin): Python_h = os.path.join(srcdir, 'Include', 'Python.h') self.assertTrue(os.path.exists(Python_h), Python_h) # <srcdir>/PC/pyconfig.h.in always exists even if unused - pyconfig_h = os.path.join(srcdir, 'PC', 'pyconfig.h.in') - self.assertTrue(os.path.exists(pyconfig_h), pyconfig_h) pyconfig_h_in = os.path.join(srcdir, 'pyconfig.h.in') self.assertTrue(os.path.exists(pyconfig_h_in), pyconfig_h_in) if os.name == 'nt': - # <executable dir>/pyconfig.h exists on Windows in a build tree - pyconfig_h = os.path.join(sys.executable, '..', 'pyconfig.h') + pyconfig_h = os.path.join(srcdir, 'PC', 'pyconfig.h') self.assertTrue(os.path.exists(pyconfig_h), pyconfig_h) elif os.name == 'posix': makefile_dir = os.path.dirname(sysconfig.get_makefile_filename()) diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 2d9649237a9..2018a20afd1 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -3490,11 +3490,12 @@ class ArchiveMaker: with t.open() as tar: ... # `tar` is now a TarFile with 'filename' in it! """ - def __init__(self): + def __init__(self, **kwargs): self.bio = io.BytesIO() + self.tar_kwargs = dict(kwargs) def __enter__(self): - self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio) + self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio, **self.tar_kwargs) return self def __exit__(self, *exc): @@ -4073,7 +4074,10 @@ class TestExtractionFilters(unittest.TestCase): # that in the test archive.) with tarfile.TarFile.open(tarname) as tar: for tarinfo in tar.getmembers(): - filtered = tarfile.tar_filter(tarinfo, '') + try: + filtered = tarfile.tar_filter(tarinfo, '') + except UnicodeEncodeError: + continue self.assertIs(filtered.name, tarinfo.name) self.assertIs(filtered.type, tarinfo.type) @@ -4084,11 +4088,48 @@ class TestExtractionFilters(unittest.TestCase): for tarinfo in tar.getmembers(): try: filtered = tarfile.data_filter(tarinfo, '') - except tarfile.FilterError: + except (tarfile.FilterError, UnicodeEncodeError): continue self.assertIs(filtered.name, tarinfo.name) self.assertIs(filtered.type, tarinfo.type) + @unittest.skipIf(sys.platform == 'win32', 'requires native bytes paths') + def test_filter_unencodable(self): + # Sanity check using a valid path. + tarinfo = tarfile.TarInfo(os_helper.TESTFN) + filtered = tarfile.tar_filter(tarinfo, '') + self.assertIs(filtered.name, tarinfo.name) + filtered = tarfile.data_filter(tarinfo, '') + self.assertIs(filtered.name, tarinfo.name) + + tarinfo = tarfile.TarInfo('test\x00') + self.assertRaises(ValueError, tarfile.tar_filter, tarinfo, '') + self.assertRaises(ValueError, tarfile.data_filter, tarinfo, '') + tarinfo = tarfile.TarInfo('\ud800') + self.assertRaises(UnicodeEncodeError, tarfile.tar_filter, tarinfo, '') + self.assertRaises(UnicodeEncodeError, tarfile.data_filter, tarinfo, '') + + @unittest.skipIf(sys.platform == 'win32', 'requires native bytes paths') + def test_extract_unencodable(self): + # Create a member with name \xed\xa0\x80 which is UTF-8 encoded + # lone surrogate \ud800. + with ArchiveMaker(encoding='ascii', errors='surrogateescape') as arc: + arc.add('\udced\udca0\udc80') + with os_helper.temp_cwd() as tmp: + tar = arc.open(encoding='utf-8', errors='surrogatepass', + errorlevel=1) + self.assertEqual(tar.getnames(), ['\ud800']) + with self.assertRaises(UnicodeEncodeError): + tar.extractall() + self.assertEqual(os.listdir(), []) + + tar = arc.open(encoding='utf-8', errors='surrogatepass', + errorlevel=0, debug=1) + with support.captured_stderr() as stderr: + tar.extractall() + self.assertEqual(os.listdir(), []) + self.assertIn('tarfile: UnicodeEncodeError ', stderr.getvalue()) + def test_change_default_filter_on_instance(self): tar = tarfile.TarFile(tarname, 'r') def strict_filter(tarinfo, path): diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index abe63c10c0a..b6e2d419019 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2137,8 +2137,7 @@ class CRLockTests(lock_tests.RLockTests): ] for args, kwargs in arg_types: with self.subTest(args=args, kwargs=kwargs): - with self.assertWarns(DeprecationWarning): - threading.RLock(*args, **kwargs) + self.assertRaises(TypeError, threading.RLock, *args, **kwargs) # Subtypes with custom `__init__` are allowed (but, not recommended): class CustomRLock(self.locktype): @@ -2156,6 +2155,9 @@ class ConditionAsRLockTests(lock_tests.RLockTests): # Condition uses an RLock by default and exports its API. locktype = staticmethod(threading.Condition) + def test_constructor_noargs(self): + self.skipTest("Condition allows positional arguments") + def test_recursion_count(self): self.skipTest("Condition does not expose _recursion_count()") diff --git a/Lib/test/test_typing.py b/Lib/test/test_typing.py index bc03e6bf2d7..246be22a0d8 100644 --- a/Lib/test/test_typing.py +++ b/Lib/test/test_typing.py @@ -8487,6 +8487,36 @@ class TypedDictTests(BaseTestCase): self.assertEqual(Child.__required_keys__, frozenset(['a'])) self.assertEqual(Child.__optional_keys__, frozenset()) + def test_inheritance_pep563(self): + def _make_td(future, class_name, annos, base, extra_names=None): + lines = [] + if future: + lines.append('from __future__ import annotations') + lines.append('from typing import TypedDict') + lines.append(f'class {class_name}({base}):') + for name, anno in annos.items(): + lines.append(f' {name}: {anno}') + code = '\n'.join(lines) + ns = run_code(code, extra_names) + return ns[class_name] + + for base_future in (True, False): + for child_future in (True, False): + with self.subTest(base_future=base_future, child_future=child_future): + base = _make_td( + base_future, "Base", {"base": "int"}, "TypedDict" + ) + self.assertIsNotNone(base.__annotate__) + child = _make_td( + child_future, "Child", {"child": "int"}, "Base", {"Base": base} + ) + base_anno = ForwardRef("int", module="builtins") if base_future else int + child_anno = ForwardRef("int", module="builtins") if child_future else int + self.assertEqual(base.__annotations__, {'base': base_anno}) + self.assertEqual( + child.__annotations__, {'child': child_anno, 'base': base_anno} + ) + def test_required_notrequired_keys(self): self.assertEqual(NontotalMovie.__required_keys__, frozenset({"title"})) @@ -10668,6 +10698,9 @@ class UnionGenericAliasTests(BaseTestCase): with self.assertWarns(DeprecationWarning): self.assertNotEqual(int, typing._UnionGenericAlias) + def test_hashable(self): + self.assertEqual(hash(typing._UnionGenericAlias), hash(Union)) + def load_tests(loader, tests, pattern): import doctest diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py index c965860fbb1..bc1030eea60 100644 --- a/Lib/test/test_urllib.py +++ b/Lib/test/test_urllib.py @@ -109,7 +109,7 @@ class urlopen_FileTests(unittest.TestCase): finally: f.close() self.pathname = os_helper.TESTFN - self.quoted_pathname = urllib.parse.quote(self.pathname) + self.quoted_pathname = urllib.parse.quote(os.fsencode(self.pathname)) self.returned_obj = urllib.request.urlopen("file:%s" % self.quoted_pathname) def tearDown(self): diff --git a/Lib/test/test_wave.py b/Lib/test/test_wave.py index 5e771c8de96..6c3362857fc 100644 --- a/Lib/test/test_wave.py +++ b/Lib/test/test_wave.py @@ -136,32 +136,6 @@ class MiscTestCase(unittest.TestCase): not_exported = {'WAVE_FORMAT_PCM', 'WAVE_FORMAT_EXTENSIBLE', 'KSDATAFORMAT_SUBTYPE_PCM'} support.check__all__(self, wave, not_exported=not_exported) - def test_read_deprecations(self): - filename = support.findfile('pluck-pcm8.wav', subdir='audiodata') - with wave.open(filename) as reader: - with self.assertWarns(DeprecationWarning): - with self.assertRaises(wave.Error): - reader.getmark('mark') - with self.assertWarns(DeprecationWarning): - self.assertIsNone(reader.getmarkers()) - - def test_write_deprecations(self): - with io.BytesIO(b'') as tmpfile: - with wave.open(tmpfile, 'wb') as writer: - writer.setnchannels(1) - writer.setsampwidth(1) - writer.setframerate(1) - writer.setcomptype('NONE', 'not compressed') - - with self.assertWarns(DeprecationWarning): - with self.assertRaises(wave.Error): - writer.setmark(0, 0, 'mark') - with self.assertWarns(DeprecationWarning): - with self.assertRaises(wave.Error): - writer.getmark('mark') - with self.assertWarns(DeprecationWarning): - self.assertIsNone(writer.getmarkers()) - class WaveLowLevelTest(unittest.TestCase): diff --git a/Lib/test/test_zipfile/test_core.py b/Lib/test/test_zipfile/test_core.py index ae898150658..43056978848 100644 --- a/Lib/test/test_zipfile/test_core.py +++ b/Lib/test/test_zipfile/test_core.py @@ -3642,7 +3642,7 @@ class EncodedMetadataTests(unittest.TestCase): except OSError: pass except UnicodeEncodeError: - self.skipTest(f'cannot encode file name {fn!r}') + self.skipTest(f'cannot encode file name {fn!a}') zipfile.main(["--metadata-encoding=shift_jis", "-e", TESTFN, TESTFN2]) listing = os.listdir(TESTFN2) diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index 713294c4c27..53ca592ea38 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -288,8 +288,8 @@ class CompressorTestCase(unittest.TestCase): KEY = 100001234 option = {CompressionParameter.compression_level: 10, KEY: 200000000} - pattern = r'Zstd compression parameter.*?"unknown parameter \(key %d\)"' \ - % KEY + pattern = (r'Invalid zstd compression parameter.*?' + fr'"unknown parameter \(key {KEY}\)"') with self.assertRaisesRegex(ZstdError, pattern): ZstdCompressor(options=option) @@ -420,8 +420,8 @@ class DecompressorTestCase(unittest.TestCase): KEY = 100001234 options = {DecompressionParameter.window_log_max: DecompressionParameter.window_log_max.bounds()[1], KEY: 200000000} - pattern = r'Zstd decompression parameter.*?"unknown parameter \(key %d\)"' \ - % KEY + pattern = (r'Invalid zstd decompression parameter.*?' + fr'"unknown parameter \(key {KEY}\)"') with self.assertRaisesRegex(ZstdError, pattern): ZstdDecompressor(options=options) @@ -507,7 +507,7 @@ class DecompressorTestCase(unittest.TestCase): self.assertFalse(d.needs_input) def test_decompressor_arg(self): - zd = ZstdDict(b'12345678', True) + zd = ZstdDict(b'12345678', is_raw=True) with self.assertRaises(TypeError): d = ZstdDecompressor(zstd_dict={}) @@ -1021,6 +1021,10 @@ class DecompressorFlagsTestCase(unittest.TestCase): class ZstdDictTestCase(unittest.TestCase): def test_is_raw(self): + # must be passed as a keyword argument + with self.assertRaises(TypeError): + ZstdDict(bytes(8), True) + # content < 8 b = b'1234567' with self.assertRaises(ValueError): @@ -1068,9 +1072,9 @@ class ZstdDictTestCase(unittest.TestCase): # corrupted zd = ZstdDict(dict_content, is_raw=False) - with self.assertRaisesRegex(ZstdError, r'ZSTD_CDict.*?corrupted'): + with self.assertRaisesRegex(ZstdError, r'ZSTD_CDict.*?content\.$'): ZstdCompressor(zstd_dict=zd.as_digested_dict) - with self.assertRaisesRegex(ZstdError, r'ZSTD_DDict.*?corrupted'): + with self.assertRaisesRegex(ZstdError, r'ZSTD_DDict.*?content\.$'): ZstdDecompressor(zd) # wrong type @@ -1096,7 +1100,7 @@ class ZstdDictTestCase(unittest.TestCase): TRAINED_DICT = train_dict(SAMPLES, DICT_SIZE1) - ZstdDict(TRAINED_DICT.dict_content, False) + ZstdDict(TRAINED_DICT.dict_content, is_raw=False) self.assertNotEqual(TRAINED_DICT.dict_id, 0) self.assertGreater(len(TRAINED_DICT.dict_content), 0) @@ -1250,7 +1254,7 @@ class ZstdDictTestCase(unittest.TestCase): def test_as_prefix(self): # V1 V1 = THIS_FILE_BYTES - zd = ZstdDict(V1, True) + zd = ZstdDict(V1, is_raw=True) # V2 mid = len(V1) // 2 @@ -1266,7 +1270,7 @@ class ZstdDictTestCase(unittest.TestCase): self.assertEqual(decompress(dat, zd.as_prefix), V2) # use wrong prefix - zd2 = ZstdDict(SAMPLES[0], True) + zd2 = ZstdDict(SAMPLES[0], is_raw=True) try: decompressed = decompress(dat, zd2.as_prefix) except ZstdError: # expected @@ -2426,6 +2430,7 @@ class OpenTestCase(unittest.TestCase): self.assertEqual(f.write(arr), LENGTH) self.assertEqual(f.tell(), LENGTH) +@unittest.skip("it fails for now, see gh-133885") class FreeThreadingMethodTests(unittest.TestCase): @unittest.skipUnless(Py_GIL_DISABLED, 'this test can only possibly fail with GIL disabled') |