aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/_test_multiprocessing.py21
-rw-r--r--Lib/test/datetimetester.py9
-rw-r--r--Lib/test/lock_tests.py5
-rw-r--r--Lib/test/pythoninfo.py11
-rw-r--r--Lib/test/support/__init__.py4
-rw-r--r--Lib/test/support/hashlib_helper.py221
-rw-r--r--Lib/test/support/strace_helper.py2
-rw-r--r--Lib/test/test_argparse.py131
-rw-r--r--Lib/test/test_asyncio/test_futures.py58
-rw-r--r--Lib/test/test_asyncio/test_tools.py8
-rw-r--r--Lib/test/test_capi/test_opt.py136
-rw-r--r--Lib/test/test_codeccallbacks.py39
-rw-r--r--Lib/test/test_codecs.py52
-rw-r--r--Lib/test/test_concurrent_futures/test_future.py57
-rw-r--r--Lib/test/test_crossinterp.py34
-rw-r--r--Lib/test/test_curses.py3
-rw-r--r--Lib/test/test_faulthandler.py23
-rw-r--r--Lib/test/test_fcntl.py46
-rw-r--r--Lib/test/test_fractions.py208
-rw-r--r--Lib/test/test_free_threading/test_functools.py75
-rw-r--r--Lib/test/test_genericalias.py6
-rw-r--r--Lib/test/test_hmac.py154
-rw-r--r--Lib/test/test_httpservers.py668
-rw-r--r--Lib/test/test_importlib/import_/test_relative_imports.py15
-rw-r--r--Lib/test/test_interpreters/test_api.py8
-rw-r--r--Lib/test/test_ioctl.py3
-rw-r--r--Lib/test/test_logging.py23
-rw-r--r--Lib/test/test_ntpath.py179
-rw-r--r--Lib/test/test_pathlib/support/lexical_path.py11
-rw-r--r--Lib/test/test_pathlib/support/local_path.py10
-rw-r--r--Lib/test/test_pathlib/support/zip_path.py45
-rw-r--r--Lib/test/test_pathlib/test_join_windows.py17
-rw-r--r--Lib/test/test_pathlib/test_pathlib.py12
-rw-r--r--Lib/test/test_platform.py9
-rw-r--r--Lib/test/test_posixpath.py74
-rw-r--r--Lib/test/test_pyrepl/test_pyrepl.py47
-rw-r--r--Lib/test/test_pyrepl/test_reader.py31
-rw-r--r--Lib/test/test_string/test_templatelib.py7
-rw-r--r--Lib/test/test_syntax.py17
-rw-r--r--Lib/test/test_sys.py42
-rw-r--r--Lib/test/test_sysconfig.py5
-rw-r--r--Lib/test/test_tarfile.py49
-rw-r--r--Lib/test/test_threading.py6
-rw-r--r--Lib/test/test_typing.py33
-rw-r--r--Lib/test/test_urllib.py2
-rw-r--r--Lib/test/test_wave.py26
-rw-r--r--Lib/test/test_zipfile/test_core.py2
-rw-r--r--Lib/test/test_zstd.py25
48 files changed, 1934 insertions, 735 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 1b690cb88bf..6a20a1eb03e 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -513,9 +513,14 @@ class _TestProcess(BaseTestCase):
time.sleep(100)
@classmethod
- def _sleep_no_int_handler(cls):
+ def _sleep_some_event(cls, event):
+ event.set()
+ time.sleep(100)
+
+ @classmethod
+ def _sleep_no_int_handler(cls, event):
signal.signal(signal.SIGINT, signal.SIG_DFL)
- cls._sleep_some()
+ cls._sleep_some_event(event)
@classmethod
def _test_sleep(cls, delay):
@@ -525,7 +530,10 @@ class _TestProcess(BaseTestCase):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
- p = self.Process(target=target or self._sleep_some)
+ event = self.Event()
+ if not target:
+ target = self._sleep_some_event
+ p = self.Process(target=target, args=(event,))
p.daemon = True
p.start()
@@ -543,8 +551,11 @@ class _TestProcess(BaseTestCase):
self.assertTimingAlmostEqual(join.elapsed, 0.0)
self.assertEqual(p.is_alive(), True)
- # XXX maybe terminating too soon causes the problems on Gentoo...
- time.sleep(1)
+ timeout = support.SHORT_TIMEOUT
+ if not event.wait(timeout):
+ p.terminate()
+ p.join()
+ self.fail(f"event not signaled in {timeout} seconds")
meth(p)
diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py
index d1882a310bb..345698cfb5f 100644
--- a/Lib/test/datetimetester.py
+++ b/Lib/test/datetimetester.py
@@ -3571,6 +3571,10 @@ class TestDateTime(TestDate):
'2009-04-19T12:30:45.400 +02:30', # Space between ms and timezone (gh-130959)
'2009-04-19T12:30:45.400 ', # Trailing space (gh-130959)
'2009-04-19T12:30:45. 400', # Space before fraction (gh-130959)
+ '2009-04-19T12:30:45+00:90:00', # Time zone field out from range
+ '2009-04-19T12:30:45+00:00:90', # Time zone field out from range
+ '2009-04-19T12:30:45-00:90:00', # Time zone field out from range
+ '2009-04-19T12:30:45-00:00:90', # Time zone field out from range
]
for bad_str in bad_strs:
@@ -4795,6 +4799,11 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase):
'12:30:45.400 +02:30', # Space between ms and timezone (gh-130959)
'12:30:45.400 ', # Trailing space (gh-130959)
'12:30:45. 400', # Space before fraction (gh-130959)
+ '24:00:00.000001', # Has non-zero microseconds on 24:00
+ '24:00:01.000000', # Has non-zero seconds on 24:00
+ '24:01:00.000000', # Has non-zero minutes on 24:00
+ '12:30:45+00:90:00', # Time zone field out from range
+ '12:30:45+00:00:90', # Time zone field out from range
]
for bad_str in bad_strs:
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
index 009e04e9c0b..d64b2b9fe28 100644
--- a/Lib/test/lock_tests.py
+++ b/Lib/test/lock_tests.py
@@ -124,6 +124,11 @@ class BaseLockTests(BaseTestCase):
lock = self.locktype()
del lock
+ def test_constructor_noargs(self):
+ self.assertRaises(TypeError, self.locktype, 1)
+ self.assertRaises(TypeError, self.locktype, x=1)
+ self.assertRaises(TypeError, self.locktype, 1, x=2)
+
def test_repr(self):
lock = self.locktype()
self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
diff --git a/Lib/test/pythoninfo.py b/Lib/test/pythoninfo.py
index 682815c3fdd..e1830f2e6eb 100644
--- a/Lib/test/pythoninfo.py
+++ b/Lib/test/pythoninfo.py
@@ -658,6 +658,16 @@ def collect_zlib(info_add):
copy_attributes(info_add, zlib, 'zlib.%s', attributes)
+def collect_zstd(info_add):
+ try:
+ import _zstd
+ except ImportError:
+ return
+
+ attributes = ('zstd_version',)
+ copy_attributes(info_add, _zstd, 'zstd.%s', attributes)
+
+
def collect_expat(info_add):
try:
from xml.parsers import expat
@@ -1051,6 +1061,7 @@ def collect_info(info):
collect_tkinter,
collect_windows,
collect_zlib,
+ collect_zstd,
collect_libregrtest_utils,
# Collecting from tests should be last as they have side effects.
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index c74c3a31909..9b6e80fdad9 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -696,9 +696,11 @@ def sortdict(dict):
return "{%s}" % withcommas
-def run_code(code: str) -> dict[str, object]:
+def run_code(code: str, extra_names: dict[str, object] | None = None) -> dict[str, object]:
"""Run a piece of code after dedenting it, and return its global namespace."""
ns = {}
+ if extra_names:
+ ns.update(extra_names)
exec(textwrap.dedent(code), ns)
return ns
diff --git a/Lib/test/support/hashlib_helper.py b/Lib/test/support/hashlib_helper.py
index 5043f08dd93..7032257b068 100644
--- a/Lib/test/support/hashlib_helper.py
+++ b/Lib/test/support/hashlib_helper.py
@@ -23,6 +23,22 @@ def requires_builtin_hmac():
return unittest.skipIf(_hmac is None, "requires _hmac")
+def _missing_hash(digestname, implementation=None, *, exc=None):
+ parts = ["missing", implementation, f"hash algorithm: {digestname!r}"]
+ msg = " ".join(filter(None, parts))
+ raise unittest.SkipTest(msg) from exc
+
+
+def _openssl_availabillity(digestname, *, usedforsecurity):
+ try:
+ _hashlib.new(digestname, usedforsecurity=usedforsecurity)
+ except AttributeError:
+ assert _hashlib is None
+ _missing_hash(digestname, "OpenSSL")
+ except ValueError as exc:
+ _missing_hash(digestname, "OpenSSL", exc=exc)
+
+
def _decorate_func_or_class(func_or_class, decorator_func):
if not isinstance(func_or_class, type):
return decorator_func(func_or_class)
@@ -71,8 +87,7 @@ def requires_hashdigest(digestname, openssl=None, usedforsecurity=True):
try:
test_availability()
except ValueError as exc:
- msg = f"missing hash algorithm: {digestname!r}"
- raise unittest.SkipTest(msg) from exc
+ _missing_hash(digestname, exc=exc)
return func(*args, **kwargs)
return wrapper
@@ -87,14 +102,44 @@ def requires_openssl_hashdigest(digestname, *, usedforsecurity=True):
The hashing algorithm may be missing or blocked by a strict crypto policy.
"""
def decorator_func(func):
- @requires_hashlib()
+ @requires_hashlib() # avoid checking at each call
@functools.wraps(func)
def wrapper(*args, **kwargs):
+ _openssl_availabillity(digestname, usedforsecurity=usedforsecurity)
+ return func(*args, **kwargs)
+ return wrapper
+
+ def decorator(func_or_class):
+ return _decorate_func_or_class(func_or_class, decorator_func)
+ return decorator
+
+
+def find_openssl_hashdigest_constructor(digestname, *, usedforsecurity=True):
+ """Find the OpenSSL hash function constructor by its name."""
+ assert isinstance(digestname, str), digestname
+ _openssl_availabillity(digestname, usedforsecurity=usedforsecurity)
+ # This returns a function of the form _hashlib.openssl_<name> and
+ # not a lambda function as it is rejected by _hashlib.hmac_new().
+ return getattr(_hashlib, f"openssl_{digestname}")
+
+
+def requires_builtin_hashdigest(
+ module_name, digestname, *, usedforsecurity=True
+):
+ """Decorator raising SkipTest if a HACL* hashing algorithm is missing.
+
+ - The *module_name* is the C extension module name based on HACL*.
+ - The *digestname* is one of its member, e.g., 'md5'.
+ """
+ def decorator_func(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ module = import_module(module_name)
try:
- _hashlib.new(digestname, usedforsecurity=usedforsecurity)
- except ValueError:
- msg = f"missing OpenSSL hash algorithm: {digestname!r}"
- raise unittest.SkipTest(msg)
+ getattr(module, digestname)
+ except AttributeError:
+ fullname = f'{module_name}.{digestname}'
+ _missing_hash(fullname, implementation="HACL")
return func(*args, **kwargs)
return wrapper
@@ -103,6 +148,168 @@ def requires_openssl_hashdigest(digestname, *, usedforsecurity=True):
return decorator
+def find_builtin_hashdigest_constructor(
+ module_name, digestname, *, usedforsecurity=True
+):
+ """Find the HACL* hash function constructor.
+
+ - The *module_name* is the C extension module name based on HACL*.
+ - The *digestname* is one of its member, e.g., 'md5'.
+ """
+ module = import_module(module_name)
+ try:
+ constructor = getattr(module, digestname)
+ constructor(b'', usedforsecurity=usedforsecurity)
+ except (AttributeError, TypeError, ValueError):
+ _missing_hash(f'{module_name}.{digestname}', implementation="HACL")
+ return constructor
+
+
+class HashFunctionsTrait:
+ """Mixin trait class containing hash functions.
+
+ This class is assumed to have all unitest.TestCase methods but should
+ not directly inherit from it to prevent the test suite being run on it.
+
+ Subclasses should implement the hash functions by returning an object
+ that can be recognized as a valid digestmod parameter for both hashlib
+ and HMAC. In particular, it cannot be a lambda function as it will not
+ be recognized by hashlib (it will still be accepted by the pure Python
+ implementation of HMAC).
+ """
+
+ ALGORITHMS = [
+ 'md5', 'sha1',
+ 'sha224', 'sha256', 'sha384', 'sha512',
+ 'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512',
+ ]
+
+ # Default 'usedforsecurity' to use when looking up a hash function.
+ usedforsecurity = True
+
+ def _find_constructor(self, name):
+ # By default, a missing algorithm skips the test that uses it.
+ self.assertIn(name, self.ALGORITHMS)
+ self.skipTest(f"missing hash function: {name}")
+
+ @property
+ def md5(self):
+ return self._find_constructor("md5")
+
+ @property
+ def sha1(self):
+ return self._find_constructor("sha1")
+
+ @property
+ def sha224(self):
+ return self._find_constructor("sha224")
+
+ @property
+ def sha256(self):
+ return self._find_constructor("sha256")
+
+ @property
+ def sha384(self):
+ return self._find_constructor("sha384")
+
+ @property
+ def sha512(self):
+ return self._find_constructor("sha512")
+
+ @property
+ def sha3_224(self):
+ return self._find_constructor("sha3_224")
+
+ @property
+ def sha3_256(self):
+ return self._find_constructor("sha3_256")
+
+ @property
+ def sha3_384(self):
+ return self._find_constructor("sha3_384")
+
+ @property
+ def sha3_512(self):
+ return self._find_constructor("sha3_512")
+
+
+class NamedHashFunctionsTrait(HashFunctionsTrait):
+ """Trait containing named hash functions.
+
+ Hash functions are available if and only if they are available in hashlib.
+ """
+
+ def _find_constructor(self, name):
+ self.assertIn(name, self.ALGORITHMS)
+ return name
+
+
+class OpenSSLHashFunctionsTrait(HashFunctionsTrait):
+ """Trait containing OpenSSL hash functions.
+
+ Hash functions are available if and only if they are available in _hashlib.
+ """
+
+ def _find_constructor(self, name):
+ self.assertIn(name, self.ALGORITHMS)
+ return find_openssl_hashdigest_constructor(
+ name, usedforsecurity=self.usedforsecurity
+ )
+
+
+class BuiltinHashFunctionsTrait(HashFunctionsTrait):
+ """Trait containing HACL* hash functions.
+
+ Hash functions are available if and only if they are available in C.
+ In particular, HACL* HMAC-MD5 may be available even though HACL* md5
+ is not since the former is unconditionally built.
+ """
+
+ def _find_constructor_in(self, module, name):
+ self.assertIn(name, self.ALGORITHMS)
+ return find_builtin_hashdigest_constructor(module, name)
+
+ @property
+ def md5(self):
+ return self._find_constructor_in("_md5", "md5")
+
+ @property
+ def sha1(self):
+ return self._find_constructor_in("_sha1", "sha1")
+
+ @property
+ def sha224(self):
+ return self._find_constructor_in("_sha2", "sha224")
+
+ @property
+ def sha256(self):
+ return self._find_constructor_in("_sha2", "sha256")
+
+ @property
+ def sha384(self):
+ return self._find_constructor_in("_sha2", "sha384")
+
+ @property
+ def sha512(self):
+ return self._find_constructor_in("_sha2", "sha512")
+
+ @property
+ def sha3_224(self):
+ return self._find_constructor_in("_sha3", "sha3_224")
+
+ @property
+ def sha3_256(self):
+ return self._find_constructor_in("_sha3","sha3_256")
+
+ @property
+ def sha3_384(self):
+ return self._find_constructor_in("_sha3","sha3_384")
+
+ @property
+ def sha3_512(self):
+ return self._find_constructor_in("_sha3","sha3_512")
+
+
def find_gil_minsize(modules_names, default=2048):
"""Get the largest GIL_MINSIZE value for the given cryptographic modules.
diff --git a/Lib/test/support/strace_helper.py b/Lib/test/support/strace_helper.py
index 1a9d2b520b7..cf95f7bdc7d 100644
--- a/Lib/test/support/strace_helper.py
+++ b/Lib/test/support/strace_helper.py
@@ -38,7 +38,7 @@ class StraceResult:
This assumes the program under inspection doesn't print any non-utf8
strings which would mix into the strace output."""
- decoded_events = self.event_bytes.decode('utf-8')
+ decoded_events = self.event_bytes.decode('utf-8', 'surrogateescape')
matches = [
_syscall_regex.match(event)
for event in decoded_events.splitlines()
diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py
index 5a6be1180c1..58853ba4eb3 100644
--- a/Lib/test/test_argparse.py
+++ b/Lib/test/test_argparse.py
@@ -5469,11 +5469,60 @@ class TestHelpMetavarTypeFormatter(HelpTestCase):
version = ''
-class TestHelpUsageLongSubparserCommand(TestCase):
- """Test that subparser commands are formatted correctly in help"""
+class TestHelpCustomHelpFormatter(TestCase):
maxDiff = None
- def test_parent_help(self):
+ def test_custom_formatter_function(self):
+ def custom_formatter(prog):
+ return argparse.RawTextHelpFormatter(prog, indent_increment=5)
+
+ parser = argparse.ArgumentParser(
+ prog='PROG',
+ prefix_chars='-+',
+ formatter_class=custom_formatter
+ )
+ parser.add_argument('+f', '++foo', help="foo help")
+ parser.add_argument('spam', help="spam help")
+
+ parser_help = parser.format_help()
+ self.assertEqual(parser_help, textwrap.dedent('''\
+ usage: PROG [-h] [+f FOO] spam
+
+ positional arguments:
+ spam spam help
+
+ options:
+ -h, --help show this help message and exit
+ +f, ++foo FOO foo help
+ '''))
+
+ def test_custom_formatter_class(self):
+ class CustomFormatter(argparse.RawTextHelpFormatter):
+ def __init__(self, prog):
+ super().__init__(prog, indent_increment=5)
+
+ parser = argparse.ArgumentParser(
+ prog='PROG',
+ prefix_chars='-+',
+ formatter_class=CustomFormatter
+ )
+ parser.add_argument('+f', '++foo', help="foo help")
+ parser.add_argument('spam', help="spam help")
+
+ parser_help = parser.format_help()
+ self.assertEqual(parser_help, textwrap.dedent('''\
+ usage: PROG [-h] [+f FOO] spam
+
+ positional arguments:
+ spam spam help
+
+ options:
+ -h, --help show this help message and exit
+ +f, ++foo FOO foo help
+ '''))
+
+ def test_usage_long_subparser_command(self):
+ """Test that subparser commands are formatted correctly in help"""
def custom_formatter(prog):
return argparse.RawTextHelpFormatter(prog, max_help_position=50)
@@ -6973,7 +7022,7 @@ class TestProgName(TestCase):
def check_usage(self, expected, *args, **kwargs):
res = script_helper.assert_python_ok('-Xutf8', *args, '-h', **kwargs)
- self.assertEqual(res.out.splitlines()[0].decode(),
+ self.assertEqual(os.fsdecode(res.out.splitlines()[0]),
f'usage: {expected} [-h]')
def test_script(self, compiled=False):
@@ -7053,6 +7102,7 @@ class TestTranslations(TestTranslationsBase):
class TestColorized(TestCase):
+ maxDiff = None
def setUp(self):
super().setUp()
@@ -7211,6 +7261,79 @@ class TestColorized(TestCase):
),
)
+ def test_custom_formatter_function(self):
+ def custom_formatter(prog):
+ return argparse.RawTextHelpFormatter(prog, indent_increment=5)
+
+ parser = argparse.ArgumentParser(
+ prog="PROG",
+ prefix_chars="-+",
+ formatter_class=custom_formatter,
+ color=True,
+ )
+ parser.add_argument('+f', '++foo', help="foo help")
+ parser.add_argument('spam', help="spam help")
+
+ prog = self.theme.prog
+ heading = self.theme.heading
+ short = self.theme.summary_short_option
+ label = self.theme.summary_label
+ pos = self.theme.summary_action
+ long_b = self.theme.long_option
+ short_b = self.theme.short_option
+ label_b = self.theme.label
+ pos_b = self.theme.action
+ reset = self.theme.reset
+
+ parser_help = parser.format_help()
+ self.assertEqual(parser_help, textwrap.dedent(f'''\
+ {heading}usage: {reset}{prog}PROG{reset} [{short}-h{reset}] [{short}+f {label}FOO{reset}] {pos}spam{reset}
+
+ {heading}positional arguments:{reset}
+ {pos_b}spam{reset} spam help
+
+ {heading}options:{reset}
+ {short_b}-h{reset}, {long_b}--help{reset} show this help message and exit
+ {short_b}+f{reset}, {long_b}++foo{reset} {label_b}FOO{reset} foo help
+ '''))
+
+ def test_custom_formatter_class(self):
+ class CustomFormatter(argparse.RawTextHelpFormatter):
+ def __init__(self, prog):
+ super().__init__(prog, indent_increment=5)
+
+ parser = argparse.ArgumentParser(
+ prog="PROG",
+ prefix_chars="-+",
+ formatter_class=CustomFormatter,
+ color=True,
+ )
+ parser.add_argument('+f', '++foo', help="foo help")
+ parser.add_argument('spam', help="spam help")
+
+ prog = self.theme.prog
+ heading = self.theme.heading
+ short = self.theme.summary_short_option
+ label = self.theme.summary_label
+ pos = self.theme.summary_action
+ long_b = self.theme.long_option
+ short_b = self.theme.short_option
+ label_b = self.theme.label
+ pos_b = self.theme.action
+ reset = self.theme.reset
+
+ parser_help = parser.format_help()
+ self.assertEqual(parser_help, textwrap.dedent(f'''\
+ {heading}usage: {reset}{prog}PROG{reset} [{short}-h{reset}] [{short}+f {label}FOO{reset}] {pos}spam{reset}
+
+ {heading}positional arguments:{reset}
+ {pos_b}spam{reset} spam help
+
+ {heading}options:{reset}
+ {short_b}-h{reset}, {long_b}--help{reset} show this help message and exit
+ {short_b}+f{reset}, {long_b}++foo{reset} {label_b}FOO{reset} foo help
+ '''))
+
def tearDownModule():
# Remove global references to avoid looking like we have refleaks.
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
index 8b51522278a..39bef465bdb 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -413,7 +413,7 @@ class BaseFutureTests:
def test_copy_state(self):
from asyncio.futures import _copy_future_state
- f = self._new_future(loop=self.loop)
+ f = concurrent.futures.Future()
f.set_result(10)
newf = self._new_future(loop=self.loop)
@@ -421,7 +421,7 @@ class BaseFutureTests:
self.assertTrue(newf.done())
self.assertEqual(newf.result(), 10)
- f_exception = self._new_future(loop=self.loop)
+ f_exception = concurrent.futures.Future()
f_exception.set_exception(RuntimeError())
newf_exception = self._new_future(loop=self.loop)
@@ -429,7 +429,7 @@ class BaseFutureTests:
self.assertTrue(newf_exception.done())
self.assertRaises(RuntimeError, newf_exception.result)
- f_cancelled = self._new_future(loop=self.loop)
+ f_cancelled = concurrent.futures.Future()
f_cancelled.cancel()
newf_cancelled = self._new_future(loop=self.loop)
@@ -441,7 +441,7 @@ class BaseFutureTests:
except BaseException as e:
f_exc = e
- f_conexc = self._new_future(loop=self.loop)
+ f_conexc = concurrent.futures.Future()
f_conexc.set_exception(f_exc)
newf_conexc = self._new_future(loop=self.loop)
@@ -454,6 +454,56 @@ class BaseFutureTests:
newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__))
self.assertEqual(newf_tb.count('raise concurrent.futures.InvalidStateError'), 1)
+ def test_copy_state_from_concurrent_futures(self):
+ """Test _copy_future_state from concurrent.futures.Future.
+
+ This tests the optimized path using _get_snapshot when available.
+ """
+ from asyncio.futures import _copy_future_state
+
+ # Test with a result
+ f_concurrent = concurrent.futures.Future()
+ f_concurrent.set_result(42)
+ f_asyncio = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent, f_asyncio)
+ self.assertTrue(f_asyncio.done())
+ self.assertEqual(f_asyncio.result(), 42)
+
+ # Test with an exception
+ f_concurrent_exc = concurrent.futures.Future()
+ f_concurrent_exc.set_exception(ValueError("test exception"))
+ f_asyncio_exc = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent_exc, f_asyncio_exc)
+ self.assertTrue(f_asyncio_exc.done())
+ with self.assertRaises(ValueError) as cm:
+ f_asyncio_exc.result()
+ self.assertEqual(str(cm.exception), "test exception")
+
+ # Test with cancelled state
+ f_concurrent_cancelled = concurrent.futures.Future()
+ f_concurrent_cancelled.cancel()
+ f_asyncio_cancelled = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled)
+ self.assertTrue(f_asyncio_cancelled.cancelled())
+
+ # Test that destination already cancelled prevents copy
+ f_concurrent_result = concurrent.futures.Future()
+ f_concurrent_result.set_result(10)
+ f_asyncio_precancelled = self._new_future(loop=self.loop)
+ f_asyncio_precancelled.cancel()
+ _copy_future_state(f_concurrent_result, f_asyncio_precancelled)
+ self.assertTrue(f_asyncio_precancelled.cancelled())
+
+ # Test exception type conversion
+ f_concurrent_invalid = concurrent.futures.Future()
+ f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid"))
+ f_asyncio_invalid = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent_invalid, f_asyncio_invalid)
+ self.assertTrue(f_asyncio_invalid.done())
+ with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm:
+ f_asyncio_invalid.result()
+ self.assertEqual(str(cm.exception), "invalid")
+
def test_iter(self):
fut = self._new_future(loop=self.loop)
diff --git a/Lib/test/test_asyncio/test_tools.py b/Lib/test/test_asyncio/test_tools.py
index 0413e236c27..ba36e759ccd 100644
--- a/Lib/test/test_asyncio/test_tools.py
+++ b/Lib/test/test_asyncio/test_tools.py
@@ -791,21 +791,21 @@ class TestAsyncioToolsBasic(unittest.TestCase):
class TestAsyncioToolsEdgeCases(unittest.TestCase):
def test_task_awaits_self(self):
- """A task directly awaits itself – should raise a cycle."""
+ """A task directly awaits itself - should raise a cycle."""
input_ = [(1, [(1, "Self-Awaiter", [[["loopback"], 1]])])]
with self.assertRaises(tools.CycleFoundException) as ctx:
tools.build_async_tree(input_)
self.assertIn([1, 1], ctx.exception.cycles)
def test_task_with_missing_awaiter_id(self):
- """Awaiter ID not in task list – should not crash, just show 'Unknown'."""
+ """Awaiter ID not in task list - should not crash, just show 'Unknown'."""
input_ = [(1, [(1, "Task-A", [[["coro"], 999]])])] # 999 not defined
table = tools.build_task_table(input_)
self.assertEqual(len(table), 1)
self.assertEqual(table[0][4], "Unknown")
def test_duplicate_coroutine_frames(self):
- """Same coroutine frame repeated under a parent – should deduplicate."""
+ """Same coroutine frame repeated under a parent - should deduplicate."""
input_ = [
(
1,
@@ -829,7 +829,7 @@ class TestAsyncioToolsEdgeCases(unittest.TestCase):
self.assertIn("Task-1", flat)
def test_task_with_no_name(self):
- """Task with no name in id2name – should still render with fallback."""
+ """Task with no name in id2name - should still render with fallback."""
input_ = [(1, [(1, "root", [[["f1"], 2]]), (2, None, [])])]
# If name is None, fallback to string should not crash
tree = tools.build_async_tree(input_)
diff --git a/Lib/test/test_capi/test_opt.py b/Lib/test/test_capi/test_opt.py
index 651148336f7..f7a4200e1ee 100644
--- a/Lib/test/test_capi/test_opt.py
+++ b/Lib/test/test_capi/test_opt.py
@@ -1955,9 +1955,143 @@ class TestUopsOptimization(unittest.TestCase):
self.assertEqual(res, TIER2_THRESHOLD)
self.assertIsNotNone(ex)
uops = get_opnames(ex)
- self.assertIn("_CALL_ISINSTANCE", uops)
+ self.assertNotIn("_CALL_ISINSTANCE", uops)
self.assertNotIn("_GUARD_THIRD_NULL", uops)
self.assertNotIn("_GUARD_CALLABLE_ISINSTANCE", uops)
+ self.assertIn("_POP_CALL_TWO_LOAD_CONST_INLINE_BORROW", uops)
+
+ def test_call_list_append(self):
+ def testfunc(n):
+ a = []
+ for i in range(n):
+ a.append(i)
+ return sum(a)
+
+ res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD)
+ self.assertEqual(res, sum(range(TIER2_THRESHOLD)))
+ uops = get_opnames(ex)
+ self.assertIn("_CALL_LIST_APPEND", uops)
+ # We should remove these in the future
+ self.assertIn("_GUARD_NOS_LIST", uops)
+ self.assertIn("_GUARD_CALLABLE_LIST_APPEND", uops)
+
+ def test_call_isinstance_is_true(self):
+ def testfunc(n):
+ x = 0
+ for _ in range(n):
+ y = isinstance(42, int)
+ if y:
+ x += 1
+ return x
+
+ res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD)
+ self.assertEqual(res, TIER2_THRESHOLD)
+ self.assertIsNotNone(ex)
+ uops = get_opnames(ex)
+ self.assertNotIn("_CALL_ISINSTANCE", uops)
+ self.assertNotIn("_TO_BOOL_BOOL", uops)
+ self.assertNotIn("_GUARD_IS_TRUE_POP", uops)
+ self.assertIn("_POP_CALL_TWO_LOAD_CONST_INLINE_BORROW", uops)
+
+ def test_call_isinstance_is_false(self):
+ def testfunc(n):
+ x = 0
+ for _ in range(n):
+ y = isinstance(42, str)
+ if not y:
+ x += 1
+ return x
+
+ res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD)
+ self.assertEqual(res, TIER2_THRESHOLD)
+ self.assertIsNotNone(ex)
+ uops = get_opnames(ex)
+ self.assertNotIn("_CALL_ISINSTANCE", uops)
+ self.assertNotIn("_TO_BOOL_BOOL", uops)
+ self.assertNotIn("_GUARD_IS_FALSE_POP", uops)
+ self.assertIn("_POP_CALL_TWO_LOAD_CONST_INLINE_BORROW", uops)
+
+ def test_call_isinstance_subclass(self):
+ def testfunc(n):
+ x = 0
+ for _ in range(n):
+ y = isinstance(True, int)
+ if y:
+ x += 1
+ return x
+
+ res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD)
+ self.assertEqual(res, TIER2_THRESHOLD)
+ self.assertIsNotNone(ex)
+ uops = get_opnames(ex)
+ self.assertNotIn("_CALL_ISINSTANCE", uops)
+ self.assertNotIn("_TO_BOOL_BOOL", uops)
+ self.assertNotIn("_GUARD_IS_TRUE_POP", uops)
+ self.assertIn("_POP_CALL_TWO_LOAD_CONST_INLINE_BORROW", uops)
+
+ def test_call_isinstance_unknown_object(self):
+ def testfunc(n):
+ x = 0
+ for _ in range(n):
+ # The optimizer doesn't know the return type here:
+ bar = eval("42")
+ # This will only narrow to bool:
+ y = isinstance(bar, int)
+ if y:
+ x += 1
+ return x
+
+ res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD)
+ self.assertEqual(res, TIER2_THRESHOLD)
+ self.assertIsNotNone(ex)
+ uops = get_opnames(ex)
+ self.assertIn("_CALL_ISINSTANCE", uops)
+ self.assertNotIn("_TO_BOOL_BOOL", uops)
+ self.assertIn("_GUARD_IS_TRUE_POP", uops)
+
+ def test_call_isinstance_tuple_of_classes(self):
+ def testfunc(n):
+ x = 0
+ for _ in range(n):
+ # A tuple of classes is currently not optimized,
+ # so this is only narrowed to bool:
+ y = isinstance(42, (int, str))
+ if y:
+ x += 1
+ return x
+
+ res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD)
+ self.assertEqual(res, TIER2_THRESHOLD)
+ self.assertIsNotNone(ex)
+ uops = get_opnames(ex)
+ self.assertIn("_CALL_ISINSTANCE", uops)
+ self.assertNotIn("_TO_BOOL_BOOL", uops)
+ self.assertIn("_GUARD_IS_TRUE_POP", uops)
+
+ def test_call_isinstance_metaclass(self):
+ class EvenNumberMeta(type):
+ def __instancecheck__(self, number):
+ return number % 2 == 0
+
+ class EvenNumber(metaclass=EvenNumberMeta):
+ pass
+
+ def testfunc(n):
+ x = 0
+ for _ in range(n):
+ # Only narrowed to bool
+ y = isinstance(42, EvenNumber)
+ if y:
+ x += 1
+ return x
+
+ res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD)
+ self.assertEqual(res, TIER2_THRESHOLD)
+ self.assertIsNotNone(ex)
+ uops = get_opnames(ex)
+ self.assertIn("_CALL_ISINSTANCE", uops)
+ self.assertNotIn("_TO_BOOL_BOOL", uops)
+ self.assertIn("_GUARD_IS_TRUE_POP", uops)
def global_identity(x):
diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py
index 86e5e5c1474..a767f67a02c 100644
--- a/Lib/test/test_codeccallbacks.py
+++ b/Lib/test/test_codeccallbacks.py
@@ -2,6 +2,7 @@ from _codecs import _unregister_error as _codecs_unregister_error
import codecs
import html.entities
import itertools
+import re
import sys
import unicodedata
import unittest
@@ -1125,7 +1126,7 @@ class CodecCallbackTest(unittest.TestCase):
text = 'abc<def>ghi'*n
text.translate(charmap)
- def test_mutatingdecodehandler(self):
+ def test_mutating_decode_handler(self):
baddata = [
("ascii", b"\xff"),
("utf-7", b"++"),
@@ -1160,6 +1161,42 @@ class CodecCallbackTest(unittest.TestCase):
for (encoding, data) in baddata:
self.assertEqual(data.decode(encoding, "test.mutating"), "\u4242")
+ def test_mutating_decode_handler_unicode_escape(self):
+ decode = codecs.unicode_escape_decode
+ def mutating(exc):
+ if isinstance(exc, UnicodeDecodeError):
+ r = data.get(exc.object[:exc.end])
+ if r is not None:
+ exc.object = r[0] + exc.object[exc.end:]
+ return ('\u0404', r[1])
+ raise AssertionError("don't know how to handle %r" % exc)
+
+ codecs.register_error('test.mutating2', mutating)
+ data = {
+ br'\x0': (b'\\', 0),
+ br'\x3': (b'xxx\\', 3),
+ br'\x5': (b'x\\', 1),
+ }
+ def check(input, expected, msg):
+ with self.assertWarns(DeprecationWarning) as cm:
+ self.assertEqual(decode(input, 'test.mutating2'), (expected, len(input)))
+ self.assertIn(msg, str(cm.warning))
+
+ check(br'\x0n\z', '\u0404\n\\z', r'"\z" is an invalid escape sequence')
+ check(br'\x0n\501', '\u0404\n\u0141', r'"\501" is an invalid octal escape sequence')
+ check(br'\x0z', '\u0404\\z', r'"\z" is an invalid escape sequence')
+
+ check(br'\x3n\zr', '\u0404\n\\zr', r'"\z" is an invalid escape sequence')
+ check(br'\x3zr', '\u0404\\zr', r'"\z" is an invalid escape sequence')
+ check(br'\x3z5', '\u0404\\z5', r'"\z" is an invalid escape sequence')
+ check(memoryview(br'\x3z5x')[:-1], '\u0404\\z5', r'"\z" is an invalid escape sequence')
+ check(memoryview(br'\x3z5xy')[:-2], '\u0404\\z5', r'"\z" is an invalid escape sequence')
+
+ check(br'\x5n\z', '\u0404\n\\z', r'"\z" is an invalid escape sequence')
+ check(br'\x5n\501', '\u0404\n\u0141', r'"\501" is an invalid octal escape sequence')
+ check(br'\x5z', '\u0404\\z', r'"\z" is an invalid escape sequence')
+ check(memoryview(br'\x5zy')[:-1], '\u0404\\z', r'"\z" is an invalid escape sequence')
+
# issue32583
def test_crashing_decode_handler(self):
# better generating one more character to fill the extra space slot
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index 94fcf98e757..d42270da15e 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -1196,23 +1196,39 @@ class EscapeDecodeTest(unittest.TestCase):
check(br"[\1010]", b"[A0]")
check(br"[\x41]", b"[A]")
check(br"[\x410]", b"[A0]")
+
+ def test_warnings(self):
+ decode = codecs.escape_decode
+ check = coding_checker(self, decode)
for i in range(97, 123):
b = bytes([i])
if b not in b'abfnrtvx':
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\%c" is an invalid escape sequence' % i):
check(b"\\" + b, b"\\" + b)
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\%c" is an invalid escape sequence' % (i-32)):
check(b"\\" + b.upper(), b"\\" + b.upper())
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\8" is an invalid escape sequence'):
check(br"\8", b"\\8")
with self.assertWarns(DeprecationWarning):
check(br"\9", b"\\9")
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\\xfa" is an invalid escape sequence') as cm:
check(b"\\\xfa", b"\\\xfa")
for i in range(0o400, 0o1000):
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\%o" is an invalid octal escape sequence' % i):
check(rb'\%o' % i, bytes([i & 0o377]))
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\z" is an invalid escape sequence'):
+ self.assertEqual(decode(br'\x\z', 'ignore'), (b'\\z', 4))
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\501" is an invalid octal escape sequence'):
+ self.assertEqual(decode(br'\x\501', 'ignore'), (b'A', 6))
+
def test_errors(self):
decode = codecs.escape_decode
self.assertRaises(ValueError, decode, br"\x")
@@ -2661,24 +2677,40 @@ class UnicodeEscapeTest(ReadTest, unittest.TestCase):
check(br"[\x410]", "[A0]")
check(br"\u20ac", "\u20ac")
check(br"\U0001d120", "\U0001d120")
+
+ def test_decode_warnings(self):
+ decode = codecs.unicode_escape_decode
+ check = coding_checker(self, decode)
for i in range(97, 123):
b = bytes([i])
if b not in b'abfnrtuvx':
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\%c" is an invalid escape sequence' % i):
check(b"\\" + b, "\\" + chr(i))
if b.upper() not in b'UN':
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\%c" is an invalid escape sequence' % (i-32)):
check(b"\\" + b.upper(), "\\" + chr(i-32))
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\8" is an invalid escape sequence'):
check(br"\8", "\\8")
with self.assertWarns(DeprecationWarning):
check(br"\9", "\\9")
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\\xfa" is an invalid escape sequence') as cm:
check(b"\\\xfa", "\\\xfa")
for i in range(0o400, 0o1000):
- with self.assertWarns(DeprecationWarning):
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\%o" is an invalid octal escape sequence' % i):
check(rb'\%o' % i, chr(i))
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\z" is an invalid escape sequence'):
+ self.assertEqual(decode(br'\x\z', 'ignore'), ('\\z', 4))
+ with self.assertWarnsRegex(DeprecationWarning,
+ r'"\\501" is an invalid octal escape sequence'):
+ self.assertEqual(decode(br'\x\501', 'ignore'), ('\u0141', 6))
+
def test_decode_errors(self):
decode = codecs.unicode_escape_decode
for c, d in (b'x', 2), (b'u', 4), (b'U', 4):
diff --git a/Lib/test/test_concurrent_futures/test_future.py b/Lib/test/test_concurrent_futures/test_future.py
index 4066ea1ee4b..06b11a3bacf 100644
--- a/Lib/test/test_concurrent_futures/test_future.py
+++ b/Lib/test/test_concurrent_futures/test_future.py
@@ -6,6 +6,7 @@ from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
from test import support
+from test.support import threading_helper
from .util import (
PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE,
@@ -282,6 +283,62 @@ class FutureTests(BaseTestCase):
self.assertEqual(f.exception(), e)
+ def test_get_snapshot(self):
+ """Test the _get_snapshot method for atomic state retrieval."""
+ # Test with a pending future
+ f = Future()
+ done, cancelled, result, exception = f._get_snapshot()
+ self.assertFalse(done)
+ self.assertFalse(cancelled)
+ self.assertIsNone(result)
+ self.assertIsNone(exception)
+
+ # Test with a finished future (successful result)
+ f = Future()
+ f.set_result(42)
+ done, cancelled, result, exception = f._get_snapshot()
+ self.assertTrue(done)
+ self.assertFalse(cancelled)
+ self.assertEqual(result, 42)
+ self.assertIsNone(exception)
+
+ # Test with a finished future (exception)
+ f = Future()
+ exc = ValueError("test error")
+ f.set_exception(exc)
+ done, cancelled, result, exception = f._get_snapshot()
+ self.assertTrue(done)
+ self.assertFalse(cancelled)
+ self.assertIsNone(result)
+ self.assertIs(exception, exc)
+
+ # Test with a cancelled future
+ f = Future()
+ f.cancel()
+ done, cancelled, result, exception = f._get_snapshot()
+ self.assertTrue(done)
+ self.assertTrue(cancelled)
+ self.assertIsNone(result)
+ self.assertIsNone(exception)
+
+ # Test concurrent access (basic thread safety check)
+ f = Future()
+ f.set_result(100)
+ results = []
+
+ def get_snapshot():
+ for _ in range(1000):
+ snapshot = f._get_snapshot()
+ results.append(snapshot)
+
+ threads = [threading.Thread(target=get_snapshot) for _ in range(4)]
+ with threading_helper.start_threads(threads):
+ pass
+ # All snapshots should be identical for a finished future
+ expected = (True, False, 100, None)
+ for result in results:
+ self.assertEqual(result, expected)
+
def setUpModule():
setup_module()
diff --git a/Lib/test/test_crossinterp.py b/Lib/test/test_crossinterp.py
index b366a29645e..cddacbc9970 100644
--- a/Lib/test/test_crossinterp.py
+++ b/Lib/test/test_crossinterp.py
@@ -758,6 +758,40 @@ class CodeTests(_GetXIDataTests):
])
+class ShareableFuncTests(_GetXIDataTests):
+
+ MODE = 'func'
+
+ def test_stateless(self):
+ self.assert_roundtrip_not_equal([
+ *defs.STATELESS_FUNCTIONS,
+ # Generators can be stateless too.
+ *defs.FUNCTION_LIKE,
+ ])
+
+ def test_not_stateless(self):
+ self.assert_not_shareable([
+ *(f for f in defs.FUNCTIONS
+ if f not in defs.STATELESS_FUNCTIONS),
+ ])
+
+ def test_other_objects(self):
+ self.assert_not_shareable([
+ None,
+ True,
+ False,
+ Ellipsis,
+ NotImplemented,
+ 9999,
+ 'spam',
+ b'spam',
+ (),
+ [],
+ {},
+ object(),
+ ])
+
+
class PureShareableScriptTests(_GetXIDataTests):
MODE = 'script-pure'
diff --git a/Lib/test/test_curses.py b/Lib/test/test_curses.py
index c307258e565..feca82681de 100644
--- a/Lib/test/test_curses.py
+++ b/Lib/test/test_curses.py
@@ -130,6 +130,9 @@ class TestCurses(unittest.TestCase):
curses.use_env(False)
curses.use_env(True)
+ def test_error(self):
+ self.assertIsSubclass(curses.error, Exception)
+
def test_create_windows(self):
win = curses.newwin(5, 10)
self.assertEqual(win.getbegyx(), (0, 0))
diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py
index 371c63adce9..2fb963f52e5 100644
--- a/Lib/test/test_faulthandler.py
+++ b/Lib/test/test_faulthandler.py
@@ -166,29 +166,6 @@ class FaultHandlerTests(unittest.TestCase):
fatal_error = 'Windows fatal exception: %s' % name_regex
self.check_error(code, line_number, fatal_error, **kw)
- @unittest.skipIf(sys.platform.startswith('aix'),
- "the first page of memory is a mapped read-only on AIX")
- def test_read_null(self):
- if not MS_WINDOWS:
- self.check_fatal_error("""
- import faulthandler
- faulthandler.enable()
- faulthandler._read_null()
- """,
- 3,
- # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
- '(?:Segmentation fault'
- '|Bus error'
- '|Illegal instruction)')
- else:
- self.check_windows_exception("""
- import faulthandler
- faulthandler.enable()
- faulthandler._read_null()
- """,
- 3,
- 'access violation')
-
@skip_segfault_on_android
def test_sigsegv(self):
self.check_fatal_error("""
diff --git a/Lib/test/test_fcntl.py b/Lib/test/test_fcntl.py
index b84c98ef3a2..e0e6782258f 100644
--- a/Lib/test/test_fcntl.py
+++ b/Lib/test/test_fcntl.py
@@ -228,6 +228,52 @@ class TestFcntl(unittest.TestCase):
os.close(test_pipe_r)
os.close(test_pipe_w)
+ def _check_fcntl_not_mutate_len(self, nbytes=None):
+ self.f = open(TESTFN, 'wb')
+ buf = struct.pack('ii', fcntl.F_OWNER_PID, os.getpid())
+ if nbytes is not None:
+ buf += b' ' * (nbytes - len(buf))
+ else:
+ nbytes = len(buf)
+ save_buf = bytes(buf)
+ r = fcntl.fcntl(self.f, fcntl.F_SETOWN_EX, buf)
+ self.assertIsInstance(r, bytes)
+ self.assertEqual(len(r), len(save_buf))
+ self.assertEqual(buf, save_buf)
+ type, pid = memoryview(r).cast('i')[:2]
+ self.assertEqual(type, fcntl.F_OWNER_PID)
+ self.assertEqual(pid, os.getpid())
+
+ buf = b' ' * nbytes
+ r = fcntl.fcntl(self.f, fcntl.F_GETOWN_EX, buf)
+ self.assertIsInstance(r, bytes)
+ self.assertEqual(len(r), len(save_buf))
+ self.assertEqual(buf, b' ' * nbytes)
+ type, pid = memoryview(r).cast('i')[:2]
+ self.assertEqual(type, fcntl.F_OWNER_PID)
+ self.assertEqual(pid, os.getpid())
+
+ buf = memoryview(b' ' * nbytes)
+ r = fcntl.fcntl(self.f, fcntl.F_GETOWN_EX, buf)
+ self.assertIsInstance(r, bytes)
+ self.assertEqual(len(r), len(save_buf))
+ self.assertEqual(bytes(buf), b' ' * nbytes)
+ type, pid = memoryview(r).cast('i')[:2]
+ self.assertEqual(type, fcntl.F_OWNER_PID)
+ self.assertEqual(pid, os.getpid())
+
+ @unittest.skipUnless(
+ hasattr(fcntl, "F_SETOWN_EX") and hasattr(fcntl, "F_GETOWN_EX"),
+ "requires F_SETOWN_EX and F_GETOWN_EX")
+ def test_fcntl_small_buffer(self):
+ self._check_fcntl_not_mutate_len()
+
+ @unittest.skipUnless(
+ hasattr(fcntl, "F_SETOWN_EX") and hasattr(fcntl, "F_GETOWN_EX"),
+ "requires F_SETOWN_EX and F_GETOWN_EX")
+ def test_fcntl_large_buffer(self):
+ self._check_fcntl_not_mutate_len(2024)
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py
index 84faa636064..96b3f305194 100644
--- a/Lib/test/test_fractions.py
+++ b/Lib/test/test_fractions.py
@@ -1,7 +1,7 @@
"""Tests for Lib/fractions.py."""
from decimal import Decimal
-from test.support import requires_IEEE_754
+from test.support import requires_IEEE_754, adjust_int_max_str_digits
import math
import numbers
import operator
@@ -395,12 +395,14 @@ class FractionTest(unittest.TestCase):
def testFromString(self):
self.assertEqual((5, 1), _components(F("5")))
+ self.assertEqual((5, 1), _components(F("005")))
self.assertEqual((3, 2), _components(F("3/2")))
self.assertEqual((3, 2), _components(F("3 / 2")))
self.assertEqual((3, 2), _components(F(" \n +3/2")))
self.assertEqual((-3, 2), _components(F("-3/2 ")))
- self.assertEqual((13, 2), _components(F(" 013/02 \n ")))
+ self.assertEqual((13, 2), _components(F(" 0013/002 \n ")))
self.assertEqual((16, 5), _components(F(" 3.2 ")))
+ self.assertEqual((16, 5), _components(F("003.2")))
self.assertEqual((-16, 5), _components(F(" -3.2 ")))
self.assertEqual((-3, 1), _components(F(" -3. ")))
self.assertEqual((3, 5), _components(F(" .6 ")))
@@ -419,116 +421,102 @@ class FractionTest(unittest.TestCase):
self.assertRaisesMessage(
ZeroDivisionError, "Fraction(3, 0)",
F, "3/0")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '3/'",
- F, "3/")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '/2'",
- F, "/2")
- self.assertRaisesMessage(
- # Denominators don't need a sign.
- ValueError, "Invalid literal for Fraction: '3/+2'",
- F, "3/+2")
- self.assertRaisesMessage(
- # Imitate float's parsing.
- ValueError, "Invalid literal for Fraction: '+ 3/2'",
- F, "+ 3/2")
- self.assertRaisesMessage(
- # Avoid treating '.' as a regex special character.
- ValueError, "Invalid literal for Fraction: '3a2'",
- F, "3a2")
- self.assertRaisesMessage(
- # Don't accept combinations of decimals and rationals.
- ValueError, "Invalid literal for Fraction: '3/7.2'",
- F, "3/7.2")
- self.assertRaisesMessage(
- # Don't accept combinations of decimals and rationals.
- ValueError, "Invalid literal for Fraction: '3.2/7'",
- F, "3.2/7")
- self.assertRaisesMessage(
- # Allow 3. and .3, but not .
- ValueError, "Invalid literal for Fraction: '.'",
- F, ".")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '_'",
- F, "_")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '_1'",
- F, "_1")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1__2'",
- F, "1__2")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '/_'",
- F, "/_")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1_/'",
- F, "1_/")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '_1/'",
- F, "_1/")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1__2/'",
- F, "1__2/")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1/_'",
- F, "1/_")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1/_1'",
- F, "1/_1")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1/1__2'",
- F, "1/1__2")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1._111'",
- F, "1._111")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1.1__1'",
- F, "1.1__1")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1.1e+_1'",
- F, "1.1e+_1")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1.1e+1__1'",
- F, "1.1e+1__1")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '123.dd'",
- F, "123.dd")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '123.5_dd'",
- F, "123.5_dd")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: 'dd.5'",
- F, "dd.5")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '7_dd'",
- F, "7_dd")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1/dd'",
- F, "1/dd")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1/123_dd'",
- F, "1/123_dd")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '789edd'",
- F, "789edd")
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '789e2_dd'",
- F, "789e2_dd")
+
+ def check_invalid(s):
+ msg = "Invalid literal for Fraction: " + repr(s)
+ self.assertRaisesMessage(ValueError, msg, F, s)
+
+ check_invalid("3/")
+ check_invalid("/2")
+ # Denominators don't need a sign.
+ check_invalid("3/+2")
+ check_invalid("3/-2")
+ # Imitate float's parsing.
+ check_invalid("+ 3/2")
+ check_invalid("- 3/2")
+ # Avoid treating '.' as a regex special character.
+ check_invalid("3a2")
+ # Don't accept combinations of decimals and rationals.
+ check_invalid("3/7.2")
+ check_invalid("3.2/7")
+ # No space around dot.
+ check_invalid("3 .2")
+ check_invalid("3. 2")
+ # No space around e.
+ check_invalid("3.2 e1")
+ check_invalid("3.2e 1")
+ # Fractional part don't need a sign.
+ check_invalid("3.+2")
+ check_invalid("3.-2")
+ # Only accept base 10.
+ check_invalid("0x10")
+ check_invalid("0x10/1")
+ check_invalid("1/0x10")
+ check_invalid("0x10.")
+ check_invalid("0x10.1")
+ check_invalid("1.0x10")
+ check_invalid("1.0e0x10")
+ # Only accept decimal digits.
+ check_invalid("³")
+ check_invalid("³/2")
+ check_invalid("3/²")
+ check_invalid("³.2")
+ check_invalid("3.²")
+ check_invalid("3.2e²")
+ check_invalid("¼")
+ # Allow 3. and .3, but not .
+ check_invalid(".")
+ check_invalid("_")
+ check_invalid("_1")
+ check_invalid("1__2")
+ check_invalid("/_")
+ check_invalid("1_/")
+ check_invalid("_1/")
+ check_invalid("1__2/")
+ check_invalid("1/_")
+ check_invalid("1/_1")
+ check_invalid("1/1__2")
+ check_invalid("1._111")
+ check_invalid("1.1__1")
+ check_invalid("1.1e+_1")
+ check_invalid("1.1e+1__1")
+ check_invalid("123.dd")
+ check_invalid("123.5_dd")
+ check_invalid("dd.5")
+ check_invalid("7_dd")
+ check_invalid("1/dd")
+ check_invalid("1/123_dd")
+ check_invalid("789edd")
+ check_invalid("789e2_dd")
# Test catastrophic backtracking.
val = "9"*50 + "_"
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '" + val + "'",
- F, val)
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1/" + val + "'",
- F, "1/" + val)
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1." + val + "'",
- F, "1." + val)
- self.assertRaisesMessage(
- ValueError, "Invalid literal for Fraction: '1.1+e" + val + "'",
- F, "1.1+e" + val)
+ check_invalid(val)
+ check_invalid("1/" + val)
+ check_invalid("1." + val)
+ check_invalid("." + val)
+ check_invalid("1.1+e" + val)
+ check_invalid("1.1e" + val)
+
+ def test_limit_int(self):
+ maxdigits = 5000
+ with adjust_int_max_str_digits(maxdigits):
+ msg = 'Exceeds the limit'
+ val = '1' * maxdigits
+ num = (10**maxdigits - 1)//9
+ self.assertEqual((num, 1), _components(F(val)))
+ self.assertRaisesRegex(ValueError, msg, F, val + '1')
+ self.assertEqual((num, 2), _components(F(val + '/2')))
+ self.assertRaisesRegex(ValueError, msg, F, val + '1/2')
+ self.assertEqual((1, num), _components(F('1/' + val)))
+ self.assertRaisesRegex(ValueError, msg, F, '1/1' + val)
+ self.assertEqual(((10**(maxdigits+1) - 1)//9, 10**maxdigits),
+ _components(F('1.' + val)))
+ self.assertRaisesRegex(ValueError, msg, F, '1.1' + val)
+ self.assertEqual((num, 10**maxdigits), _components(F('.' + val)))
+ self.assertRaisesRegex(ValueError, msg, F, '.1' + val)
+ self.assertRaisesRegex(ValueError, msg, F, '1.1e1' + val)
+ self.assertEqual((11, 10), _components(F('1.1e' + '0' * maxdigits)))
+ self.assertRaisesRegex(ValueError, msg, F, '1.1e' + '0' * (maxdigits+1))
def testImmutable(self):
r = F(7, 3)
diff --git a/Lib/test/test_free_threading/test_functools.py b/Lib/test/test_free_threading/test_functools.py
new file mode 100644
index 00000000000..a442fe056ce
--- /dev/null
+++ b/Lib/test/test_free_threading/test_functools.py
@@ -0,0 +1,75 @@
+import random
+import unittest
+
+from functools import lru_cache
+from threading import Barrier, Thread
+
+from test.support import threading_helper
+
+@threading_helper.requires_working_threading()
+class TestLRUCache(unittest.TestCase):
+
+ def _test_concurrent_operations(self, maxsize):
+ num_threads = 10
+ b = Barrier(num_threads)
+ @lru_cache(maxsize=maxsize)
+ def func(arg=0):
+ return object()
+
+
+ def thread_func():
+ b.wait()
+ for i in range(1000):
+ r = random.randint(0, 1000)
+ if i < 800:
+ func(i)
+ elif i < 900:
+ func.cache_info()
+ else:
+ func.cache_clear()
+
+ threads = []
+ for i in range(num_threads):
+ t = Thread(target=thread_func)
+ threads.append(t)
+
+ with threading_helper.start_threads(threads):
+ pass
+
+ def test_concurrent_operations_unbounded(self):
+ self._test_concurrent_operations(maxsize=None)
+
+ def test_concurrent_operations_bounded(self):
+ self._test_concurrent_operations(maxsize=128)
+
+ def _test_reentrant_cache_clear(self, maxsize):
+ num_threads = 10
+ b = Barrier(num_threads)
+ @lru_cache(maxsize=maxsize)
+ def func(arg=0):
+ func.cache_clear()
+ return object()
+
+
+ def thread_func():
+ b.wait()
+ for i in range(1000):
+ func(random.randint(0, 10000))
+
+ threads = []
+ for i in range(num_threads):
+ t = Thread(target=thread_func)
+ threads.append(t)
+
+ with threading_helper.start_threads(threads):
+ pass
+
+ def test_reentrant_cache_clear_unbounded(self):
+ self._test_reentrant_cache_clear(maxsize=None)
+
+ def test_reentrant_cache_clear_bounded(self):
+ self._test_reentrant_cache_clear(maxsize=128)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_genericalias.py b/Lib/test/test_genericalias.py
index 8d21ded4501..ea0dc241e39 100644
--- a/Lib/test/test_genericalias.py
+++ b/Lib/test/test_genericalias.py
@@ -61,6 +61,7 @@ try:
from tkinter import Event
except ImportError:
Event = None
+from string.templatelib import Template, Interpolation
from typing import TypeVar
T = TypeVar('T')
@@ -139,7 +140,10 @@ class BaseTest(unittest.TestCase):
DictReader, DictWriter,
array,
staticmethod,
- classmethod]
+ classmethod,
+ Template,
+ Interpolation,
+ ]
if ctypes is not None:
generic_types.extend((ctypes.Array, ctypes.LibraryLoader, ctypes.py_object))
if ValueProxy is not None:
diff --git a/Lib/test/test_hmac.py b/Lib/test/test_hmac.py
index 70c79437722..e898644dd8a 100644
--- a/Lib/test/test_hmac.py
+++ b/Lib/test/test_hmac.py
@@ -1,8 +1,27 @@
+"""Test suite for HMAC.
+
+Python provides three different implementations of HMAC:
+
+- OpenSSL HMAC using OpenSSL hash functions.
+- HACL* HMAC using HACL* hash functions.
+- Generic Python HMAC using user-defined hash functions.
+
+The generic Python HMAC implementation is able to use OpenSSL
+callables or names, HACL* named hash functions or arbitrary
+objects implementing PEP 247 interface.
+
+In the two first cases, Python HMAC wraps a C HMAC object (either OpenSSL
+or HACL*-based). As a last resort, HMAC is re-implemented in pure Python.
+It is however interesting to test the pure Python implementation against
+the OpenSSL and HACL* hash functions.
+"""
+
import binascii
import functools
import hmac
import hashlib
import random
+import test.support
import test.support.hashlib_helper as hashlib_helper
import types
import unittest
@@ -10,6 +29,12 @@ import unittest.mock as mock
import warnings
from _operator import _compare_digest as operator_compare_digest
from test.support import check_disallow_instantiation
+from test.support.hashlib_helper import (
+ BuiltinHashFunctionsTrait,
+ HashFunctionsTrait,
+ NamedHashFunctionsTrait,
+ OpenSSLHashFunctionsTrait,
+)
from test.support.import_helper import import_fresh_module, import_module
try:
@@ -382,50 +407,7 @@ class BuiltinAssertersMixin(ThroughBuiltinAPIMixin, AssertersMixin):
pass
-class HashFunctionsTrait:
- """Trait class for 'hashfunc' in hmac_new() and hmac_digest()."""
-
- ALGORITHMS = [
- 'md5', 'sha1',
- 'sha224', 'sha256', 'sha384', 'sha512',
- 'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512',
- ]
-
- # By default, a missing algorithm skips the test that uses it.
- _ = property(lambda self: self.skipTest("missing hash function"))
- md5 = sha1 = _
- sha224 = sha256 = sha384 = sha512 = _
- sha3_224 = sha3_256 = sha3_384 = sha3_512 = _
- del _
-
-
-class WithOpenSSLHashFunctions(HashFunctionsTrait):
- """Test a HMAC implementation with an OpenSSL-based callable 'hashfunc'."""
-
- @classmethod
- def setUpClass(cls):
- super().setUpClass()
-
- for name in cls.ALGORITHMS:
- @property
- @hashlib_helper.requires_openssl_hashdigest(name)
- def func(self, *, __name=name): # __name needed to bind 'name'
- return getattr(_hashlib, f'openssl_{__name}')
- setattr(cls, name, func)
-
-
-class WithNamedHashFunctions(HashFunctionsTrait):
- """Test a HMAC implementation with a named 'hashfunc'."""
-
- @classmethod
- def setUpClass(cls):
- super().setUpClass()
-
- for name in cls.ALGORITHMS:
- setattr(cls, name, name)
-
-
-class RFCTestCaseMixin(AssertersMixin, HashFunctionsTrait):
+class RFCTestCaseMixin(HashFunctionsTrait, AssertersMixin):
"""Test HMAC implementations against RFC 2202/4231 and NIST test vectors.
- Test vectors for MD5 and SHA-1 are taken from RFC 2202.
@@ -739,26 +721,83 @@ class RFCTestCaseMixin(AssertersMixin, HashFunctionsTrait):
)
-class PyRFCTestCase(ThroughObjectMixin, PyAssertersMixin,
- WithOpenSSLHashFunctions, RFCTestCaseMixin,
- unittest.TestCase):
+class PurePythonInitHMAC(PyModuleMixin, HashFunctionsTrait):
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ for meth in ['_init_openssl_hmac', '_init_builtin_hmac']:
+ fn = getattr(cls.hmac.HMAC, meth)
+ cm = mock.patch.object(cls.hmac.HMAC, meth, autospec=True, wraps=fn)
+ cls.enterClassContext(cm)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.hmac.HMAC._init_openssl_hmac.assert_not_called()
+ cls.hmac.HMAC._init_builtin_hmac.assert_not_called()
+ # Do not assert that HMAC._init_old() has been called as it's tricky
+ # to determine whether a test for a specific hash function has been
+ # executed or not. On regular builds, it will be called but if a
+ # hash function is not available, it's hard to detect for which
+ # test we should checj HMAC._init_old() or not.
+ super().tearDownClass()
+
+
+class PyRFCOpenSSLTestCase(ThroughObjectMixin,
+ PyAssertersMixin,
+ OpenSSLHashFunctionsTrait,
+ RFCTestCaseMixin,
+ PurePythonInitHMAC,
+ unittest.TestCase):
"""Python implementation of HMAC using hmac.HMAC().
- The underlying hash functions are OpenSSL-based.
+ The underlying hash functions are OpenSSL-based but
+ _init_old() is used instead of _init_openssl_hmac().
"""
-class PyDotNewRFCTestCase(ThroughModuleAPIMixin, PyAssertersMixin,
- WithOpenSSLHashFunctions, RFCTestCaseMixin,
- unittest.TestCase):
+class PyRFCBuiltinTestCase(ThroughObjectMixin,
+ PyAssertersMixin,
+ BuiltinHashFunctionsTrait,
+ RFCTestCaseMixin,
+ PurePythonInitHMAC,
+ unittest.TestCase):
+ """Python implementation of HMAC using hmac.HMAC().
+
+ The underlying hash functions are HACL*-based but
+ _init_old() is used instead of _init_builtin_hmac().
+ """
+
+
+class PyDotNewOpenSSLRFCTestCase(ThroughModuleAPIMixin,
+ PyAssertersMixin,
+ OpenSSLHashFunctionsTrait,
+ RFCTestCaseMixin,
+ PurePythonInitHMAC,
+ unittest.TestCase):
+ """Python implementation of HMAC using hmac.new().
+
+ The underlying hash functions are OpenSSL-based but
+ _init_old() is used instead of _init_openssl_hmac().
+ """
+
+
+class PyDotNewBuiltinRFCTestCase(ThroughModuleAPIMixin,
+ PyAssertersMixin,
+ BuiltinHashFunctionsTrait,
+ RFCTestCaseMixin,
+ PurePythonInitHMAC,
+ unittest.TestCase):
"""Python implementation of HMAC using hmac.new().
- The underlying hash functions are OpenSSL-based.
+ The underlying hash functions are HACL-based but
+ _init_old() is used instead of _init_openssl_hmac().
"""
class OpenSSLRFCTestCase(OpenSSLAssertersMixin,
- WithOpenSSLHashFunctions, RFCTestCaseMixin,
+ OpenSSLHashFunctionsTrait,
+ RFCTestCaseMixin,
unittest.TestCase):
"""OpenSSL implementation of HMAC.
@@ -767,7 +806,8 @@ class OpenSSLRFCTestCase(OpenSSLAssertersMixin,
class BuiltinRFCTestCase(BuiltinAssertersMixin,
- WithNamedHashFunctions, RFCTestCaseMixin,
+ NamedHashFunctionsTrait,
+ RFCTestCaseMixin,
unittest.TestCase):
"""Built-in HACL* implementation of HMAC.
@@ -784,12 +824,6 @@ class BuiltinRFCTestCase(BuiltinAssertersMixin,
self.check_hmac_hexdigest(key, msg, hexdigest, digest_size, func)
-# TODO(picnixz): once we have a HACL* HMAC, we should also test the Python
-# implementation of HMAC with a HACL*-based hash function. For now, we only
-# test it partially via the '_sha2' module, but for completeness we could
-# also test the RFC test vectors against all possible implementations.
-
-
class DigestModTestCaseMixin(CreatorMixin, DigestMixin):
"""Tests for the 'digestmod' parameter for hmac_new() and hmac_digest()."""
diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py
index 2cafa4e45a1..0af1c45ecb2 100644
--- a/Lib/test/test_httpservers.py
+++ b/Lib/test/test_httpservers.py
@@ -3,16 +3,16 @@
Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
"""
-from collections import OrderedDict
+
from http.server import BaseHTTPRequestHandler, HTTPServer, HTTPSServer, \
- SimpleHTTPRequestHandler, CGIHTTPRequestHandler
+ SimpleHTTPRequestHandler
from http import server, HTTPStatus
+import contextlib
import os
import socket
import sys
import re
-import base64
import ntpath
import pathlib
import shutil
@@ -31,7 +31,7 @@ from io import BytesIO, StringIO
import unittest
from test import support
from test.support import (
- is_apple, import_helper, os_helper, requires_subprocess, threading_helper
+ is_apple, import_helper, os_helper, threading_helper
)
try:
@@ -522,42 +522,120 @@ class SimpleHTTPServerTestCase(BaseTestCase):
reader.close()
return body
+ def check_list_dir_dirname(self, dirname, quotedname=None):
+ fullpath = os.path.join(self.tempdir, dirname)
+ try:
+ os.mkdir(os.path.join(self.tempdir, dirname))
+ except (OSError, UnicodeEncodeError):
+ self.skipTest(f'Can not create directory {dirname!a} '
+ f'on current file system')
+
+ if quotedname is None:
+ quotedname = urllib.parse.quote(dirname, errors='surrogatepass')
+ response = self.request(self.base_url + '/' + quotedname + '/')
+ body = self.check_status_and_reason(response, HTTPStatus.OK)
+ displaypath = html.escape(f'{self.base_url}/{dirname}/', quote=False)
+ enc = sys.getfilesystemencoding()
+ prefix = f'listing for {displaypath}</'.encode(enc, 'surrogateescape')
+ self.assertIn(prefix + b'title>', body)
+ self.assertIn(prefix + b'h1>', body)
+
+ def check_list_dir_filename(self, filename):
+ fullpath = os.path.join(self.tempdir, filename)
+ content = ascii(fullpath).encode() + (os_helper.TESTFN_UNDECODABLE or b'\xff')
+ try:
+ with open(fullpath, 'wb') as f:
+ f.write(content)
+ except OSError:
+ self.skipTest(f'Can not create file {filename!a} '
+ f'on current file system')
+
+ response = self.request(self.base_url + '/')
+ body = self.check_status_and_reason(response, HTTPStatus.OK)
+ quotedname = urllib.parse.quote(filename, errors='surrogatepass')
+ enc = response.headers.get_content_charset()
+ self.assertIsNotNone(enc)
+ self.assertIn((f'href="{quotedname}"').encode('ascii'), body)
+ displayname = html.escape(filename, quote=False)
+ self.assertIn(f'>{displayname}<'.encode(enc, 'surrogateescape'), body)
+
+ response = self.request(self.base_url + '/' + quotedname)
+ self.check_status_and_reason(response, HTTPStatus.OK, data=content)
+
+ @unittest.skipUnless(os_helper.TESTFN_NONASCII,
+ 'need os_helper.TESTFN_NONASCII')
+ def test_list_dir_nonascii_dirname(self):
+ dirname = os_helper.TESTFN_NONASCII + '.dir'
+ self.check_list_dir_dirname(dirname)
+
+ @unittest.skipUnless(os_helper.TESTFN_NONASCII,
+ 'need os_helper.TESTFN_NONASCII')
+ def test_list_dir_nonascii_filename(self):
+ filename = os_helper.TESTFN_NONASCII + '.txt'
+ self.check_list_dir_filename(filename)
+
@unittest.skipIf(is_apple,
'undecodable name cannot always be decoded on Apple platforms')
@unittest.skipIf(sys.platform == 'win32',
'undecodable name cannot be decoded on win32')
@unittest.skipUnless(os_helper.TESTFN_UNDECODABLE,
'need os_helper.TESTFN_UNDECODABLE')
- def test_undecodable_filename(self):
- enc = sys.getfilesystemencoding()
- filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt'
- with open(os.path.join(self.tempdir, filename), 'wb') as f:
- f.write(os_helper.TESTFN_UNDECODABLE)
- response = self.request(self.base_url + '/')
- if is_apple:
- # On Apple platforms the HFS+ filesystem replaces bytes that
- # aren't valid UTF-8 into a percent-encoded value.
- for name in os.listdir(self.tempdir):
- if name != 'test': # Ignore a filename created in setUp().
- filename = name
- break
- body = self.check_status_and_reason(response, HTTPStatus.OK)
- quotedname = urllib.parse.quote(filename, errors='surrogatepass')
- self.assertIn(('href="%s"' % quotedname)
- .encode(enc, 'surrogateescape'), body)
- self.assertIn(('>%s<' % html.escape(filename, quote=False))
- .encode(enc, 'surrogateescape'), body)
- response = self.request(self.base_url + '/' + quotedname)
- self.check_status_and_reason(response, HTTPStatus.OK,
- data=os_helper.TESTFN_UNDECODABLE)
+ def test_list_dir_undecodable_dirname(self):
+ dirname = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.dir'
+ self.check_list_dir_dirname(dirname)
- def test_undecodable_parameter(self):
- # sanity check using a valid parameter
+ @unittest.skipIf(is_apple,
+ 'undecodable name cannot always be decoded on Apple platforms')
+ @unittest.skipIf(sys.platform == 'win32',
+ 'undecodable name cannot be decoded on win32')
+ @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE,
+ 'need os_helper.TESTFN_UNDECODABLE')
+ def test_list_dir_undecodable_filename(self):
+ filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt'
+ self.check_list_dir_filename(filename)
+
+ def test_list_dir_undecodable_dirname2(self):
+ dirname = '\ufffd.dir'
+ self.check_list_dir_dirname(dirname, quotedname='%ff.dir')
+
+ @unittest.skipUnless(os_helper.TESTFN_UNENCODABLE,
+ 'need os_helper.TESTFN_UNENCODABLE')
+ def test_list_dir_unencodable_dirname(self):
+ dirname = os_helper.TESTFN_UNENCODABLE + '.dir'
+ self.check_list_dir_dirname(dirname)
+
+ @unittest.skipUnless(os_helper.TESTFN_UNENCODABLE,
+ 'need os_helper.TESTFN_UNENCODABLE')
+ def test_list_dir_unencodable_filename(self):
+ filename = os_helper.TESTFN_UNENCODABLE + '.txt'
+ self.check_list_dir_filename(filename)
+
+ def test_list_dir_escape_dirname(self):
+ # Characters that need special treating in URL or HTML.
+ for name in ('q?', 'f#', '&amp;', '&amp', '<i>', '"dq"', "'sq'",
+ '%A4', '%E2%82%AC'):
+ with self.subTest(name=name):
+ dirname = name + '.dir'
+ self.check_list_dir_dirname(dirname,
+ quotedname=urllib.parse.quote(dirname, safe='&<>\'"'))
+
+ def test_list_dir_escape_filename(self):
+ # Characters that need special treating in URL or HTML.
+ for name in ('q?', 'f#', '&amp;', '&amp', '<i>', '"dq"', "'sq'",
+ '%A4', '%E2%82%AC'):
+ with self.subTest(name=name):
+ filename = name + '.txt'
+ self.check_list_dir_filename(filename)
+ os_helper.unlink(os.path.join(self.tempdir, filename))
+
+ def test_list_dir_with_query_and_fragment(self):
+ prefix = f'listing for {self.base_url}/</'.encode('latin1')
+ response = self.request(self.base_url + '/#123').read()
+ self.assertIn(prefix + b'title>', response)
+ self.assertIn(prefix + b'h1>', response)
response = self.request(self.base_url + '/?x=123').read()
- self.assertRegex(response, rf'listing for {self.base_url}/\?x=123'.encode('latin1'))
- # now the bogus encoding
- response = self.request(self.base_url + '/?x=%bb').read()
- self.assertRegex(response, rf'listing for {self.base_url}/\?x=\xef\xbf\xbd'.encode('latin1'))
+ self.assertIn(prefix + b'title>', response)
+ self.assertIn(prefix + b'h1>', response)
def test_get_dir_redirect_location_domain_injection_bug(self):
"""Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location.
@@ -615,10 +693,19 @@ class SimpleHTTPServerTestCase(BaseTestCase):
# check for trailing "/" which should return 404. See Issue17324
response = self.request(self.base_url + '/test/')
self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
+ response = self.request(self.base_url + '/test%2f')
+ self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
+ response = self.request(self.base_url + '/test%2F')
+ self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
response = self.request(self.base_url + '/')
self.check_status_and_reason(response, HTTPStatus.OK)
+ response = self.request(self.base_url + '%2f')
+ self.check_status_and_reason(response, HTTPStatus.OK)
+ response = self.request(self.base_url + '%2F')
+ self.check_status_and_reason(response, HTTPStatus.OK)
response = self.request(self.base_url)
self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
+ self.assertEqual(response.getheader("Location"), self.base_url + "/")
self.assertEqual(response.getheader("Content-Length"), "0")
response = self.request(self.base_url + '/?hi=2')
self.check_status_and_reason(response, HTTPStatus.OK)
@@ -724,6 +811,8 @@ class SimpleHTTPServerTestCase(BaseTestCase):
self.check_status_and_reason(response, HTTPStatus.OK)
response = self.request(self.tempdir_name)
self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
+ self.assertEqual(response.getheader("Location"),
+ self.tempdir_name + "/")
response = self.request(self.tempdir_name + '/?hi=2')
self.check_status_and_reason(response, HTTPStatus.OK)
response = self.request(self.tempdir_name + '?hi=1')
@@ -731,350 +820,6 @@ class SimpleHTTPServerTestCase(BaseTestCase):
self.assertEqual(response.getheader("Location"),
self.tempdir_name + "/?hi=1")
- def test_html_escape_filename(self):
- filename = '<test&>.txt'
- fullpath = os.path.join(self.tempdir, filename)
-
- try:
- open(fullpath, 'wb').close()
- except OSError:
- raise unittest.SkipTest('Can not create file %s on current file '
- 'system' % filename)
-
- try:
- response = self.request(self.base_url + '/')
- body = self.check_status_and_reason(response, HTTPStatus.OK)
- enc = response.headers.get_content_charset()
- finally:
- os.unlink(fullpath) # avoid affecting test_undecodable_filename
-
- self.assertIsNotNone(enc)
- html_text = '>%s<' % html.escape(filename, quote=False)
- self.assertIn(html_text.encode(enc), body)
-
-
-cgi_file1 = """\
-#!%s
-
-print("Content-type: text/html")
-print()
-print("Hello World")
-"""
-
-cgi_file2 = """\
-#!%s
-import os
-import sys
-import urllib.parse
-
-print("Content-type: text/html")
-print()
-
-content_length = int(os.environ["CONTENT_LENGTH"])
-query_string = sys.stdin.buffer.read(content_length)
-params = {key.decode("utf-8"): val.decode("utf-8")
- for key, val in urllib.parse.parse_qsl(query_string)}
-
-print("%%s, %%s, %%s" %% (params["spam"], params["eggs"], params["bacon"]))
-"""
-
-cgi_file4 = """\
-#!%s
-import os
-
-print("Content-type: text/html")
-print()
-
-print(os.environ["%s"])
-"""
-
-cgi_file6 = """\
-#!%s
-import os
-
-print("X-ambv: was here")
-print("Content-type: text/html")
-print()
-print("<pre>")
-for k, v in os.environ.items():
- try:
- k.encode('ascii')
- v.encode('ascii')
- except UnicodeEncodeError:
- continue # see: BPO-44647
- print(f"{k}={v}")
-print("</pre>")
-"""
-
-
-@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
- "This test can't be run reliably as root (issue #13308).")
-@requires_subprocess()
-class CGIHTTPServerTestCase(BaseTestCase):
- class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
- _test_case_self = None # populated by each setUp() method call.
-
- def __init__(self, *args, **kwargs):
- with self._test_case_self.assertWarnsRegex(
- DeprecationWarning,
- r'http\.server\.CGIHTTPRequestHandler'):
- # This context also happens to catch and silence the
- # threading DeprecationWarning from os.fork().
- super().__init__(*args, **kwargs)
-
- linesep = os.linesep.encode('ascii')
-
- def setUp(self):
- self.request_handler._test_case_self = self # practical, but yuck.
- BaseTestCase.setUp(self)
- self.cwd = os.getcwd()
- self.parent_dir = tempfile.mkdtemp()
- self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
- self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
- self.sub_dir_1 = os.path.join(self.parent_dir, 'sub')
- self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir')
- self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin')
- os.mkdir(self.cgi_dir)
- os.mkdir(self.cgi_child_dir)
- os.mkdir(self.sub_dir_1)
- os.mkdir(self.sub_dir_2)
- os.mkdir(self.cgi_dir_in_sub_dir)
- self.nocgi_path = None
- self.file1_path = None
- self.file2_path = None
- self.file3_path = None
- self.file4_path = None
- self.file5_path = None
-
- # The shebang line should be pure ASCII: use symlink if possible.
- # See issue #7668.
- self._pythonexe_symlink = None
- if os_helper.can_symlink():
- self.pythonexe = os.path.join(self.parent_dir, 'python')
- self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__()
- else:
- self.pythonexe = sys.executable
-
- try:
- # The python executable path is written as the first line of the
- # CGI Python script. The encoding cookie cannot be used, and so the
- # path should be encodable to the default script encoding (utf-8)
- self.pythonexe.encode('utf-8')
- except UnicodeEncodeError:
- self.tearDown()
- self.skipTest("Python executable path is not encodable to utf-8")
-
- self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
- with open(self.nocgi_path, 'w', encoding='utf-8') as fp:
- fp.write(cgi_file1 % self.pythonexe)
- os.chmod(self.nocgi_path, 0o777)
-
- self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
- with open(self.file1_path, 'w', encoding='utf-8') as file1:
- file1.write(cgi_file1 % self.pythonexe)
- os.chmod(self.file1_path, 0o777)
-
- self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
- with open(self.file2_path, 'w', encoding='utf-8') as file2:
- file2.write(cgi_file2 % self.pythonexe)
- os.chmod(self.file2_path, 0o777)
-
- self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
- with open(self.file3_path, 'w', encoding='utf-8') as file3:
- file3.write(cgi_file1 % self.pythonexe)
- os.chmod(self.file3_path, 0o777)
-
- self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
- with open(self.file4_path, 'w', encoding='utf-8') as file4:
- file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
- os.chmod(self.file4_path, 0o777)
-
- self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py')
- with open(self.file5_path, 'w', encoding='utf-8') as file5:
- file5.write(cgi_file1 % self.pythonexe)
- os.chmod(self.file5_path, 0o777)
-
- self.file6_path = os.path.join(self.cgi_dir, 'file6.py')
- with open(self.file6_path, 'w', encoding='utf-8') as file6:
- file6.write(cgi_file6 % self.pythonexe)
- os.chmod(self.file6_path, 0o777)
-
- os.chdir(self.parent_dir)
-
- def tearDown(self):
- self.request_handler._test_case_self = None
- try:
- os.chdir(self.cwd)
- if self._pythonexe_symlink:
- self._pythonexe_symlink.__exit__(None, None, None)
- if self.nocgi_path:
- os.remove(self.nocgi_path)
- if self.file1_path:
- os.remove(self.file1_path)
- if self.file2_path:
- os.remove(self.file2_path)
- if self.file3_path:
- os.remove(self.file3_path)
- if self.file4_path:
- os.remove(self.file4_path)
- if self.file5_path:
- os.remove(self.file5_path)
- if self.file6_path:
- os.remove(self.file6_path)
- os.rmdir(self.cgi_child_dir)
- os.rmdir(self.cgi_dir)
- os.rmdir(self.cgi_dir_in_sub_dir)
- os.rmdir(self.sub_dir_2)
- os.rmdir(self.sub_dir_1)
- # The 'gmon.out' file can be written in the current working
- # directory if C-level code profiling with gprof is enabled.
- os_helper.unlink(os.path.join(self.parent_dir, 'gmon.out'))
- os.rmdir(self.parent_dir)
- finally:
- BaseTestCase.tearDown(self)
-
- def test_url_collapse_path(self):
- # verify tail is the last portion and head is the rest on proper urls
- test_vectors = {
- '': '//',
- '..': IndexError,
- '/.//..': IndexError,
- '/': '//',
- '//': '//',
- '/\\': '//\\',
- '/.//': '//',
- 'cgi-bin/file1.py': '/cgi-bin/file1.py',
- '/cgi-bin/file1.py': '/cgi-bin/file1.py',
- 'a': '//a',
- '/a': '//a',
- '//a': '//a',
- './a': '//a',
- './C:/': '/C:/',
- '/a/b': '/a/b',
- '/a/b/': '/a/b/',
- '/a/b/.': '/a/b/',
- '/a/b/c/..': '/a/b/',
- '/a/b/c/../d': '/a/b/d',
- '/a/b/c/../d/e/../f': '/a/b/d/f',
- '/a/b/c/../d/e/../../f': '/a/b/f',
- '/a/b/c/../d/e/.././././..//f': '/a/b/f',
- '../a/b/c/../d/e/.././././..//f': IndexError,
- '/a/b/c/../d/e/../../../f': '/a/f',
- '/a/b/c/../d/e/../../../../f': '//f',
- '/a/b/c/../d/e/../../../../../f': IndexError,
- '/a/b/c/../d/e/../../../../f/..': '//',
- '/a/b/c/../d/e/../../../../f/../.': '//',
- }
- for path, expected in test_vectors.items():
- if isinstance(expected, type) and issubclass(expected, Exception):
- self.assertRaises(expected,
- server._url_collapse_path, path)
- else:
- actual = server._url_collapse_path(path)
- self.assertEqual(expected, actual,
- msg='path = %r\nGot: %r\nWanted: %r' %
- (path, actual, expected))
-
- def test_headers_and_content(self):
- res = self.request('/cgi-bin/file1.py')
- self.assertEqual(
- (res.read(), res.getheader('Content-type'), res.status),
- (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))
-
- def test_issue19435(self):
- res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
- self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
-
- def test_post(self):
- params = urllib.parse.urlencode(
- {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
- headers = {'Content-type' : 'application/x-www-form-urlencoded'}
- res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
-
- self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)
-
- def test_invaliduri(self):
- res = self.request('/cgi-bin/invalid')
- res.read()
- self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
-
- def test_authorization(self):
- headers = {b'Authorization' : b'Basic ' +
- base64.b64encode(b'username:pass')}
- res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
- self.assertEqual(
- (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
- (res.read(), res.getheader('Content-type'), res.status))
-
- def test_no_leading_slash(self):
- # http://bugs.python.org/issue2254
- res = self.request('cgi-bin/file1.py')
- self.assertEqual(
- (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
- (res.read(), res.getheader('Content-type'), res.status))
-
- def test_os_environ_is_not_altered(self):
- signature = "Test CGI Server"
- os.environ['SERVER_SOFTWARE'] = signature
- res = self.request('/cgi-bin/file1.py')
- self.assertEqual(
- (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
- (res.read(), res.getheader('Content-type'), res.status))
- self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
-
- def test_urlquote_decoding_in_cgi_check(self):
- res = self.request('/cgi-bin%2ffile1.py')
- self.assertEqual(
- (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
- (res.read(), res.getheader('Content-type'), res.status))
-
- def test_nested_cgi_path_issue21323(self):
- res = self.request('/cgi-bin/child-dir/file3.py')
- self.assertEqual(
- (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
- (res.read(), res.getheader('Content-type'), res.status))
-
- def test_query_with_multiple_question_mark(self):
- res = self.request('/cgi-bin/file4.py?a=b?c=d')
- self.assertEqual(
- (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
- (res.read(), res.getheader('Content-type'), res.status))
-
- def test_query_with_continuous_slashes(self):
- res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
- self.assertEqual(
- (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
- 'text/html', HTTPStatus.OK),
- (res.read(), res.getheader('Content-type'), res.status))
-
- def test_cgi_path_in_sub_directories(self):
- try:
- CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin')
- res = self.request('/sub/dir/cgi-bin/file5.py')
- self.assertEqual(
- (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
- (res.read(), res.getheader('Content-type'), res.status))
- finally:
- CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin')
-
- def test_accept(self):
- browser_accept = \
- 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
- tests = (
- ((('Accept', browser_accept),), browser_accept),
- ((), ''),
- # Hack case to get two values for the one header
- ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')),
- 'text/html,text/plain'),
- )
- for headers, expected in tests:
- headers = OrderedDict(headers)
- with self.subTest(headers):
- res = self.request('/cgi-bin/file6.py', 'GET', headers=headers)
- self.assertEqual(http.HTTPStatus.OK, res.status)
- expected = f"HTTP_ACCEPT={expected}".encode('ascii')
- self.assertIn(expected, res.read())
-
class SocketlessRequestHandler(SimpleHTTPRequestHandler):
def __init__(self, directory=None):
@@ -1095,6 +840,7 @@ class SocketlessRequestHandler(SimpleHTTPRequestHandler):
def log_message(self, format, *args):
pass
+
class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
def handle_expect_100(self):
self.send_error(HTTPStatus.EXPECTATION_FAILED)
@@ -1536,6 +1282,176 @@ class ScriptTestCase(unittest.TestCase):
self.assertEqual(mock_server.address_family, socket.AF_INET)
+class CommandLineTestCase(unittest.TestCase):
+ default_port = 8000
+ default_bind = None
+ default_protocol = 'HTTP/1.0'
+ default_handler = SimpleHTTPRequestHandler
+ default_server = unittest.mock.ANY
+ tls_cert = certdata_file('ssl_cert.pem')
+ tls_key = certdata_file('ssl_key.pem')
+ tls_password = 'somepass'
+ tls_cert_options = ['--tls-cert']
+ tls_key_options = ['--tls-key']
+ tls_password_options = ['--tls-password-file']
+ args = {
+ 'HandlerClass': default_handler,
+ 'ServerClass': default_server,
+ 'protocol': default_protocol,
+ 'port': default_port,
+ 'bind': default_bind,
+ 'tls_cert': None,
+ 'tls_key': None,
+ 'tls_password': None,
+ }
+
+ def setUp(self):
+ super().setUp()
+ self.tls_password_file = tempfile.mktemp()
+ with open(self.tls_password_file, 'wb') as f:
+ f.write(self.tls_password.encode())
+ self.addCleanup(os_helper.unlink, self.tls_password_file)
+
+ def invoke_httpd(self, *args, stdout=None, stderr=None):
+ stdout = StringIO() if stdout is None else stdout
+ stderr = StringIO() if stderr is None else stderr
+ with contextlib.redirect_stdout(stdout), \
+ contextlib.redirect_stderr(stderr):
+ server._main(args)
+ return stdout.getvalue(), stderr.getvalue()
+
+ @mock.patch('http.server.test')
+ def test_port_flag(self, mock_func):
+ ports = [8000, 65535]
+ for port in ports:
+ with self.subTest(port=port):
+ self.invoke_httpd(str(port))
+ call_args = self.args | dict(port=port)
+ mock_func.assert_called_once_with(**call_args)
+ mock_func.reset_mock()
+
+ @mock.patch('http.server.test')
+ def test_directory_flag(self, mock_func):
+ options = ['-d', '--directory']
+ directories = ['.', '/foo', '\\bar', '/',
+ 'C:\\', 'C:\\foo', 'C:\\bar',
+ '/home/user', './foo/foo2', 'D:\\foo\\bar']
+ for flag in options:
+ for directory in directories:
+ with self.subTest(flag=flag, directory=directory):
+ self.invoke_httpd(flag, directory)
+ mock_func.assert_called_once_with(**self.args)
+ mock_func.reset_mock()
+
+ @mock.patch('http.server.test')
+ def test_bind_flag(self, mock_func):
+ options = ['-b', '--bind']
+ bind_addresses = ['localhost', '127.0.0.1', '::1',
+ '0.0.0.0', '8.8.8.8']
+ for flag in options:
+ for bind_address in bind_addresses:
+ with self.subTest(flag=flag, bind_address=bind_address):
+ self.invoke_httpd(flag, bind_address)
+ call_args = self.args | dict(bind=bind_address)
+ mock_func.assert_called_once_with(**call_args)
+ mock_func.reset_mock()
+
+ @mock.patch('http.server.test')
+ def test_protocol_flag(self, mock_func):
+ options = ['-p', '--protocol']
+ protocols = ['HTTP/1.0', 'HTTP/1.1', 'HTTP/2.0', 'HTTP/3.0']
+ for flag in options:
+ for protocol in protocols:
+ with self.subTest(flag=flag, protocol=protocol):
+ self.invoke_httpd(flag, protocol)
+ call_args = self.args | dict(protocol=protocol)
+ mock_func.assert_called_once_with(**call_args)
+ mock_func.reset_mock()
+
+ @unittest.skipIf(ssl is None, "requires ssl")
+ @mock.patch('http.server.test')
+ def test_tls_cert_and_key_flags(self, mock_func):
+ for tls_cert_option in self.tls_cert_options:
+ for tls_key_option in self.tls_key_options:
+ self.invoke_httpd(tls_cert_option, self.tls_cert,
+ tls_key_option, self.tls_key)
+ call_args = self.args | {
+ 'tls_cert': self.tls_cert,
+ 'tls_key': self.tls_key,
+ }
+ mock_func.assert_called_once_with(**call_args)
+ mock_func.reset_mock()
+
+ @unittest.skipIf(ssl is None, "requires ssl")
+ @mock.patch('http.server.test')
+ def test_tls_cert_and_key_and_password_flags(self, mock_func):
+ for tls_cert_option in self.tls_cert_options:
+ for tls_key_option in self.tls_key_options:
+ for tls_password_option in self.tls_password_options:
+ self.invoke_httpd(tls_cert_option,
+ self.tls_cert,
+ tls_key_option,
+ self.tls_key,
+ tls_password_option,
+ self.tls_password_file)
+ call_args = self.args | {
+ 'tls_cert': self.tls_cert,
+ 'tls_key': self.tls_key,
+ 'tls_password': self.tls_password,
+ }
+ mock_func.assert_called_once_with(**call_args)
+ mock_func.reset_mock()
+
+ @unittest.skipIf(ssl is None, "requires ssl")
+ @mock.patch('http.server.test')
+ def test_missing_tls_cert_flag(self, mock_func):
+ for tls_key_option in self.tls_key_options:
+ with self.assertRaises(SystemExit):
+ self.invoke_httpd(tls_key_option, self.tls_key)
+ mock_func.reset_mock()
+
+ for tls_password_option in self.tls_password_options:
+ with self.assertRaises(SystemExit):
+ self.invoke_httpd(tls_password_option, self.tls_password)
+ mock_func.reset_mock()
+
+ @unittest.skipIf(ssl is None, "requires ssl")
+ @mock.patch('http.server.test')
+ def test_invalid_password_file(self, mock_func):
+ non_existent_file = 'non_existent_file'
+ for tls_password_option in self.tls_password_options:
+ for tls_cert_option in self.tls_cert_options:
+ with self.assertRaises(SystemExit):
+ self.invoke_httpd(tls_cert_option,
+ self.tls_cert,
+ tls_password_option,
+ non_existent_file)
+
+ @mock.patch('http.server.test')
+ def test_no_arguments(self, mock_func):
+ self.invoke_httpd()
+ mock_func.assert_called_once_with(**self.args)
+ mock_func.reset_mock()
+
+ @mock.patch('http.server.test')
+ def test_help_flag(self, _):
+ options = ['-h', '--help']
+ for option in options:
+ stdout, stderr = StringIO(), StringIO()
+ with self.assertRaises(SystemExit):
+ self.invoke_httpd(option, stdout=stdout, stderr=stderr)
+ self.assertIn('usage', stdout.getvalue())
+ self.assertEqual(stderr.getvalue(), '')
+
+ @mock.patch('http.server.test')
+ def test_unknown_flag(self, _):
+ stdout, stderr = StringIO(), StringIO()
+ with self.assertRaises(SystemExit):
+ self.invoke_httpd('--unknown-flag', stdout=stdout, stderr=stderr)
+ self.assertEqual(stdout.getvalue(), '')
+ self.assertIn('error', stderr.getvalue())
+
+
def setUpModule():
unittest.addModuleCleanup(os.chdir, os.getcwd())
diff --git a/Lib/test/test_importlib/import_/test_relative_imports.py b/Lib/test/test_importlib/import_/test_relative_imports.py
index e535d119763..1549cbe96ce 100644
--- a/Lib/test/test_importlib/import_/test_relative_imports.py
+++ b/Lib/test/test_importlib/import_/test_relative_imports.py
@@ -223,6 +223,21 @@ class RelativeImports:
self.__import__('sys', {'__package__': '', '__spec__': None},
level=1)
+ def test_malicious_relative_import(self):
+ # https://github.com/python/cpython/issues/134100
+ # Test to make sure UAF bug with error msg doesn't come back to life
+ import sys
+ loooong = "".ljust(0x23000, "b")
+ name = f"a.{loooong}.c"
+
+ with util.uncache(name):
+ sys.modules[name] = {}
+ with self.assertRaisesRegex(
+ KeyError,
+ r"'a\.b+' not in sys\.modules as expected"
+ ):
+ __import__(f"{loooong}.c", {"__package__": "a"}, level=1)
+
(Frozen_RelativeImports,
Source_RelativeImports
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py
index 66c7afce88f..1e2d572b1cb 100644
--- a/Lib/test/test_interpreters/test_api.py
+++ b/Lib/test/test_interpreters/test_api.py
@@ -1452,6 +1452,14 @@ class LowLevelTests(TestBase):
self.assertFalse(
self.interp_exists(interpid))
+ with self.subTest('basic C-API'):
+ interpid = _testinternalcapi.create_interpreter()
+ self.assertTrue(
+ self.interp_exists(interpid))
+ _testinternalcapi.destroy_interpreter(interpid, basic=True)
+ self.assertFalse(
+ self.interp_exists(interpid))
+
def test_get_config(self):
# This test overlaps with
# test.test_capi.test_misc.InterpreterConfigTests.
diff --git a/Lib/test/test_ioctl.py b/Lib/test/test_ioctl.py
index 7a986048bda..3c7a58aa2bc 100644
--- a/Lib/test/test_ioctl.py
+++ b/Lib/test/test_ioctl.py
@@ -127,9 +127,8 @@ class IoctlTestsTty(unittest.TestCase):
self._check_ioctl_not_mutate_len(1024)
def test_ioctl_mutate_2048(self):
- # Test with a larger buffer, just for the record.
self._check_ioctl_mutate_len(2048)
- self.assertRaises(ValueError, self._check_ioctl_not_mutate_len, 2048)
+ self._check_ioctl_not_mutate_len(1024)
@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index 1e5adcc8db1..fa5b1e43816 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -61,7 +61,7 @@ import warnings
import weakref
from http.server import HTTPServer, BaseHTTPRequestHandler
-from unittest.mock import patch
+from unittest.mock import call, Mock, patch
from urllib.parse import urlparse, parse_qs
from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
ThreadingTCPServer, StreamRequestHandler)
@@ -5655,12 +5655,19 @@ class BasicConfigTest(unittest.TestCase):
assertRaises = self.assertRaises
handlers = [logging.StreamHandler()]
stream = sys.stderr
+ formatter = logging.Formatter()
assertRaises(ValueError, logging.basicConfig, filename='test.log',
stream=stream)
assertRaises(ValueError, logging.basicConfig, filename='test.log',
handlers=handlers)
assertRaises(ValueError, logging.basicConfig, stream=stream,
handlers=handlers)
+ assertRaises(ValueError, logging.basicConfig, formatter=formatter,
+ format='%(message)s')
+ assertRaises(ValueError, logging.basicConfig, formatter=formatter,
+ datefmt='%H:%M:%S')
+ assertRaises(ValueError, logging.basicConfig, formatter=formatter,
+ style='%')
# Issue 23207: test for invalid kwargs
assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
# Should pop both filename and filemode even if filename is None
@@ -5795,6 +5802,20 @@ class BasicConfigTest(unittest.TestCase):
# didn't write anything due to the encoding error
self.assertEqual(data, r'')
+ def test_formatter_given(self):
+ mock_formatter = Mock()
+ mock_handler = Mock(formatter=None)
+ with patch("logging.Formatter") as mock_formatter_init:
+ logging.basicConfig(formatter=mock_formatter, handlers=[mock_handler])
+ self.assertEqual(mock_handler.setFormatter.call_args_list, [call(mock_formatter)])
+ self.assertEqual(mock_formatter_init.call_count, 0)
+
+ def test_formatter_not_given(self):
+ mock_handler = Mock(formatter=None)
+ with patch("logging.Formatter") as mock_formatter_init:
+ logging.basicConfig(handlers=[mock_handler])
+ self.assertEqual(mock_formatter_init.call_count, 1)
+
@support.requires_working_socket()
def test_log_taskName(self):
async def log_record():
diff --git a/Lib/test/test_ntpath.py b/Lib/test/test_ntpath.py
index c10387b58e3..f83ef225a6e 100644
--- a/Lib/test/test_ntpath.py
+++ b/Lib/test/test_ntpath.py
@@ -124,6 +124,22 @@ class TestNtpath(NtpathTestCase):
tester('ntpath.splitdrive("//?/UNC/server/share/dir")',
("//?/UNC/server/share", "/dir"))
+ def test_splitdrive_invalid_paths(self):
+ splitdrive = ntpath.splitdrive
+ self.assertEqual(splitdrive('\\\\ser\x00ver\\sha\x00re\\di\x00r'),
+ ('\\\\ser\x00ver\\sha\x00re', '\\di\x00r'))
+ self.assertEqual(splitdrive(b'\\\\ser\x00ver\\sha\x00re\\di\x00r'),
+ (b'\\\\ser\x00ver\\sha\x00re', b'\\di\x00r'))
+ self.assertEqual(splitdrive("\\\\\udfff\\\udffe\\\udffd"),
+ ('\\\\\udfff\\\udffe', '\\\udffd'))
+ if sys.platform == 'win32':
+ self.assertRaises(UnicodeDecodeError, splitdrive, b'\\\\\xff\\share\\dir')
+ self.assertRaises(UnicodeDecodeError, splitdrive, b'\\\\server\\\xff\\dir')
+ self.assertRaises(UnicodeDecodeError, splitdrive, b'\\\\server\\share\\\xff')
+ else:
+ self.assertEqual(splitdrive(b'\\\\\xff\\\xfe\\\xfd'),
+ (b'\\\\\xff\\\xfe', b'\\\xfd'))
+
def test_splitroot(self):
tester("ntpath.splitroot('')", ('', '', ''))
tester("ntpath.splitroot('foo')", ('', '', 'foo'))
@@ -214,6 +230,22 @@ class TestNtpath(NtpathTestCase):
tester('ntpath.splitroot(" :/foo")', (" :", "/", "foo"))
tester('ntpath.splitroot("/:/foo")', ("", "/", ":/foo"))
+ def test_splitroot_invalid_paths(self):
+ splitroot = ntpath.splitroot
+ self.assertEqual(splitroot('\\\\ser\x00ver\\sha\x00re\\di\x00r'),
+ ('\\\\ser\x00ver\\sha\x00re', '\\', 'di\x00r'))
+ self.assertEqual(splitroot(b'\\\\ser\x00ver\\sha\x00re\\di\x00r'),
+ (b'\\\\ser\x00ver\\sha\x00re', b'\\', b'di\x00r'))
+ self.assertEqual(splitroot("\\\\\udfff\\\udffe\\\udffd"),
+ ('\\\\\udfff\\\udffe', '\\', '\udffd'))
+ if sys.platform == 'win32':
+ self.assertRaises(UnicodeDecodeError, splitroot, b'\\\\\xff\\share\\dir')
+ self.assertRaises(UnicodeDecodeError, splitroot, b'\\\\server\\\xff\\dir')
+ self.assertRaises(UnicodeDecodeError, splitroot, b'\\\\server\\share\\\xff')
+ else:
+ self.assertEqual(splitroot(b'\\\\\xff\\\xfe\\\xfd'),
+ (b'\\\\\xff\\\xfe', b'\\', b'\xfd'))
+
def test_split(self):
tester('ntpath.split("c:\\foo\\bar")', ('c:\\foo', 'bar'))
tester('ntpath.split("\\\\conky\\mountpoint\\foo\\bar")',
@@ -226,6 +258,21 @@ class TestNtpath(NtpathTestCase):
tester('ntpath.split("c:/")', ('c:/', ''))
tester('ntpath.split("//conky/mountpoint/")', ('//conky/mountpoint/', ''))
+ def test_split_invalid_paths(self):
+ split = ntpath.split
+ self.assertEqual(split('c:\\fo\x00o\\ba\x00r'),
+ ('c:\\fo\x00o', 'ba\x00r'))
+ self.assertEqual(split(b'c:\\fo\x00o\\ba\x00r'),
+ (b'c:\\fo\x00o', b'ba\x00r'))
+ self.assertEqual(split('c:\\\udfff\\\udffe'),
+ ('c:\\\udfff', '\udffe'))
+ if sys.platform == 'win32':
+ self.assertRaises(UnicodeDecodeError, split, b'c:\\\xff\\bar')
+ self.assertRaises(UnicodeDecodeError, split, b'c:\\foo\\\xff')
+ else:
+ self.assertEqual(split(b'c:\\\xff\\\xfe'),
+ (b'c:\\\xff', b'\xfe'))
+
def test_isabs(self):
tester('ntpath.isabs("foo\\bar")', 0)
tester('ntpath.isabs("foo/bar")', 0)
@@ -333,6 +380,30 @@ class TestNtpath(NtpathTestCase):
tester("ntpath.join('D:a', './c:b')", 'D:a\\.\\c:b')
tester("ntpath.join('D:/a', './c:b')", 'D:\\a\\.\\c:b')
+ def test_normcase(self):
+ normcase = ntpath.normcase
+ self.assertEqual(normcase(''), '')
+ self.assertEqual(normcase(b''), b'')
+ self.assertEqual(normcase('ABC'), 'abc')
+ self.assertEqual(normcase(b'ABC'), b'abc')
+ self.assertEqual(normcase('\xc4\u0141\u03a8'), '\xe4\u0142\u03c8')
+ expected = '\u03c9\u2126' if sys.platform == 'win32' else '\u03c9\u03c9'
+ self.assertEqual(normcase('\u03a9\u2126'), expected)
+ if sys.platform == 'win32' or sys.getfilesystemencoding() == 'utf-8':
+ self.assertEqual(normcase('\xc4\u0141\u03a8'.encode()),
+ '\xe4\u0142\u03c8'.encode())
+ self.assertEqual(normcase('\u03a9\u2126'.encode()),
+ expected.encode())
+
+ def test_normcase_invalid_paths(self):
+ normcase = ntpath.normcase
+ self.assertEqual(normcase('abc\x00def'), 'abc\x00def')
+ self.assertEqual(normcase(b'abc\x00def'), b'abc\x00def')
+ self.assertEqual(normcase('\udfff'), '\udfff')
+ if sys.platform == 'win32':
+ path = b'ABC' + bytes(range(128, 256))
+ self.assertEqual(normcase(path), path.lower())
+
def test_normpath(self):
tester("ntpath.normpath('A//////././//.//B')", r'A\B')
tester("ntpath.normpath('A/./B')", r'A\B')
@@ -381,6 +452,21 @@ class TestNtpath(NtpathTestCase):
tester("ntpath.normpath('\\\\')", '\\\\')
tester("ntpath.normpath('//?/UNC/server/share/..')", '\\\\?\\UNC\\server\\share\\')
+ def test_normpath_invalid_paths(self):
+ normpath = ntpath.normpath
+ self.assertEqual(normpath('fo\x00o'), 'fo\x00o')
+ self.assertEqual(normpath(b'fo\x00o'), b'fo\x00o')
+ self.assertEqual(normpath('fo\x00o\\..\\bar'), 'bar')
+ self.assertEqual(normpath(b'fo\x00o\\..\\bar'), b'bar')
+ self.assertEqual(normpath('\udfff'), '\udfff')
+ self.assertEqual(normpath('\udfff\\..\\foo'), 'foo')
+ if sys.platform == 'win32':
+ self.assertRaises(UnicodeDecodeError, normpath, b'\xff')
+ self.assertRaises(UnicodeDecodeError, normpath, b'\xff\\..\\foo')
+ else:
+ self.assertEqual(normpath(b'\xff'), b'\xff')
+ self.assertEqual(normpath(b'\xff\\..\\foo'), b'foo')
+
def test_realpath_curdir(self):
expected = ntpath.normpath(os.getcwd())
tester("ntpath.realpath('.')", expected)
@@ -420,10 +506,6 @@ class TestNtpath(NtpathTestCase):
d = drives.pop().encode()
self.assertEqual(ntpath.realpath(d), d)
- # gh-106242: Embedded nulls and non-strict fallback to abspath
- self.assertEqual(ABSTFN + "\0spam",
- ntpath.realpath(os_helper.TESTFN + "\0spam", strict=False))
-
@os_helper.skip_unless_symlink
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
def test_realpath_strict(self):
@@ -434,8 +516,51 @@ class TestNtpath(NtpathTestCase):
self.addCleanup(os_helper.unlink, ABSTFN)
self.assertRaises(FileNotFoundError, ntpath.realpath, ABSTFN, strict=True)
self.assertRaises(FileNotFoundError, ntpath.realpath, ABSTFN + "2", strict=True)
+
+ @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
+ def test_realpath_invalid_paths(self):
+ realpath = ntpath.realpath
+ ABSTFN = ntpath.abspath(os_helper.TESTFN)
+ ABSTFNb = os.fsencode(ABSTFN)
+ path = ABSTFN + '\x00'
+ # gh-106242: Embedded nulls and non-strict fallback to abspath
+ self.assertEqual(realpath(path, strict=False), path)
# gh-106242: Embedded nulls should raise OSError (not ValueError)
- self.assertRaises(OSError, ntpath.realpath, ABSTFN + "\0spam", strict=True)
+ self.assertRaises(OSError, realpath, path, strict=True)
+ path = ABSTFNb + b'\x00'
+ self.assertEqual(realpath(path, strict=False), path)
+ self.assertRaises(OSError, realpath, path, strict=True)
+ path = ABSTFN + '\\nonexistent\\x\x00'
+ self.assertEqual(realpath(path, strict=False), path)
+ self.assertRaises(OSError, realpath, path, strict=True)
+ path = ABSTFNb + b'\\nonexistent\\x\x00'
+ self.assertEqual(realpath(path, strict=False), path)
+ self.assertRaises(OSError, realpath, path, strict=True)
+ path = ABSTFN + '\x00\\..'
+ self.assertEqual(realpath(path, strict=False), os.getcwd())
+ self.assertEqual(realpath(path, strict=True), os.getcwd())
+ path = ABSTFNb + b'\x00\\..'
+ self.assertEqual(realpath(path, strict=False), os.getcwdb())
+ self.assertEqual(realpath(path, strict=True), os.getcwdb())
+ path = ABSTFN + '\\nonexistent\\x\x00\\..'
+ self.assertEqual(realpath(path, strict=False), ABSTFN + '\\nonexistent')
+ self.assertRaises(OSError, realpath, path, strict=True)
+ path = ABSTFNb + b'\\nonexistent\\x\x00\\..'
+ self.assertEqual(realpath(path, strict=False), ABSTFNb + b'\\nonexistent')
+ self.assertRaises(OSError, realpath, path, strict=True)
+
+ path = ABSTFNb + b'\xff'
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+ path = ABSTFNb + b'\\nonexistent\\\xff'
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+ path = ABSTFNb + b'\xff\\..'
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+ path = ABSTFNb + b'\\nonexistent\\\xff\\..'
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
@os_helper.skip_unless_symlink
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
@@ -812,8 +937,6 @@ class TestNtpath(NtpathTestCase):
tester('ntpath.abspath("C:/nul")', "\\\\.\\nul")
tester('ntpath.abspath("C:\\nul")', "\\\\.\\nul")
self.assertTrue(ntpath.isabs(ntpath.abspath("C:spam")))
- self.assertEqual(ntpath.abspath("C:\x00"), ntpath.join(ntpath.abspath("C:"), "\x00"))
- self.assertEqual(ntpath.abspath("\x00:spam"), "\x00:\\spam")
tester('ntpath.abspath("//..")', "\\\\")
tester('ntpath.abspath("//../")', "\\\\..\\")
tester('ntpath.abspath("//../..")', "\\\\..\\")
@@ -847,6 +970,26 @@ class TestNtpath(NtpathTestCase):
drive, _ = ntpath.splitdrive(cwd_dir)
tester('ntpath.abspath("/abc/")', drive + "\\abc")
+ def test_abspath_invalid_paths(self):
+ abspath = ntpath.abspath
+ if sys.platform == 'win32':
+ self.assertEqual(abspath("C:\x00"), ntpath.join(abspath("C:"), "\x00"))
+ self.assertEqual(abspath(b"C:\x00"), ntpath.join(abspath(b"C:"), b"\x00"))
+ self.assertEqual(abspath("\x00:spam"), "\x00:\\spam")
+ self.assertEqual(abspath(b"\x00:spam"), b"\x00:\\spam")
+ self.assertEqual(abspath('c:\\fo\x00o'), 'c:\\fo\x00o')
+ self.assertEqual(abspath(b'c:\\fo\x00o'), b'c:\\fo\x00o')
+ self.assertEqual(abspath('c:\\fo\x00o\\..\\bar'), 'c:\\bar')
+ self.assertEqual(abspath(b'c:\\fo\x00o\\..\\bar'), b'c:\\bar')
+ self.assertEqual(abspath('c:\\\udfff'), 'c:\\\udfff')
+ self.assertEqual(abspath('c:\\\udfff\\..\\foo'), 'c:\\foo')
+ if sys.platform == 'win32':
+ self.assertRaises(UnicodeDecodeError, abspath, b'c:\\\xff')
+ self.assertRaises(UnicodeDecodeError, abspath, b'c:\\\xff\\..\\foo')
+ else:
+ self.assertEqual(abspath(b'c:\\\xff'), b'c:\\\xff')
+ self.assertEqual(abspath(b'c:\\\xff\\..\\foo'), b'c:\\foo')
+
def test_relpath(self):
tester('ntpath.relpath("a")', 'a')
tester('ntpath.relpath(ntpath.abspath("a"))', 'a')
@@ -989,6 +1132,18 @@ class TestNtpath(NtpathTestCase):
self.assertTrue(ntpath.ismount(b"\\\\localhost\\c$"))
self.assertTrue(ntpath.ismount(b"\\\\localhost\\c$\\"))
+ def test_ismount_invalid_paths(self):
+ ismount = ntpath.ismount
+ self.assertFalse(ismount("c:\\\udfff"))
+ if sys.platform == 'win32':
+ self.assertRaises(ValueError, ismount, "c:\\\x00")
+ self.assertRaises(ValueError, ismount, b"c:\\\x00")
+ self.assertRaises(UnicodeDecodeError, ismount, b"c:\\\xff")
+ else:
+ self.assertFalse(ismount("c:\\\x00"))
+ self.assertFalse(ismount(b"c:\\\x00"))
+ self.assertFalse(ismount(b"c:\\\xff"))
+
def test_isreserved(self):
self.assertFalse(ntpath.isreserved(''))
self.assertFalse(ntpath.isreserved('.'))
@@ -1095,6 +1250,13 @@ class TestNtpath(NtpathTestCase):
self.assertFalse(ntpath.isjunction('tmpdir'))
self.assertPathEqual(ntpath.realpath('testjunc'), ntpath.realpath('tmpdir'))
+ def test_isfile_invalid_paths(self):
+ isfile = ntpath.isfile
+ self.assertIs(isfile('/tmp\udfffabcds'), False)
+ self.assertIs(isfile(b'/tmp\xffabcds'), False)
+ self.assertIs(isfile('/tmp\x00abcds'), False)
+ self.assertIs(isfile(b'/tmp\x00abcds'), False)
+
@unittest.skipIf(sys.platform != 'win32', "drive letters are a windows concept")
def test_isfile_driveletter(self):
drive = os.environ.get('SystemDrive')
@@ -1195,9 +1357,6 @@ class PathLikeTests(NtpathTestCase):
def test_path_normcase(self):
self._check_function(self.path.normcase)
- if sys.platform == 'win32':
- self.assertEqual(ntpath.normcase('\u03a9\u2126'), 'ωΩ')
- self.assertEqual(ntpath.normcase('abc\x00def'), 'abc\x00def')
def test_path_isabs(self):
self._check_function(self.path.isabs)
diff --git a/Lib/test/test_pathlib/support/lexical_path.py b/Lib/test/test_pathlib/support/lexical_path.py
index f29a521af9b..fd7fbf283a6 100644
--- a/Lib/test/test_pathlib/support/lexical_path.py
+++ b/Lib/test/test_pathlib/support/lexical_path.py
@@ -9,9 +9,10 @@ import posixpath
from . import is_pypi
if is_pypi:
- from pathlib_abc import _JoinablePath
+ from pathlib_abc import vfspath, _JoinablePath
else:
from pathlib.types import _JoinablePath
+ from pathlib._os import vfspath
class LexicalPath(_JoinablePath):
@@ -22,20 +23,20 @@ class LexicalPath(_JoinablePath):
self._segments = pathsegments
def __hash__(self):
- return hash(str(self))
+ return hash(vfspath(self))
def __eq__(self, other):
if not isinstance(other, LexicalPath):
return NotImplemented
- return str(self) == str(other)
+ return vfspath(self) == vfspath(other)
- def __str__(self):
+ def __vfspath__(self):
if not self._segments:
return ''
return self.parser.join(*self._segments)
def __repr__(self):
- return f'{type(self).__name__}({str(self)!r})'
+ return f'{type(self).__name__}({vfspath(self)!r})'
def with_segments(self, *pathsegments):
return type(self)(*pathsegments)
diff --git a/Lib/test/test_pathlib/support/local_path.py b/Lib/test/test_pathlib/support/local_path.py
index d481fd45ead..c1423c545bf 100644
--- a/Lib/test/test_pathlib/support/local_path.py
+++ b/Lib/test/test_pathlib/support/local_path.py
@@ -97,7 +97,7 @@ class LocalPathInfo(PathInfo):
__slots__ = ('_path', '_exists', '_is_dir', '_is_file', '_is_symlink')
def __init__(self, path):
- self._path = str(path)
+ self._path = os.fspath(path)
self._exists = None
self._is_dir = None
self._is_file = None
@@ -139,14 +139,12 @@ class ReadableLocalPath(_ReadablePath, LexicalPath):
Simple implementation of a ReadablePath class for local filesystem paths.
"""
__slots__ = ('info',)
+ __fspath__ = LexicalPath.__vfspath__
def __init__(self, *pathsegments):
super().__init__(*pathsegments)
self.info = LocalPathInfo(self)
- def __fspath__(self):
- return str(self)
-
def __open_rb__(self, buffering=-1):
return open(self, 'rb')
@@ -163,9 +161,7 @@ class WritableLocalPath(_WritablePath, LexicalPath):
"""
__slots__ = ()
-
- def __fspath__(self):
- return str(self)
+ __fspath__ = LexicalPath.__vfspath__
def __open_wb__(self, buffering=-1):
return open(self, 'wb')
diff --git a/Lib/test/test_pathlib/support/zip_path.py b/Lib/test/test_pathlib/support/zip_path.py
index 2905260c9df..21e1d07423a 100644
--- a/Lib/test/test_pathlib/support/zip_path.py
+++ b/Lib/test/test_pathlib/support/zip_path.py
@@ -16,9 +16,10 @@ from stat import S_IFMT, S_ISDIR, S_ISREG, S_ISLNK
from . import is_pypi
if is_pypi:
- from pathlib_abc import PathInfo, _ReadablePath, _WritablePath
+ from pathlib_abc import vfspath, PathInfo, _ReadablePath, _WritablePath
else:
from pathlib.types import PathInfo, _ReadablePath, _WritablePath
+ from pathlib._os import vfspath
class ZipPathGround:
@@ -34,16 +35,16 @@ class ZipPathGround:
root.zip_file.close()
def create_file(self, path, data=b''):
- path.zip_file.writestr(str(path), data)
+ path.zip_file.writestr(vfspath(path), data)
def create_dir(self, path):
- zip_info = zipfile.ZipInfo(str(path) + '/')
+ zip_info = zipfile.ZipInfo(vfspath(path) + '/')
zip_info.external_attr |= stat.S_IFDIR << 16
zip_info.external_attr |= stat.FILE_ATTRIBUTE_DIRECTORY
path.zip_file.writestr(zip_info, '')
def create_symlink(self, path, target):
- zip_info = zipfile.ZipInfo(str(path))
+ zip_info = zipfile.ZipInfo(vfspath(path))
zip_info.external_attr = stat.S_IFLNK << 16
path.zip_file.writestr(zip_info, target.encode())
@@ -62,28 +63,28 @@ class ZipPathGround:
self.create_symlink(p.joinpath('brokenLinkLoop'), 'brokenLinkLoop')
def readtext(self, p):
- with p.zip_file.open(str(p), 'r') as f:
+ with p.zip_file.open(vfspath(p), 'r') as f:
f = io.TextIOWrapper(f, encoding='utf-8')
return f.read()
def readbytes(self, p):
- with p.zip_file.open(str(p), 'r') as f:
+ with p.zip_file.open(vfspath(p), 'r') as f:
return f.read()
readlink = readtext
def isdir(self, p):
- path_str = str(p) + "/"
+ path_str = vfspath(p) + "/"
return path_str in p.zip_file.NameToInfo
def isfile(self, p):
- info = p.zip_file.NameToInfo.get(str(p))
+ info = p.zip_file.NameToInfo.get(vfspath(p))
if info is None:
return False
return not stat.S_ISLNK(info.external_attr >> 16)
def islink(self, p):
- info = p.zip_file.NameToInfo.get(str(p))
+ info = p.zip_file.NameToInfo.get(vfspath(p))
if info is None:
return False
return stat.S_ISLNK(info.external_attr >> 16)
@@ -240,20 +241,20 @@ class ReadableZipPath(_ReadablePath):
zip_file.filelist = ZipFileList(zip_file)
def __hash__(self):
- return hash((str(self), self.zip_file))
+ return hash((vfspath(self), self.zip_file))
def __eq__(self, other):
if not isinstance(other, ReadableZipPath):
return NotImplemented
- return str(self) == str(other) and self.zip_file is other.zip_file
+ return vfspath(self) == vfspath(other) and self.zip_file is other.zip_file
- def __str__(self):
+ def __vfspath__(self):
if not self._segments:
return ''
return self.parser.join(*self._segments)
def __repr__(self):
- return f'{type(self).__name__}({str(self)!r}, zip_file={self.zip_file!r})'
+ return f'{type(self).__name__}({vfspath(self)!r}, zip_file={self.zip_file!r})'
def with_segments(self, *pathsegments):
return type(self)(*pathsegments, zip_file=self.zip_file)
@@ -261,7 +262,7 @@ class ReadableZipPath(_ReadablePath):
@property
def info(self):
tree = self.zip_file.filelist.tree
- return tree.resolve(str(self), follow_symlinks=False)
+ return tree.resolve(vfspath(self), follow_symlinks=False)
def __open_rb__(self, buffering=-1):
info = self.info.resolve()
@@ -301,36 +302,36 @@ class WritableZipPath(_WritablePath):
self.zip_file = zip_file
def __hash__(self):
- return hash((str(self), self.zip_file))
+ return hash((vfspath(self), self.zip_file))
def __eq__(self, other):
if not isinstance(other, WritableZipPath):
return NotImplemented
- return str(self) == str(other) and self.zip_file is other.zip_file
+ return vfspath(self) == vfspath(other) and self.zip_file is other.zip_file
- def __str__(self):
+ def __vfspath__(self):
if not self._segments:
return ''
return self.parser.join(*self._segments)
def __repr__(self):
- return f'{type(self).__name__}({str(self)!r}, zip_file={self.zip_file!r})'
+ return f'{type(self).__name__}({vfspath(self)!r}, zip_file={self.zip_file!r})'
def with_segments(self, *pathsegments):
return type(self)(*pathsegments, zip_file=self.zip_file)
def __open_wb__(self, buffering=-1):
- return self.zip_file.open(str(self), 'w')
+ return self.zip_file.open(vfspath(self), 'w')
def mkdir(self, mode=0o777):
- zinfo = zipfile.ZipInfo(str(self) + '/')
+ zinfo = zipfile.ZipInfo(vfspath(self) + '/')
zinfo.external_attr |= stat.S_IFDIR << 16
zinfo.external_attr |= stat.FILE_ATTRIBUTE_DIRECTORY
self.zip_file.writestr(zinfo, '')
def symlink_to(self, target, target_is_directory=False):
- zinfo = zipfile.ZipInfo(str(self))
+ zinfo = zipfile.ZipInfo(vfspath(self))
zinfo.external_attr = stat.S_IFLNK << 16
if target_is_directory:
zinfo.external_attr |= 0x10
- self.zip_file.writestr(zinfo, str(target))
+ self.zip_file.writestr(zinfo, vfspath(target))
diff --git a/Lib/test/test_pathlib/test_join_windows.py b/Lib/test/test_pathlib/test_join_windows.py
index 2cc634f25ef..f30c80605f7 100644
--- a/Lib/test/test_pathlib/test_join_windows.py
+++ b/Lib/test/test_pathlib/test_join_windows.py
@@ -8,6 +8,11 @@ import unittest
from .support import is_pypi
from .support.lexical_path import LexicalWindowsPath
+if is_pypi:
+ from pathlib_abc import vfspath
+else:
+ from pathlib._os import vfspath
+
class JoinTestBase:
def test_join(self):
@@ -70,17 +75,17 @@ class JoinTestBase:
self.assertEqual(p / './dd:s', P(r'C:/a/b\./dd:s'))
self.assertEqual(p / 'E:d:s', P('E:d:s'))
- def test_str(self):
+ def test_vfspath(self):
p = self.cls(r'a\b\c')
- self.assertEqual(str(p), 'a\\b\\c')
+ self.assertEqual(vfspath(p), 'a\\b\\c')
p = self.cls(r'c:\a\b\c')
- self.assertEqual(str(p), 'c:\\a\\b\\c')
+ self.assertEqual(vfspath(p), 'c:\\a\\b\\c')
p = self.cls('\\\\a\\b\\')
- self.assertEqual(str(p), '\\\\a\\b\\')
+ self.assertEqual(vfspath(p), '\\\\a\\b\\')
p = self.cls(r'\\a\b\c')
- self.assertEqual(str(p), '\\\\a\\b\\c')
+ self.assertEqual(vfspath(p), '\\\\a\\b\\c')
p = self.cls(r'\\a\b\c\d')
- self.assertEqual(str(p), '\\\\a\\b\\c\\d')
+ self.assertEqual(vfspath(p), '\\\\a\\b\\c\\d')
def test_parts(self):
P = self.cls
diff --git a/Lib/test/test_pathlib/test_pathlib.py b/Lib/test/test_pathlib/test_pathlib.py
index 8a313cc4292..37ef9fa1946 100644
--- a/Lib/test/test_pathlib/test_pathlib.py
+++ b/Lib/test/test_pathlib/test_pathlib.py
@@ -20,7 +20,7 @@ from test.support import cpython_only
from test.support import is_emscripten, is_wasi
from test.support import infinite_recursion
from test.support import os_helper
-from test.support.os_helper import TESTFN, FakePath
+from test.support.os_helper import TESTFN, FS_NONASCII, FakePath
try:
import fcntl
except ImportError:
@@ -770,12 +770,16 @@ class PurePathTest(unittest.TestCase):
self.assertEqual(self.make_uri(P('c:/')), 'file:///c:/')
self.assertEqual(self.make_uri(P('c:/a/b.c')), 'file:///c:/a/b.c')
self.assertEqual(self.make_uri(P('c:/a/b%#c')), 'file:///c:/a/b%25%23c')
- self.assertEqual(self.make_uri(P('c:/a/b\xe9')), 'file:///c:/a/b%C3%A9')
self.assertEqual(self.make_uri(P('//some/share/')), 'file://some/share/')
self.assertEqual(self.make_uri(P('//some/share/a/b.c')),
'file://some/share/a/b.c')
- self.assertEqual(self.make_uri(P('//some/share/a/b%#c\xe9')),
- 'file://some/share/a/b%25%23c%C3%A9')
+
+ from urllib.parse import quote_from_bytes
+ QUOTED_FS_NONASCII = quote_from_bytes(os.fsencode(FS_NONASCII))
+ self.assertEqual(self.make_uri(P('c:/a/b' + FS_NONASCII)),
+ 'file:///c:/a/b' + QUOTED_FS_NONASCII)
+ self.assertEqual(self.make_uri(P('//some/share/a/b%#c' + FS_NONASCII)),
+ 'file://some/share/a/b%25%23c' + QUOTED_FS_NONASCII)
@needs_windows
def test_ordering_windows(self):
diff --git a/Lib/test/test_platform.py b/Lib/test/test_platform.py
index 818e807dd3a..3b673a47c8c 100644
--- a/Lib/test/test_platform.py
+++ b/Lib/test/test_platform.py
@@ -383,15 +383,6 @@ class PlatformTest(unittest.TestCase):
finally:
platform._uname_cache = None
- def test_java_ver(self):
- import re
- msg = re.escape(
- "'java_ver' is deprecated and slated for removal in Python 3.15"
- )
- with self.assertWarnsRegex(DeprecationWarning, msg):
- res = platform.java_ver()
- self.assertEqual(len(res), 4)
-
@unittest.skipUnless(support.MS_WINDOWS, 'This test only makes sense on Windows')
def test_win32_ver(self):
release1, version1, csd1, ptype1 = 'a', 'b', 'c', 'd'
diff --git a/Lib/test/test_posixpath.py b/Lib/test/test_posixpath.py
index fa19d549c26..c70688ff5e9 100644
--- a/Lib/test/test_posixpath.py
+++ b/Lib/test/test_posixpath.py
@@ -229,6 +229,7 @@ class PosixPathTest(unittest.TestCase):
finally:
safe_rmdir(ABSTFN)
+ def test_ismount_invalid_paths(self):
self.assertIs(posixpath.ismount('/\udfff'), False)
self.assertIs(posixpath.ismount(b'/\xff'), False)
self.assertIs(posixpath.ismount('/\x00'), False)
@@ -489,6 +490,79 @@ class PosixPathTest(unittest.TestCase):
finally:
os_helper.unlink(ABSTFN)
+ def test_realpath_invalid_paths(self):
+ path = '/\x00'
+ self.assertRaises(ValueError, realpath, path, strict=False)
+ self.assertRaises(ValueError, realpath, path, strict=True)
+ path = b'/\x00'
+ self.assertRaises(ValueError, realpath, path, strict=False)
+ self.assertRaises(ValueError, realpath, path, strict=True)
+ path = '/nonexistent/x\x00'
+ self.assertRaises(ValueError, realpath, path, strict=False)
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ path = b'/nonexistent/x\x00'
+ self.assertRaises(ValueError, realpath, path, strict=False)
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ path = '/\x00/..'
+ self.assertRaises(ValueError, realpath, path, strict=False)
+ self.assertRaises(ValueError, realpath, path, strict=True)
+ path = b'/\x00/..'
+ self.assertRaises(ValueError, realpath, path, strict=False)
+ self.assertRaises(ValueError, realpath, path, strict=True)
+ path = '/nonexistent/x\x00/..'
+ self.assertRaises(ValueError, realpath, path, strict=False)
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ path = b'/nonexistent/x\x00/..'
+ self.assertRaises(ValueError, realpath, path, strict=False)
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+
+ path = '/\udfff'
+ if sys.platform == 'win32':
+ self.assertEqual(realpath(path, strict=False), path)
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ else:
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=True)
+ path = '/nonexistent/\udfff'
+ if sys.platform == 'win32':
+ self.assertEqual(realpath(path, strict=False), path)
+ else:
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ path = '/\udfff/..'
+ if sys.platform == 'win32':
+ self.assertEqual(realpath(path, strict=False), '/')
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ else:
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=True)
+ path = '/nonexistent/\udfff/..'
+ if sys.platform == 'win32':
+ self.assertEqual(realpath(path, strict=False), '/nonexistent')
+ else:
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+
+ path = b'/\xff'
+ if sys.platform == 'win32':
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+ else:
+ self.assertEqual(realpath(path, strict=False), path)
+ if support.is_wasi:
+ self.assertRaises(OSError, realpath, path, strict=True)
+ else:
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ path = b'/nonexistent/\xff'
+ if sys.platform == 'win32':
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
+ else:
+ self.assertEqual(realpath(path, strict=False), path)
+ if support.is_wasi:
+ self.assertRaises(OSError, realpath, path, strict=True)
+ else:
+ self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
def test_realpath_relative(self):
diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py
index fc8114891d1..59f5d1f893f 100644
--- a/Lib/test/test_pyrepl/test_pyrepl.py
+++ b/Lib/test/test_pyrepl/test_pyrepl.py
@@ -10,7 +10,7 @@ import sys
import tempfile
from unittest import TestCase, skipUnless, skipIf
from unittest.mock import patch
-from test.support import force_not_colorized, make_clean_env
+from test.support import force_not_colorized, make_clean_env, Py_DEBUG
from test.support import SHORT_TIMEOUT, STDLIB_DIR
from test.support.import_helper import import_module
from test.support.os_helper import EnvironmentVarGuard, unlink
@@ -959,6 +959,26 @@ class TestPyReplModuleCompleter(TestCase):
output = reader.readline()
self.assertEqual(output, expected)
+ def test_builtin_completion_top_level(self):
+ import importlib
+ # Make iter_modules() search only the standard library.
+ # This makes the test more reliable in case there are
+ # other user packages/scripts on PYTHONPATH which can
+ # intefere with the completions.
+ lib_path = os.path.dirname(importlib.__path__[0])
+ sys.path = [lib_path]
+
+ cases = (
+ ("import bui\t\n", "import builtins"),
+ ("from bui\t\n", "from builtins"),
+ )
+ for code, expected in cases:
+ with self.subTest(code=code):
+ events = code_to_events(code)
+ reader = self.prepare_reader(events, namespace={})
+ output = reader.readline()
+ self.assertEqual(output, expected)
+
def test_relative_import_completions(self):
cases = (
("from .readl\t\n", "from .readline"),
@@ -1055,11 +1075,15 @@ class TestPyReplModuleCompleter(TestCase):
self.assertEqual(actual, parsed)
# The parser should not get tripped up by any
# other preceding statements
- code = f'import xyz\n{code}'
- with self.subTest(code=code):
+ _code = f'import xyz\n{code}'
+ parser = ImportParser(_code)
+ actual = parser.parse()
+ with self.subTest(code=_code):
self.assertEqual(actual, parsed)
- code = f'import xyz;{code}'
- with self.subTest(code=code):
+ _code = f'import xyz;{code}'
+ parser = ImportParser(_code)
+ actual = parser.parse()
+ with self.subTest(code=_code):
self.assertEqual(actual, parsed)
def test_parse_error(self):
@@ -1610,3 +1634,16 @@ class TestMain(ReplTestCase):
# Extra stuff (newline and `exit` rewrites) are necessary
# because of how run_repl works.
self.assertNotIn(">>> \n>>> >>>", cleaned_output)
+
+ @skipUnless(Py_DEBUG, '-X showrefcount requires a Python debug build')
+ def test_showrefcount(self):
+ env = os.environ.copy()
+ env.pop("PYTHON_BASIC_REPL", "")
+ output, _ = self.run_repl("1\n1+2\nexit()\n", cmdline_args=['-Xshowrefcount'], env=env)
+ matches = re.findall(r'\[-?\d+ refs, \d+ blocks\]', output)
+ self.assertEqual(len(matches), 3)
+
+ env["PYTHON_BASIC_REPL"] = "1"
+ output, _ = self.run_repl("1\n1+2\nexit()\n", cmdline_args=['-Xshowrefcount'], env=env)
+ matches = re.findall(r'\[-?\d+ refs, \d+ blocks\]', output)
+ self.assertEqual(len(matches), 3)
diff --git a/Lib/test/test_pyrepl/test_reader.py b/Lib/test/test_pyrepl/test_reader.py
index 57526f88f93..1f655264f1c 100644
--- a/Lib/test/test_pyrepl/test_reader.py
+++ b/Lib/test/test_pyrepl/test_reader.py
@@ -517,6 +517,37 @@ class TestReaderInColor(ScreenEqualMixin, TestCase):
self.assert_screen_equal(reader, code, clean=True)
self.assert_screen_equal(reader, expected)
+ def test_syntax_highlighting_literal_brace_in_fstring_or_tstring(self):
+ code = dedent(
+ """\
+ f"{{"
+ f"}}"
+ f"a{{b"
+ f"a}}b"
+ f"a{{b}}c"
+ t"a{{b}}c"
+ f"{{{0}}}"
+ f"{ {0} }"
+ """
+ )
+ expected = dedent(
+ """\
+ {s}f"{z}{s}<<{z}{s}"{z}
+ {s}f"{z}{s}>>{z}{s}"{z}
+ {s}f"{z}{s}a<<{z}{s}b{z}{s}"{z}
+ {s}f"{z}{s}a>>{z}{s}b{z}{s}"{z}
+ {s}f"{z}{s}a<<{z}{s}b>>{z}{s}c{z}{s}"{z}
+ {s}t"{z}{s}a<<{z}{s}b>>{z}{s}c{z}{s}"{z}
+ {s}f"{z}{s}<<{z}{o}<{z}{n}0{z}{o}>{z}{s}>>{z}{s}"{z}
+ {s}f"{z}{o}<{z} {o}<{z}{n}0{z}{o}>{z} {o}>{z}{s}"{z}
+ """
+ ).format(**colors).replace("<", "{").replace(">", "}")
+ events = code_to_events(code)
+ reader, _ = handle_all_events(events)
+ self.assert_screen_equal(reader, code, clean=True)
+ self.maxDiff=None
+ self.assert_screen_equal(reader, expected)
+
def test_control_characters(self):
code = 'flag = "🏳️‍🌈"'
events = code_to_events(code)
diff --git a/Lib/test/test_string/test_templatelib.py b/Lib/test/test_string/test_templatelib.py
index 5b9490c2be6..85fcff486d6 100644
--- a/Lib/test/test_string/test_templatelib.py
+++ b/Lib/test/test_string/test_templatelib.py
@@ -148,6 +148,13 @@ class TemplateIterTests(unittest.TestCase):
self.assertEqual(res[1].format_spec, '')
self.assertEqual(res[2], ' yz')
+ def test_exhausted(self):
+ # See https://github.com/python/cpython/issues/134119.
+ template_iter = iter(t"{1}")
+ self.assertIsInstance(next(template_iter), Interpolation)
+ self.assertRaises(StopIteration, next, template_iter)
+ self.assertRaises(StopIteration, next, template_iter)
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py
index 0eccf03a1a9..c7ac7914158 100644
--- a/Lib/test/test_syntax.py
+++ b/Lib/test/test_syntax.py
@@ -1431,6 +1431,23 @@ Better error message for using `except as` with not a name:
Traceback (most recent call last):
SyntaxError: cannot use except* statement with literal
+Regression tests for gh-133999:
+
+ >>> try: pass
+ ... except TypeError as name: raise from None
+ Traceback (most recent call last):
+ SyntaxError: invalid syntax
+
+ >>> try: pass
+ ... except* TypeError as name: raise from None
+ Traceback (most recent call last):
+ SyntaxError: invalid syntax
+
+ >>> match 1:
+ ... case 1 | 2 as abc: raise from None
+ Traceback (most recent call last):
+ SyntaxError: invalid syntax
+
Ensure that early = are not matched by the parser as invalid comparisons
>>> f(2, 4, x=34); 1 $ 2
Traceback (most recent call last):
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index 59ef5c99309..fb1c8492a64 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -1976,12 +1976,13 @@ class TestRemoteExec(unittest.TestCase):
def tearDown(self):
test.support.reap_children()
- def _run_remote_exec_test(self, script_code, python_args=None, env=None, prologue=''):
+ def _run_remote_exec_test(self, script_code, python_args=None, env=None,
+ prologue='',
+ script_path=os_helper.TESTFN + '_remote.py'):
# Create the script that will be remotely executed
- script = os_helper.TESTFN + '_remote.py'
- self.addCleanup(os_helper.unlink, script)
+ self.addCleanup(os_helper.unlink, script_path)
- with open(script, 'w') as f:
+ with open(script_path, 'w') as f:
f.write(script_code)
# Create and run the target process
@@ -2050,7 +2051,7 @@ sock.close()
self.assertEqual(response, b"ready")
# Try remote exec on the target process
- sys.remote_exec(proc.pid, script)
+ sys.remote_exec(proc.pid, script_path)
# Signal script to continue
client_socket.sendall(b"continue")
@@ -2073,14 +2074,32 @@ sock.close()
def test_remote_exec(self):
"""Test basic remote exec functionality"""
- script = '''
-print("Remote script executed successfully!")
-'''
+ script = 'print("Remote script executed successfully!")'
returncode, stdout, stderr = self._run_remote_exec_test(script)
# self.assertEqual(returncode, 0)
self.assertIn(b"Remote script executed successfully!", stdout)
self.assertEqual(stderr, b"")
+ def test_remote_exec_bytes(self):
+ script = 'print("Remote script executed successfully!")'
+ script_path = os.fsencode(os_helper.TESTFN) + b'_bytes_remote.py'
+ returncode, stdout, stderr = self._run_remote_exec_test(script,
+ script_path=script_path)
+ self.assertIn(b"Remote script executed successfully!", stdout)
+ self.assertEqual(stderr, b"")
+
+ @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, 'requires undecodable path')
+ @unittest.skipIf(sys.platform == 'darwin',
+ 'undecodable paths are not supported on macOS')
+ def test_remote_exec_undecodable(self):
+ script = 'print("Remote script executed successfully!")'
+ script_path = os_helper.TESTFN_UNDECODABLE + b'_undecodable_remote.py'
+ for script_path in [script_path, os.fsdecode(script_path)]:
+ returncode, stdout, stderr = self._run_remote_exec_test(script,
+ script_path=script_path)
+ self.assertIn(b"Remote script executed successfully!", stdout)
+ self.assertEqual(stderr, b"")
+
def test_remote_exec_with_self_process(self):
"""Test remote exec with the target process being the same as the test process"""
@@ -2157,6 +2176,13 @@ raise Exception("Remote script exception")
with self.assertRaises(OSError):
sys.remote_exec(99999, "print('should not run')")
+ def test_remote_exec_invalid_script(self):
+ """Test remote exec with invalid script type"""
+ with self.assertRaises(TypeError):
+ sys.remote_exec(0, None)
+ with self.assertRaises(TypeError):
+ sys.remote_exec(0, 123)
+
def test_remote_exec_syntax_error(self):
"""Test remote exec with syntax error in script"""
script = '''
diff --git a/Lib/test/test_sysconfig.py b/Lib/test/test_sysconfig.py
index 53e55383bf9..963cf753ce6 100644
--- a/Lib/test/test_sysconfig.py
+++ b/Lib/test/test_sysconfig.py
@@ -531,13 +531,10 @@ class TestSysConfig(unittest.TestCase, VirtualEnvironmentMixin):
Python_h = os.path.join(srcdir, 'Include', 'Python.h')
self.assertTrue(os.path.exists(Python_h), Python_h)
# <srcdir>/PC/pyconfig.h.in always exists even if unused
- pyconfig_h = os.path.join(srcdir, 'PC', 'pyconfig.h.in')
- self.assertTrue(os.path.exists(pyconfig_h), pyconfig_h)
pyconfig_h_in = os.path.join(srcdir, 'pyconfig.h.in')
self.assertTrue(os.path.exists(pyconfig_h_in), pyconfig_h_in)
if os.name == 'nt':
- # <executable dir>/pyconfig.h exists on Windows in a build tree
- pyconfig_h = os.path.join(sys.executable, '..', 'pyconfig.h')
+ pyconfig_h = os.path.join(srcdir, 'PC', 'pyconfig.h')
self.assertTrue(os.path.exists(pyconfig_h), pyconfig_h)
elif os.name == 'posix':
makefile_dir = os.path.dirname(sysconfig.get_makefile_filename())
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py
index 2d9649237a9..2018a20afd1 100644
--- a/Lib/test/test_tarfile.py
+++ b/Lib/test/test_tarfile.py
@@ -3490,11 +3490,12 @@ class ArchiveMaker:
with t.open() as tar:
... # `tar` is now a TarFile with 'filename' in it!
"""
- def __init__(self):
+ def __init__(self, **kwargs):
self.bio = io.BytesIO()
+ self.tar_kwargs = dict(kwargs)
def __enter__(self):
- self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio)
+ self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio, **self.tar_kwargs)
return self
def __exit__(self, *exc):
@@ -4073,7 +4074,10 @@ class TestExtractionFilters(unittest.TestCase):
# that in the test archive.)
with tarfile.TarFile.open(tarname) as tar:
for tarinfo in tar.getmembers():
- filtered = tarfile.tar_filter(tarinfo, '')
+ try:
+ filtered = tarfile.tar_filter(tarinfo, '')
+ except UnicodeEncodeError:
+ continue
self.assertIs(filtered.name, tarinfo.name)
self.assertIs(filtered.type, tarinfo.type)
@@ -4084,11 +4088,48 @@ class TestExtractionFilters(unittest.TestCase):
for tarinfo in tar.getmembers():
try:
filtered = tarfile.data_filter(tarinfo, '')
- except tarfile.FilterError:
+ except (tarfile.FilterError, UnicodeEncodeError):
continue
self.assertIs(filtered.name, tarinfo.name)
self.assertIs(filtered.type, tarinfo.type)
+ @unittest.skipIf(sys.platform == 'win32', 'requires native bytes paths')
+ def test_filter_unencodable(self):
+ # Sanity check using a valid path.
+ tarinfo = tarfile.TarInfo(os_helper.TESTFN)
+ filtered = tarfile.tar_filter(tarinfo, '')
+ self.assertIs(filtered.name, tarinfo.name)
+ filtered = tarfile.data_filter(tarinfo, '')
+ self.assertIs(filtered.name, tarinfo.name)
+
+ tarinfo = tarfile.TarInfo('test\x00')
+ self.assertRaises(ValueError, tarfile.tar_filter, tarinfo, '')
+ self.assertRaises(ValueError, tarfile.data_filter, tarinfo, '')
+ tarinfo = tarfile.TarInfo('\ud800')
+ self.assertRaises(UnicodeEncodeError, tarfile.tar_filter, tarinfo, '')
+ self.assertRaises(UnicodeEncodeError, tarfile.data_filter, tarinfo, '')
+
+ @unittest.skipIf(sys.platform == 'win32', 'requires native bytes paths')
+ def test_extract_unencodable(self):
+ # Create a member with name \xed\xa0\x80 which is UTF-8 encoded
+ # lone surrogate \ud800.
+ with ArchiveMaker(encoding='ascii', errors='surrogateescape') as arc:
+ arc.add('\udced\udca0\udc80')
+ with os_helper.temp_cwd() as tmp:
+ tar = arc.open(encoding='utf-8', errors='surrogatepass',
+ errorlevel=1)
+ self.assertEqual(tar.getnames(), ['\ud800'])
+ with self.assertRaises(UnicodeEncodeError):
+ tar.extractall()
+ self.assertEqual(os.listdir(), [])
+
+ tar = arc.open(encoding='utf-8', errors='surrogatepass',
+ errorlevel=0, debug=1)
+ with support.captured_stderr() as stderr:
+ tar.extractall()
+ self.assertEqual(os.listdir(), [])
+ self.assertIn('tarfile: UnicodeEncodeError ', stderr.getvalue())
+
def test_change_default_filter_on_instance(self):
tar = tarfile.TarFile(tarname, 'r')
def strict_filter(tarinfo, path):
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index abe63c10c0a..b6e2d419019 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -2137,8 +2137,7 @@ class CRLockTests(lock_tests.RLockTests):
]
for args, kwargs in arg_types:
with self.subTest(args=args, kwargs=kwargs):
- with self.assertWarns(DeprecationWarning):
- threading.RLock(*args, **kwargs)
+ self.assertRaises(TypeError, threading.RLock, *args, **kwargs)
# Subtypes with custom `__init__` are allowed (but, not recommended):
class CustomRLock(self.locktype):
@@ -2156,6 +2155,9 @@ class ConditionAsRLockTests(lock_tests.RLockTests):
# Condition uses an RLock by default and exports its API.
locktype = staticmethod(threading.Condition)
+ def test_constructor_noargs(self):
+ self.skipTest("Condition allows positional arguments")
+
def test_recursion_count(self):
self.skipTest("Condition does not expose _recursion_count()")
diff --git a/Lib/test/test_typing.py b/Lib/test/test_typing.py
index bc03e6bf2d7..246be22a0d8 100644
--- a/Lib/test/test_typing.py
+++ b/Lib/test/test_typing.py
@@ -8487,6 +8487,36 @@ class TypedDictTests(BaseTestCase):
self.assertEqual(Child.__required_keys__, frozenset(['a']))
self.assertEqual(Child.__optional_keys__, frozenset())
+ def test_inheritance_pep563(self):
+ def _make_td(future, class_name, annos, base, extra_names=None):
+ lines = []
+ if future:
+ lines.append('from __future__ import annotations')
+ lines.append('from typing import TypedDict')
+ lines.append(f'class {class_name}({base}):')
+ for name, anno in annos.items():
+ lines.append(f' {name}: {anno}')
+ code = '\n'.join(lines)
+ ns = run_code(code, extra_names)
+ return ns[class_name]
+
+ for base_future in (True, False):
+ for child_future in (True, False):
+ with self.subTest(base_future=base_future, child_future=child_future):
+ base = _make_td(
+ base_future, "Base", {"base": "int"}, "TypedDict"
+ )
+ self.assertIsNotNone(base.__annotate__)
+ child = _make_td(
+ child_future, "Child", {"child": "int"}, "Base", {"Base": base}
+ )
+ base_anno = ForwardRef("int", module="builtins") if base_future else int
+ child_anno = ForwardRef("int", module="builtins") if child_future else int
+ self.assertEqual(base.__annotations__, {'base': base_anno})
+ self.assertEqual(
+ child.__annotations__, {'child': child_anno, 'base': base_anno}
+ )
+
def test_required_notrequired_keys(self):
self.assertEqual(NontotalMovie.__required_keys__,
frozenset({"title"}))
@@ -10668,6 +10698,9 @@ class UnionGenericAliasTests(BaseTestCase):
with self.assertWarns(DeprecationWarning):
self.assertNotEqual(int, typing._UnionGenericAlias)
+ def test_hashable(self):
+ self.assertEqual(hash(typing._UnionGenericAlias), hash(Union))
+
def load_tests(loader, tests, pattern):
import doctest
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
index c965860fbb1..bc1030eea60 100644
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -109,7 +109,7 @@ class urlopen_FileTests(unittest.TestCase):
finally:
f.close()
self.pathname = os_helper.TESTFN
- self.quoted_pathname = urllib.parse.quote(self.pathname)
+ self.quoted_pathname = urllib.parse.quote(os.fsencode(self.pathname))
self.returned_obj = urllib.request.urlopen("file:%s" % self.quoted_pathname)
def tearDown(self):
diff --git a/Lib/test/test_wave.py b/Lib/test/test_wave.py
index 5e771c8de96..6c3362857fc 100644
--- a/Lib/test/test_wave.py
+++ b/Lib/test/test_wave.py
@@ -136,32 +136,6 @@ class MiscTestCase(unittest.TestCase):
not_exported = {'WAVE_FORMAT_PCM', 'WAVE_FORMAT_EXTENSIBLE', 'KSDATAFORMAT_SUBTYPE_PCM'}
support.check__all__(self, wave, not_exported=not_exported)
- def test_read_deprecations(self):
- filename = support.findfile('pluck-pcm8.wav', subdir='audiodata')
- with wave.open(filename) as reader:
- with self.assertWarns(DeprecationWarning):
- with self.assertRaises(wave.Error):
- reader.getmark('mark')
- with self.assertWarns(DeprecationWarning):
- self.assertIsNone(reader.getmarkers())
-
- def test_write_deprecations(self):
- with io.BytesIO(b'') as tmpfile:
- with wave.open(tmpfile, 'wb') as writer:
- writer.setnchannels(1)
- writer.setsampwidth(1)
- writer.setframerate(1)
- writer.setcomptype('NONE', 'not compressed')
-
- with self.assertWarns(DeprecationWarning):
- with self.assertRaises(wave.Error):
- writer.setmark(0, 0, 'mark')
- with self.assertWarns(DeprecationWarning):
- with self.assertRaises(wave.Error):
- writer.getmark('mark')
- with self.assertWarns(DeprecationWarning):
- self.assertIsNone(writer.getmarkers())
-
class WaveLowLevelTest(unittest.TestCase):
diff --git a/Lib/test/test_zipfile/test_core.py b/Lib/test/test_zipfile/test_core.py
index ae898150658..43056978848 100644
--- a/Lib/test/test_zipfile/test_core.py
+++ b/Lib/test/test_zipfile/test_core.py
@@ -3642,7 +3642,7 @@ class EncodedMetadataTests(unittest.TestCase):
except OSError:
pass
except UnicodeEncodeError:
- self.skipTest(f'cannot encode file name {fn!r}')
+ self.skipTest(f'cannot encode file name {fn!a}')
zipfile.main(["--metadata-encoding=shift_jis", "-e", TESTFN, TESTFN2])
listing = os.listdir(TESTFN2)
diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py
index 713294c4c27..53ca592ea38 100644
--- a/Lib/test/test_zstd.py
+++ b/Lib/test/test_zstd.py
@@ -288,8 +288,8 @@ class CompressorTestCase(unittest.TestCase):
KEY = 100001234
option = {CompressionParameter.compression_level: 10,
KEY: 200000000}
- pattern = r'Zstd compression parameter.*?"unknown parameter \(key %d\)"' \
- % KEY
+ pattern = (r'Invalid zstd compression parameter.*?'
+ fr'"unknown parameter \(key {KEY}\)"')
with self.assertRaisesRegex(ZstdError, pattern):
ZstdCompressor(options=option)
@@ -420,8 +420,8 @@ class DecompressorTestCase(unittest.TestCase):
KEY = 100001234
options = {DecompressionParameter.window_log_max: DecompressionParameter.window_log_max.bounds()[1],
KEY: 200000000}
- pattern = r'Zstd decompression parameter.*?"unknown parameter \(key %d\)"' \
- % KEY
+ pattern = (r'Invalid zstd decompression parameter.*?'
+ fr'"unknown parameter \(key {KEY}\)"')
with self.assertRaisesRegex(ZstdError, pattern):
ZstdDecompressor(options=options)
@@ -507,7 +507,7 @@ class DecompressorTestCase(unittest.TestCase):
self.assertFalse(d.needs_input)
def test_decompressor_arg(self):
- zd = ZstdDict(b'12345678', True)
+ zd = ZstdDict(b'12345678', is_raw=True)
with self.assertRaises(TypeError):
d = ZstdDecompressor(zstd_dict={})
@@ -1021,6 +1021,10 @@ class DecompressorFlagsTestCase(unittest.TestCase):
class ZstdDictTestCase(unittest.TestCase):
def test_is_raw(self):
+ # must be passed as a keyword argument
+ with self.assertRaises(TypeError):
+ ZstdDict(bytes(8), True)
+
# content < 8
b = b'1234567'
with self.assertRaises(ValueError):
@@ -1068,9 +1072,9 @@ class ZstdDictTestCase(unittest.TestCase):
# corrupted
zd = ZstdDict(dict_content, is_raw=False)
- with self.assertRaisesRegex(ZstdError, r'ZSTD_CDict.*?corrupted'):
+ with self.assertRaisesRegex(ZstdError, r'ZSTD_CDict.*?content\.$'):
ZstdCompressor(zstd_dict=zd.as_digested_dict)
- with self.assertRaisesRegex(ZstdError, r'ZSTD_DDict.*?corrupted'):
+ with self.assertRaisesRegex(ZstdError, r'ZSTD_DDict.*?content\.$'):
ZstdDecompressor(zd)
# wrong type
@@ -1096,7 +1100,7 @@ class ZstdDictTestCase(unittest.TestCase):
TRAINED_DICT = train_dict(SAMPLES, DICT_SIZE1)
- ZstdDict(TRAINED_DICT.dict_content, False)
+ ZstdDict(TRAINED_DICT.dict_content, is_raw=False)
self.assertNotEqual(TRAINED_DICT.dict_id, 0)
self.assertGreater(len(TRAINED_DICT.dict_content), 0)
@@ -1250,7 +1254,7 @@ class ZstdDictTestCase(unittest.TestCase):
def test_as_prefix(self):
# V1
V1 = THIS_FILE_BYTES
- zd = ZstdDict(V1, True)
+ zd = ZstdDict(V1, is_raw=True)
# V2
mid = len(V1) // 2
@@ -1266,7 +1270,7 @@ class ZstdDictTestCase(unittest.TestCase):
self.assertEqual(decompress(dat, zd.as_prefix), V2)
# use wrong prefix
- zd2 = ZstdDict(SAMPLES[0], True)
+ zd2 = ZstdDict(SAMPLES[0], is_raw=True)
try:
decompressed = decompress(dat, zd2.as_prefix)
except ZstdError: # expected
@@ -2426,6 +2430,7 @@ class OpenTestCase(unittest.TestCase):
self.assertEqual(f.write(arr), LENGTH)
self.assertEqual(f.tell(), LENGTH)
+@unittest.skip("it fails for now, see gh-133885")
class FreeThreadingMethodTests(unittest.TestCase):
@unittest.skipUnless(Py_GIL_DISABLED, 'this test can only possibly fail with GIL disabled')