diff options
Diffstat (limited to 'Lib/test')
47 files changed, 1217 insertions, 280 deletions
diff --git a/Lib/test/.ruff.toml b/Lib/test/.ruff.toml index a1eac32a83a..7aa8a4785d6 100644 --- a/Lib/test/.ruff.toml +++ b/Lib/test/.ruff.toml @@ -9,8 +9,9 @@ extend-exclude = [ "encoded_modules/module_iso_8859_1.py", "encoded_modules/module_koi8_r.py", # SyntaxError because of t-strings - "test_tstring.py", + "test_annotationlib.py", "test_string/test_templatelib.py", + "test_tstring.py", # New grammar constructions may not yet be recognized by Ruff, # and tests re-use the same names as only the grammar is being checked. "test_grammar.py", diff --git a/Lib/test/_code_definitions.py b/Lib/test/_code_definitions.py index c3daa0dccf5..733a15b25f6 100644 --- a/Lib/test/_code_definitions.py +++ b/Lib/test/_code_definitions.py @@ -1,4 +1,32 @@ +def simple_script(): + assert True + + +def complex_script(): + obj = 'a string' + pickle = __import__('pickle') + def spam_minimal(): + pass + spam_minimal() + data = pickle.dumps(obj) + res = pickle.loads(data) + assert res == obj, (res, obj) + + +def script_with_globals(): + obj1, obj2 = spam(42) + assert obj1 == 42 + assert obj2 is None + + +def script_with_explicit_empty_return(): + return None + + +def script_with_return(): + return True + def spam_minimal(): # no arg defaults or kwarg defaults @@ -141,6 +169,11 @@ ham_C_closure, *_ = eggs_closure_C(2) TOP_FUNCTIONS = [ # shallow + simple_script, + complex_script, + script_with_globals, + script_with_explicit_empty_return, + script_with_return, spam_minimal, spam_with_builtins, spam_with_globals_and_builtins, @@ -178,6 +211,52 @@ FUNCTIONS = [ *NESTED_FUNCTIONS, ] +STATELESS_FUNCTIONS = [ + simple_script, + complex_script, + script_with_explicit_empty_return, + script_with_return, + spam, + spam_minimal, + spam_with_builtins, + spam_args_attrs_and_builtins, + spam_returns_arg, + spam_annotated, + spam_with_inner_not_closure, + spam_with_inner_closure, + spam_N, + spam_C, + spam_NN, + spam_NC, + spam_CN, + spam_CC, + eggs_nested, + eggs_nested_N, + ham_nested, + ham_C_nested +] +STATELESS_CODE = [ + *STATELESS_FUNCTIONS, + script_with_globals, + spam_with_globals_and_builtins, + spam_full, +] + +PURE_SCRIPT_FUNCTIONS = [ + simple_script, + complex_script, + script_with_explicit_empty_return, + spam_minimal, + spam_with_builtins, + spam_with_inner_not_closure, + spam_with_inner_closure, +] +SCRIPT_FUNCTIONS = [ + *PURE_SCRIPT_FUNCTIONS, + script_with_globals, + spam_with_globals_and_builtins, +] + # generators diff --git a/Lib/test/_test_gc_fast_cycles.py b/Lib/test/_test_gc_fast_cycles.py new file mode 100644 index 00000000000..4e2c7d72a02 --- /dev/null +++ b/Lib/test/_test_gc_fast_cycles.py @@ -0,0 +1,48 @@ +# Run by test_gc. +from test import support +import _testinternalcapi +import gc +import unittest + +class IncrementalGCTests(unittest.TestCase): + + # Use small increments to emulate longer running process in a shorter time + @support.gc_threshold(200, 10) + def test_incremental_gc_handles_fast_cycle_creation(self): + + class LinkedList: + + #Use slots to reduce number of implicit objects + __slots__ = "next", "prev", "surprise" + + def __init__(self, next=None, prev=None): + self.next = next + if next is not None: + next.prev = self + self.prev = prev + if prev is not None: + prev.next = self + + def make_ll(depth): + head = LinkedList() + for i in range(depth): + head = LinkedList(head, head.prev) + return head + + head = make_ll(1000) + + assert(gc.isenabled()) + olds = [] + initial_heap_size = _testinternalcapi.get_tracked_heap_size() + for i in range(20_000): + newhead = make_ll(20) + newhead.surprise = head + olds.append(newhead) + if len(olds) == 20: + new_objects = _testinternalcapi.get_tracked_heap_size() - initial_heap_size + self.assertLess(new_objects, 27_000, f"Heap growing. Reached limit after {i} iterations") + del olds[:] + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 4dc9a31d22f..1b690cb88bf 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2463,6 +2463,12 @@ class _TestValue(BaseTestCase): self.assertNotHasAttr(arr5, 'get_lock') self.assertNotHasAttr(arr5, 'get_obj') + @unittest.skipIf(c_int is None, "requires _ctypes") + def test_invalid_typecode(self): + with self.assertRaisesRegex(TypeError, 'bad typecode'): + self.Value('x', None) + with self.assertRaisesRegex(TypeError, 'bad typecode'): + self.RawValue('x', None) class _TestArray(BaseTestCase): @@ -2543,6 +2549,12 @@ class _TestArray(BaseTestCase): self.assertNotHasAttr(arr5, 'get_lock') self.assertNotHasAttr(arr5, 'get_obj') + @unittest.skipIf(c_int is None, "requires _ctypes") + def test_invalid_typecode(self): + with self.assertRaisesRegex(TypeError, 'bad typecode'): + self.Array('x', []) + with self.assertRaisesRegex(TypeError, 'bad typecode'): + self.RawArray('x', []) # # # diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py index 55844ec35a9..d1882a310bb 100644 --- a/Lib/test/datetimetester.py +++ b/Lib/test/datetimetester.py @@ -773,6 +773,9 @@ class TestTimeDelta(HarmlessMixedComparison, unittest.TestCase): microseconds=999999)), "999999999 days, 23:59:59.999999") + # test the Doc/library/datetime.rst recipe + eq(f'-({-td(hours=-1)!s})', "-(1:00:00)") + def test_repr(self): name = 'datetime.' + self.theclass.__name__ self.assertEqual(repr(self.theclass(1)), diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index 63a2e427d18..72b8ea89e62 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -31,7 +31,7 @@ WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_' EXIT_TIMEOUT = 120.0 -ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network', +ALL_RESOURCES = ('audio', 'console', 'curses', 'largefile', 'network', 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime') # Other resources excluded from --use=all: diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py index bdc7ef62943..dcba6369541 100644 --- a/Lib/test/pickletester.py +++ b/Lib/test/pickletester.py @@ -2272,7 +2272,11 @@ class AbstractPicklingErrorTests: def test_nested_lookup_error(self): # Nested name does not exist - obj = REX('AbstractPickleTests.spam') + global TestGlobal + class TestGlobal: + class A: + pass + obj = REX('TestGlobal.A.B.C') obj.__module__ = __name__ for proto in protocols: with self.subTest(proto=proto): @@ -2280,9 +2284,9 @@ class AbstractPicklingErrorTests: self.dumps(obj, proto) self.assertEqual(str(cm.exception), f"Can't pickle {obj!r}: " - f"it's not found as {__name__}.AbstractPickleTests.spam") + f"it's not found as {__name__}.TestGlobal.A.B.C") self.assertEqual(str(cm.exception.__context__), - "type object 'AbstractPickleTests' has no attribute 'spam'") + "type object 'A' has no attribute 'B'") obj.__module__ = None for proto in protocols: @@ -2290,21 +2294,25 @@ class AbstractPicklingErrorTests: with self.assertRaises(pickle.PicklingError) as cm: self.dumps(obj, proto) self.assertEqual(str(cm.exception), - f"Can't pickle {obj!r}: it's not found as __main__.AbstractPickleTests.spam") + f"Can't pickle {obj!r}: " + f"it's not found as __main__.TestGlobal.A.B.C") self.assertEqual(str(cm.exception.__context__), - "module '__main__' has no attribute 'AbstractPickleTests'") + "module '__main__' has no attribute 'TestGlobal'") def test_wrong_object_lookup_error(self): # Name is bound to different object - obj = REX('AbstractPickleTests') + global TestGlobal + class TestGlobal: + pass + obj = REX('TestGlobal') obj.__module__ = __name__ - AbstractPickleTests.ham = [] for proto in protocols: with self.subTest(proto=proto): with self.assertRaises(pickle.PicklingError) as cm: self.dumps(obj, proto) self.assertEqual(str(cm.exception), - f"Can't pickle {obj!r}: it's not the same object as {__name__}.AbstractPickleTests") + f"Can't pickle {obj!r}: " + f"it's not the same object as {__name__}.TestGlobal") self.assertIsNone(cm.exception.__context__) obj.__module__ = None @@ -2313,9 +2321,10 @@ class AbstractPicklingErrorTests: with self.assertRaises(pickle.PicklingError) as cm: self.dumps(obj, proto) self.assertEqual(str(cm.exception), - f"Can't pickle {obj!r}: it's not found as __main__.AbstractPickleTests") + f"Can't pickle {obj!r}: " + f"it's not found as __main__.TestGlobal") self.assertEqual(str(cm.exception.__context__), - "module '__main__' has no attribute 'AbstractPickleTests'") + "module '__main__' has no attribute 'TestGlobal'") def test_local_lookup_error(self): # Test that whichmodule() errors out cleanly when looking up diff --git a/Lib/test/support/strace_helper.py b/Lib/test/support/strace_helper.py index 798d6c68869..1a9d2b520b7 100644 --- a/Lib/test/support/strace_helper.py +++ b/Lib/test/support/strace_helper.py @@ -178,7 +178,10 @@ def get_syscalls(code, strace_flags, prelude="", cleanup="", # Moderately expensive (spawns a subprocess), so share results when possible. @cache def _can_strace(): - res = strace_python("import sys; sys.exit(0)", [], check=False) + res = strace_python("import sys; sys.exit(0)", + # --trace option needs strace 5.5 (gh-133741) + ["--trace=%process"], + check=False) if res.strace_returncode == 0 and res.python_returncode == 0: assert res.events(), "Should have parsed multiple calls" return True diff --git a/Lib/test/test_annotationlib.py b/Lib/test/test_annotationlib.py index c3c245ddaf8..73a821d15e3 100644 --- a/Lib/test/test_annotationlib.py +++ b/Lib/test/test_annotationlib.py @@ -7,6 +7,7 @@ import collections import functools import itertools import pickle +from string.templatelib import Interpolation, Template import typing import unittest from annotationlib import ( @@ -273,6 +274,43 @@ class TestStringFormat(unittest.TestCase): }, ) + def test_template_str(self): + def f( + x: t"{a}", + y: list[t"{a}"], + z: t"{a:b} {c!r} {d!s:t}", + a: t"a{b}c{d}e{f}g", + b: t"{a:{1}}", + c: t"{a | b * c}", + ): pass + + annos = get_annotations(f, format=Format.STRING) + self.assertEqual(annos, { + "x": "t'{a}'", + "y": "list[t'{a}']", + "z": "t'{a:b} {c!r} {d!s:t}'", + "a": "t'a{b}c{d}e{f}g'", + # interpolations in the format spec are eagerly evaluated so we can't recover the source + "b": "t'{a:1}'", + "c": "t'{a | b * c}'", + }) + + def g( + x: t"{a}", + ): ... + + annos = get_annotations(g, format=Format.FORWARDREF) + templ = annos["x"] + # Template and Interpolation don't have __eq__ so we have to compare manually + self.assertIsInstance(templ, Template) + self.assertEqual(templ.strings, ("", "")) + self.assertEqual(len(templ.interpolations), 1) + interp = templ.interpolations[0] + self.assertEqual(interp.value, support.EqualToForwardRef("a", owner=g)) + self.assertEqual(interp.expression, "a") + self.assertIsNone(interp.conversion) + self.assertEqual(interp.format_spec, "") + def test_getitem(self): def f(x: undef1[str, undef2]): pass @@ -340,7 +378,7 @@ class TestStringFormat(unittest.TestCase): def g( w: a[[int, str], float], - x: a[{int, str}, 3], + x: a[{int}, 3], y: a[{int: str}, 4], z: a[(int, str), 5], ): @@ -350,7 +388,7 @@ class TestStringFormat(unittest.TestCase): anno, { "w": "a[[int, str], float]", - "x": "a[{int, str}, 3]", + "x": "a[{int}, 3]", "y": "a[{int: str}, 4]", "z": "a[(int, str), 5]", }, diff --git a/Lib/test/test_ast/test_ast.py b/Lib/test/test_ast/test_ast.py index 02628868db0..0776559b900 100644 --- a/Lib/test/test_ast/test_ast.py +++ b/Lib/test/test_ast/test_ast.py @@ -1315,6 +1315,15 @@ class CopyTests(unittest.TestCase): self.assertIs(repl.id, 'y') self.assertIs(repl.ctx, context) + def test_replace_accept_missing_field_with_default(self): + node = ast.FunctionDef(name="foo", args=ast.arguments()) + self.assertIs(node.returns, None) + self.assertEqual(node.decorator_list, []) + node2 = copy.replace(node, name="bar") + self.assertEqual(node2.name, "bar") + self.assertIs(node2.returns, None) + self.assertEqual(node2.decorator_list, []) + def test_replace_reject_known_custom_instance_fields_commits(self): node = ast.parse('x').body[0].value node.extra = extra = object() # add instance 'extra' field diff --git a/Lib/test/test_asyncio/test_ssl.py b/Lib/test/test_asyncio/test_ssl.py index 986ecc2c5a9..3a7185cd897 100644 --- a/Lib/test/test_asyncio/test_ssl.py +++ b/Lib/test/test_asyncio/test_ssl.py @@ -195,9 +195,10 @@ class TestSSL(test_utils.TestCase): except (BrokenPipeError, ConnectionError): pass - def test_create_server_ssl_1(self): + @support.bigmemtest(size=25, memuse=90*2**20, dry_run=False) + def test_create_server_ssl_1(self, size): CNT = 0 # number of clients that were successful - TOTAL_CNT = 25 # total number of clients that test will create + TOTAL_CNT = size # total number of clients that test will create TIMEOUT = support.LONG_TIMEOUT # timeout for this test A_DATA = b'A' * 1024 * BUF_MULTIPLIER @@ -1038,9 +1039,10 @@ class TestSSL(test_utils.TestCase): self.loop.run_until_complete(run_main()) - def test_create_server_ssl_over_ssl(self): + @support.bigmemtest(size=25, memuse=90*2**20, dry_run=False) + def test_create_server_ssl_over_ssl(self, size): CNT = 0 # number of clients that were successful - TOTAL_CNT = 25 # total number of clients that test will create + TOTAL_CNT = size # total number of clients that test will create TIMEOUT = support.LONG_TIMEOUT # timeout for this test A_DATA = b'A' * 1024 * BUF_MULTIPLIER diff --git a/Lib/test/test_capi/test_import.py b/Lib/test/test_capi/test_import.py index 25136624ca4..57e0316fda8 100644 --- a/Lib/test/test_capi/test_import.py +++ b/Lib/test/test_capi/test_import.py @@ -134,7 +134,7 @@ class ImportTests(unittest.TestCase): # CRASHES importmodule(NULL) def test_importmodulenoblock(self): - # Test deprecated PyImport_ImportModuleNoBlock() + # Test deprecated (stable ABI only) PyImport_ImportModuleNoBlock() importmodulenoblock = _testlimitedcapi.PyImport_ImportModuleNoBlock with check_warnings(('', DeprecationWarning)): self.check_import_func(importmodulenoblock) diff --git a/Lib/test/test_capi/test_opt.py b/Lib/test/test_capi/test_opt.py index ba7bcb4540a..651148336f7 100644 --- a/Lib/test/test_capi/test_opt.py +++ b/Lib/test/test_capi/test_opt.py @@ -1942,6 +1942,23 @@ class TestUopsOptimization(unittest.TestCase): self.assertNotIn("_COMPARE_OP_INT", uops) self.assertNotIn("_GUARD_IS_TRUE_POP", uops) + def test_call_isinstance_guards_removed(self): + def testfunc(n): + x = 0 + for _ in range(n): + y = isinstance(42, int) + if y: + x += 1 + return x + + res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) + self.assertEqual(res, TIER2_THRESHOLD) + self.assertIsNotNone(ex) + uops = get_opnames(ex) + self.assertIn("_CALL_ISINSTANCE", uops) + self.assertNotIn("_GUARD_THIRD_NULL", uops) + self.assertNotIn("_GUARD_CALLABLE_ISINSTANCE", uops) + def global_identity(x): return x diff --git a/Lib/test/test_clinic.py b/Lib/test/test_clinic.py index 6461b647925..f7fc3b38733 100644 --- a/Lib/test/test_clinic.py +++ b/Lib/test/test_clinic.py @@ -2835,6 +2835,10 @@ class ClinicExternalTest(TestCase): "size_t", "slice_index", "str", + "uint16", + "uint32", + "uint64", + "uint8", "unicode", "unsigned_char", "unsigned_int", diff --git a/Lib/test/test_code.py b/Lib/test/test_code.py index b646042a3b8..32cf8aacaf6 100644 --- a/Lib/test/test_code.py +++ b/Lib/test/test_code.py @@ -220,6 +220,7 @@ try: import _testinternalcapi except ModuleNotFoundError: _testinternalcapi = None +import test._code_definitions as defs COPY_FREE_VARS = opmap['COPY_FREE_VARS'] @@ -671,8 +672,21 @@ class CodeTest(unittest.TestCase): VARARGS = CO_FAST_LOCAL | CO_FAST_ARG_VAR | CO_FAST_ARG_POS VARKWARGS = CO_FAST_LOCAL | CO_FAST_ARG_VAR | CO_FAST_ARG_KW - import test._code_definitions as defs funcs = { + defs.simple_script: {}, + defs.complex_script: { + 'obj': CO_FAST_LOCAL, + 'pickle': CO_FAST_LOCAL, + 'spam_minimal': CO_FAST_LOCAL, + 'data': CO_FAST_LOCAL, + 'res': CO_FAST_LOCAL, + }, + defs.script_with_globals: { + 'obj1': CO_FAST_LOCAL, + 'obj2': CO_FAST_LOCAL, + }, + defs.script_with_explicit_empty_return: {}, + defs.script_with_return: {}, defs.spam_minimal: {}, defs.spam_with_builtins: { 'x': CO_FAST_LOCAL, @@ -897,8 +911,20 @@ class CodeTest(unittest.TestCase): }, } - import test._code_definitions as defs funcs = { + defs.simple_script: new_var_counts(), + defs.complex_script: new_var_counts( + purelocals=5, + globalvars=1, + attrs=2, + ), + defs.script_with_globals: new_var_counts( + purelocals=2, + globalvars=1, + ), + defs.script_with_explicit_empty_return: new_var_counts(), + defs.script_with_return: new_var_counts(), + defs.spam_minimal: new_var_counts(), defs.spam_minimal: new_var_counts(), defs.spam_with_builtins: new_var_counts( purelocals=4, @@ -1025,42 +1051,35 @@ class CodeTest(unittest.TestCase): counts = _testinternalcapi.get_code_var_counts(func.__code__) self.assertEqual(counts, expected) - def func_with_globals_and_builtins(): - mod1 = _testinternalcapi - mod2 = dis - mods = (mod1, mod2) - checks = tuple(callable(m) for m in mods) - return callable(mod2), tuple(mods), list(mods), checks - - func = func_with_globals_and_builtins + func = defs.spam_with_globals_and_builtins with self.subTest(f'{func} code'): expected = new_var_counts( - purelocals=4, - globalvars=5, + purelocals=5, + globalvars=6, ) counts = _testinternalcapi.get_code_var_counts(func.__code__) self.assertEqual(counts, expected) with self.subTest(f'{func} with own globals and builtins'): expected = new_var_counts( - purelocals=4, - globalvars=(2, 3), + purelocals=5, + globalvars=(2, 4), ) counts = _testinternalcapi.get_code_var_counts(func) self.assertEqual(counts, expected) with self.subTest(f'{func} without globals'): expected = new_var_counts( - purelocals=4, - globalvars=(0, 3, 2), + purelocals=5, + globalvars=(0, 4, 2), ) counts = _testinternalcapi.get_code_var_counts(func, globalsns={}) self.assertEqual(counts, expected) with self.subTest(f'{func} without both'): expected = new_var_counts( - purelocals=4, - globalvars=5, + purelocals=5, + globalvars=6, ) counts = _testinternalcapi.get_code_var_counts(func, globalsns={}, builtinsns={}) @@ -1068,12 +1087,34 @@ class CodeTest(unittest.TestCase): with self.subTest(f'{func} without builtins'): expected = new_var_counts( - purelocals=4, - globalvars=(2, 0, 3), + purelocals=5, + globalvars=(2, 0, 4), ) counts = _testinternalcapi.get_code_var_counts(func, builtinsns={}) self.assertEqual(counts, expected) + @unittest.skipIf(_testinternalcapi is None, "missing _testinternalcapi") + def test_stateless(self): + self.maxDiff = None + + for func in defs.STATELESS_CODE: + with self.subTest((func, '(code)')): + _testinternalcapi.verify_stateless_code(func.__code__) + for func in defs.STATELESS_FUNCTIONS: + with self.subTest((func, '(func)')): + _testinternalcapi.verify_stateless_code(func) + + for func in defs.FUNCTIONS: + if func not in defs.STATELESS_CODE: + with self.subTest((func, '(code)')): + with self.assertRaises(Exception): + _testinternalcapi.verify_stateless_code(func.__code__) + + if func not in defs.STATELESS_FUNCTIONS: + with self.subTest((func, '(func)')): + with self.assertRaises(Exception): + _testinternalcapi.verify_stateless_code(func) + def isinterned(s): return s is sys.intern(('_' + s + '_')[1:-1]) diff --git a/Lib/test/test_codeop.py b/Lib/test/test_codeop.py index 0eefc22d11b..ed10bd3dcb6 100644 --- a/Lib/test/test_codeop.py +++ b/Lib/test/test_codeop.py @@ -322,7 +322,7 @@ class CodeopTests(unittest.TestCase): dedent("""\ def foo(x,x): pass - """), "duplicate argument 'x' in function definition") + """), "duplicate parameter 'x' in function definition") diff --git a/Lib/test/test_crossinterp.py b/Lib/test/test_crossinterp.py index 5ac0080db43..b366a29645e 100644 --- a/Lib/test/test_crossinterp.py +++ b/Lib/test/test_crossinterp.py @@ -758,6 +758,126 @@ class CodeTests(_GetXIDataTests): ]) +class PureShareableScriptTests(_GetXIDataTests): + + MODE = 'script-pure' + + VALID_SCRIPTS = [ + '', + 'spam', + '# a comment', + 'print("spam")', + 'raise Exception("spam")', + """if True: + do_something() + """, + """if True: + def spam(x): + return x + class Spam: + def eggs(self): + return 42 + x = Spam().eggs() + raise ValueError(spam(x)) + """, + ] + INVALID_SCRIPTS = [ + ' pass', # IndentationError + '----', # SyntaxError + """if True: + def spam(): + # no body + spam() + """, # IndentationError + ] + + def test_valid_str(self): + self.assert_roundtrip_not_equal([ + *self.VALID_SCRIPTS, + ], expecttype=types.CodeType) + + def test_invalid_str(self): + self.assert_not_shareable([ + *self.INVALID_SCRIPTS, + ]) + + def test_valid_bytes(self): + self.assert_roundtrip_not_equal([ + *(s.encode('utf8') for s in self.VALID_SCRIPTS), + ], expecttype=types.CodeType) + + def test_invalid_bytes(self): + self.assert_not_shareable([ + *(s.encode('utf8') for s in self.INVALID_SCRIPTS), + ]) + + def test_pure_script_code(self): + self.assert_roundtrip_equal_not_identical([ + *(f.__code__ for f in defs.PURE_SCRIPT_FUNCTIONS), + ]) + + def test_impure_script_code(self): + self.assert_not_shareable([ + *(f.__code__ for f in defs.SCRIPT_FUNCTIONS + if f not in defs.PURE_SCRIPT_FUNCTIONS), + ]) + + def test_other_code(self): + self.assert_not_shareable([ + *(f.__code__ for f in defs.FUNCTIONS + if f not in defs.SCRIPT_FUNCTIONS), + *(f.__code__ for f in defs.FUNCTION_LIKE), + ]) + + def test_pure_script_function(self): + self.assert_roundtrip_not_equal([ + *defs.PURE_SCRIPT_FUNCTIONS, + ], expecttype=types.CodeType) + + def test_impure_script_function(self): + self.assert_not_shareable([ + *(f for f in defs.SCRIPT_FUNCTIONS + if f not in defs.PURE_SCRIPT_FUNCTIONS), + ]) + + def test_other_function(self): + self.assert_not_shareable([ + *(f for f in defs.FUNCTIONS + if f not in defs.SCRIPT_FUNCTIONS), + *defs.FUNCTION_LIKE, + ]) + + def test_other_objects(self): + self.assert_not_shareable([ + None, + True, + False, + Ellipsis, + NotImplemented, + (), + [], + {}, + object(), + ]) + + +class ShareableScriptTests(PureShareableScriptTests): + + MODE = 'script' + + def test_impure_script_code(self): + self.assert_roundtrip_equal_not_identical([ + *(f.__code__ for f in defs.SCRIPT_FUNCTIONS + if f not in defs.PURE_SCRIPT_FUNCTIONS), + ]) + + def test_impure_script_function(self): + self.assert_roundtrip_not_equal([ + *(f for f in defs.SCRIPT_FUNCTIONS + if f not in defs.PURE_SCRIPT_FUNCTIONS), + ], expecttype=types.CodeType) + + class ShareableTypeTests(_GetXIDataTests): MODE = 'xidata' diff --git a/Lib/test/test_dict.py b/Lib/test/test_dict.py index 3104cbc66cb..69f1a098920 100644 --- a/Lib/test/test_dict.py +++ b/Lib/test/test_dict.py @@ -338,17 +338,34 @@ class DictTest(unittest.TestCase): self.assertRaises(Exc, baddict2.fromkeys, [1]) # test fast path for dictionary inputs + res = dict(zip(range(6), [0]*6)) d = dict(zip(range(6), range(6))) - self.assertEqual(dict.fromkeys(d, 0), dict(zip(range(6), [0]*6))) - + self.assertEqual(dict.fromkeys(d, 0), res) + # test fast path for set inputs + d = set(range(6)) + self.assertEqual(dict.fromkeys(d, 0), res) + # test slow path for other iterable inputs + d = list(range(6)) + self.assertEqual(dict.fromkeys(d, 0), res) + + # test fast path when object's constructor returns large non-empty dict class baddict3(dict): def __new__(cls): return d - d = {i : i for i in range(10)} + d = {i : i for i in range(1000)} res = d.copy() res.update(a=None, b=None, c=None) self.assertEqual(baddict3.fromkeys({"a", "b", "c"}), res) + # test slow path when object is a proper subclass of dict + class baddict4(dict): + def __init__(self): + dict.__init__(self, d) + d = {i : i for i in range(1000)} + res = d.copy() + res.update(a=None, b=None, c=None) + self.assertEqual(baddict4.fromkeys({"a", "b", "c"}), res) + def test_copy(self): d = {1: 1, 2: 2, 3: 3} self.assertIsNot(d.copy(), d) diff --git a/Lib/test/test_embed.py b/Lib/test/test_embed.py index 95b2d80464c..46222e521ae 100644 --- a/Lib/test/test_embed.py +++ b/Lib/test/test_embed.py @@ -296,7 +296,7 @@ class EmbeddingTests(EmbeddingTestsMixin, unittest.TestCase): if MS_WINDOWS: expected_path = self.test_exe else: - expected_path = os.path.join(os.getcwd(), "spam") + expected_path = os.path.join(os.getcwd(), "_testembed") expected_output = f"sys.executable: {expected_path}\n" self.assertIn(expected_output, out) self.assertEqual(err, '') @@ -969,7 +969,6 @@ class InitConfigTests(EmbeddingTestsMixin, unittest.TestCase): 'utf8_mode': True, } config = { - 'program_name': './globalvar', 'site_import': False, 'bytes_warning': True, 'warnoptions': ['default::BytesWarning'], diff --git a/Lib/test/test_free_threading/test_io.py b/Lib/test/test_free_threading/test_io.py new file mode 100644 index 00000000000..f9bec740ddf --- /dev/null +++ b/Lib/test/test_free_threading/test_io.py @@ -0,0 +1,109 @@ +import threading +from unittest import TestCase +from test.support import threading_helper +from random import randint +from io import BytesIO +from sys import getsizeof + + +class TestBytesIO(TestCase): + # Test pretty much everything that can break under free-threading. + # Non-deterministic, but at least one of these things will fail if + # BytesIO object is not free-thread safe. + + def check(self, funcs, *args): + barrier = threading.Barrier(len(funcs)) + threads = [] + + for func in funcs: + thread = threading.Thread(target=func, args=(barrier, *args)) + + threads.append(thread) + + with threading_helper.start_threads(threads): + pass + + @threading_helper.requires_working_threading() + @threading_helper.reap_threads + def test_free_threading(self): + """Test for segfaults and aborts.""" + + def write(barrier, b, *ignore): + barrier.wait() + try: b.write(b'0' * randint(100, 1000)) + except ValueError: pass # ignore write fail to closed file + + def writelines(barrier, b, *ignore): + barrier.wait() + b.write(b'0\n' * randint(100, 1000)) + + def truncate(barrier, b, *ignore): + barrier.wait() + try: b.truncate(0) + except: BufferError # ignore exported buffer + + def read(barrier, b, *ignore): + barrier.wait() + b.read() + + def read1(barrier, b, *ignore): + barrier.wait() + b.read1() + + def readline(barrier, b, *ignore): + barrier.wait() + b.readline() + + def readlines(barrier, b, *ignore): + barrier.wait() + b.readlines() + + def readinto(barrier, b, into, *ignore): + barrier.wait() + b.readinto(into) + + def close(barrier, b, *ignore): + barrier.wait() + b.close() + + def getvalue(barrier, b, *ignore): + barrier.wait() + b.getvalue() + + def getbuffer(barrier, b, *ignore): + barrier.wait() + b.getbuffer() + + def iter(barrier, b, *ignore): + barrier.wait() + list(b) + + def getstate(barrier, b, *ignore): + barrier.wait() + b.__getstate__() + + def setstate(barrier, b, st, *ignore): + barrier.wait() + b.__setstate__(st) + + def sizeof(barrier, b, *ignore): + barrier.wait() + getsizeof(b) + + self.check([write] * 10, BytesIO()) + self.check([writelines] * 10, BytesIO()) + self.check([write] * 10 + [truncate] * 10, BytesIO()) + self.check([truncate] + [read] * 10, BytesIO(b'0\n'*204800)) + self.check([truncate] + [read1] * 10, BytesIO(b'0\n'*204800)) + self.check([truncate] + [readline] * 10, BytesIO(b'0\n'*20480)) + self.check([truncate] + [readlines] * 10, BytesIO(b'0\n'*20480)) + self.check([truncate] + [readinto] * 10, BytesIO(b'0\n'*204800), bytearray(b'0\n'*204800)) + self.check([close] + [write] * 10, BytesIO()) + self.check([truncate] + [getvalue] * 10, BytesIO(b'0\n'*204800)) + self.check([truncate] + [getbuffer] * 10, BytesIO(b'0\n'*204800)) + self.check([truncate] + [iter] * 10, BytesIO(b'0\n'*20480)) + self.check([truncate] + [getstate] * 10, BytesIO(b'0\n'*204800)) + self.check([truncate] + [setstate] * 10, BytesIO(b'0\n'*204800), (b'123', 0, None)) + self.check([truncate] + [sizeof] * 10, BytesIO(b'0\n'*204800)) + + # no tests for seek or tell because they don't break anything diff --git a/Lib/test/test_functools.py b/Lib/test/test_functools.py index 2e794b0fc95..f7e09fd771e 100644 --- a/Lib/test/test_functools.py +++ b/Lib/test/test_functools.py @@ -21,6 +21,7 @@ from weakref import proxy import contextlib from inspect import Signature +from test.support import ALWAYS_EQ from test.support import import_helper from test.support import threading_helper from test.support import cpython_only @@ -244,6 +245,13 @@ class TestPartial: actual_args, actual_kwds = p('x', 'y') self.assertEqual(actual_args, ('x', 0, 'y', 1)) self.assertEqual(actual_kwds, {}) + # Checks via `is` and not `eq` + # thus ALWAYS_EQ isn't treated as Placeholder + p = self.partial(capture, ALWAYS_EQ) + actual_args, actual_kwds = p() + self.assertEqual(len(actual_args), 1) + self.assertIs(actual_args[0], ALWAYS_EQ) + self.assertEqual(actual_kwds, {}) def test_placeholders_optimization(self): PH = self.module.Placeholder @@ -260,6 +268,17 @@ class TestPartial: self.assertEqual(p2.args, (PH, 0)) self.assertEqual(p2(1), ((1, 0), {})) + def test_placeholders_kw_restriction(self): + PH = self.module.Placeholder + with self.assertRaisesRegex(TypeError, "Placeholder"): + self.partial(capture, a=PH) + # Passes, as checks via `is` and not `eq` + p = self.partial(capture, a=ALWAYS_EQ) + actual_args, actual_kwds = p() + self.assertEqual(actual_args, ()) + self.assertEqual(len(actual_kwds), 1) + self.assertIs(actual_kwds['a'], ALWAYS_EQ) + def test_construct_placeholder_singleton(self): PH = self.module.Placeholder tp = type(PH) diff --git a/Lib/test/test_future_stmt/test_future.py b/Lib/test/test_future_stmt/test_future.py index 42c6cb3fefa..71f1e616116 100644 --- a/Lib/test/test_future_stmt/test_future.py +++ b/Lib/test/test_future_stmt/test_future.py @@ -422,6 +422,11 @@ class AnnotationsFutureTestCase(unittest.TestCase): eq('(((a)))', 'a') eq('(((a, b)))', '(a, b)') eq("1 + 2 + 3") + eq("t''") + eq("t'{a + b}'") + eq("t'{a!s}'") + eq("t'{a:b}'") + eq("t'{a:b=}'") def test_fstring_debug_annotations(self): # f-strings with '=' don't round trip very well, so set the expected diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py index 8fae12c478c..95c98c6ac63 100644 --- a/Lib/test/test_gc.py +++ b/Lib/test/test_gc.py @@ -7,7 +7,7 @@ from test.support import (verbose, refcount_test, Py_GIL_DISABLED) from test.support.import_helper import import_module from test.support.os_helper import temp_dir, TESTFN, unlink -from test.support.script_helper import assert_python_ok, make_script +from test.support.script_helper import assert_python_ok, make_script, run_test_script from test.support import threading_helper, gc_threshold import gc @@ -1127,64 +1127,14 @@ class GCTests(unittest.TestCase): class IncrementalGCTests(unittest.TestCase): - - def setUp(self): - # Reenable GC as it is disabled module-wide - gc.enable() - - def tearDown(self): - gc.disable() - @unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi") @requires_gil_enabled("Free threading does not support incremental GC") - # Use small increments to emulate longer running process in a shorter time - @gc_threshold(200, 10) def test_incremental_gc_handles_fast_cycle_creation(self): - - class LinkedList: - - #Use slots to reduce number of implicit objects - __slots__ = "next", "prev", "surprise" - - def __init__(self, next=None, prev=None): - self.next = next - if next is not None: - next.prev = self - self.prev = prev - if prev is not None: - prev.next = self - - def make_ll(depth): - head = LinkedList() - for i in range(depth): - head = LinkedList(head, head.prev) - return head - - head = make_ll(1000) - count = 1000 - - # There will be some objects we aren't counting, - # e.g. the gc stats dicts. This test checks - # that the counts don't grow, so we try to - # correct for the uncounted objects - # This is just an estimate. - CORRECTION = 20 - - enabled = gc.isenabled() - gc.enable() - olds = [] - initial_heap_size = _testinternalcapi.get_tracked_heap_size() - for i in range(20_000): - newhead = make_ll(20) - count += 20 - newhead.surprise = head - olds.append(newhead) - if len(olds) == 20: - new_objects = _testinternalcapi.get_tracked_heap_size() - initial_heap_size - self.assertLess(new_objects, 27_000, f"Heap growing. Reached limit after {i} iterations") - del olds[:] - if not enabled: - gc.disable() + # Run this test in a fresh process. The number of alive objects (which can + # be from unit tests run before this one) can influence how quickly cyclic + # garbage is found. + script = support.findfile("_test_gc_fast_cycles.py") + run_test_script(script) class GCCallbackTests(unittest.TestCase): diff --git a/Lib/test/test_generated_cases.py b/Lib/test/test_generated_cases.py index d481cb07f75..a71ddc01d1c 100644 --- a/Lib/test/test_generated_cases.py +++ b/Lib/test/test_generated_cases.py @@ -2069,6 +2069,189 @@ class TestGeneratedAbstractCases(unittest.TestCase): with self.assertRaisesRegex(AssertionError, "All abstract uops"): self.run_cases_test(input, input2, output) + def test_validate_uop_input_length_mismatch(self): + input = """ + op(OP, (arg1 -- out)) { + SPAM(); + } + """ + input2 = """ + op(OP, (arg1, arg2 -- out)) { + } + """ + output = """ + """ + with self.assertRaisesRegex(SyntaxError, + "Must have the same number of inputs"): + self.run_cases_test(input, input2, output) + + def test_validate_uop_output_length_mismatch(self): + input = """ + op(OP, (arg1 -- out)) { + SPAM(); + } + """ + input2 = """ + op(OP, (arg1 -- out1, out2)) { + } + """ + output = """ + """ + with self.assertRaisesRegex(SyntaxError, + "Must have the same number of outputs"): + self.run_cases_test(input, input2, output) + + def test_validate_uop_input_name_mismatch(self): + input = """ + op(OP, (foo -- out)) { + SPAM(); + } + """ + input2 = """ + op(OP, (bar -- out)) { + } + """ + output = """ + """ + with self.assertRaisesRegex(SyntaxError, + "Inputs must have equal names"): + self.run_cases_test(input, input2, output) + + def test_validate_uop_output_name_mismatch(self): + input = """ + op(OP, (arg1 -- foo)) { + SPAM(); + } + """ + input2 = """ + op(OP, (arg1 -- bar)) { + } + """ + output = """ + """ + with self.assertRaisesRegex(SyntaxError, + "Outputs must have equal names"): + self.run_cases_test(input, input2, output) + + def test_validate_uop_unused_input(self): + input = """ + op(OP, (unused -- )) { + } + """ + input2 = """ + op(OP, (foo -- )) { + } + """ + output = """ + case OP: { + stack_pointer += -1; + assert(WITHIN_STACK_BOUNDS()); + break; + } + """ + self.run_cases_test(input, input2, output) + + input = """ + op(OP, (foo -- )) { + } + """ + input2 = """ + op(OP, (unused -- )) { + } + """ + output = """ + case OP: { + stack_pointer += -1; + assert(WITHIN_STACK_BOUNDS()); + break; + } + """ + self.run_cases_test(input, input2, output) + + def test_validate_uop_unused_output(self): + input = """ + op(OP, ( -- unused)) { + } + """ + input2 = """ + op(OP, ( -- foo)) { + foo = NULL; + } + """ + output = """ + case OP: { + JitOptSymbol *foo; + foo = NULL; + stack_pointer[0] = foo; + stack_pointer += 1; + assert(WITHIN_STACK_BOUNDS()); + break; + } + """ + self.run_cases_test(input, input2, output) + + input = """ + op(OP, ( -- foo)) { + foo = NULL; + } + """ + input2 = """ + op(OP, ( -- unused)) { + } + """ + output = """ + case OP: { + stack_pointer += 1; + assert(WITHIN_STACK_BOUNDS()); + break; + } + """ + self.run_cases_test(input, input2, output) + + def test_validate_uop_input_size_mismatch(self): + input = """ + op(OP, (arg1[2] -- )) { + } + """ + input2 = """ + op(OP, (arg1[4] -- )) { + } + """ + output = """ + """ + with self.assertRaisesRegex(SyntaxError, + "Inputs must have equal sizes"): + self.run_cases_test(input, input2, output) + + def test_validate_uop_output_size_mismatch(self): + input = """ + op(OP, ( -- out[2])) { + } + """ + input2 = """ + op(OP, ( -- out[4])) { + } + """ + output = """ + """ + with self.assertRaisesRegex(SyntaxError, + "Outputs must have equal sizes"): + self.run_cases_test(input, input2, output) + + def test_validate_uop_unused_size_mismatch(self): + input = """ + op(OP, (foo[2] -- )) { + } + """ + input2 = """ + op(OP, (unused[4] -- )) { + } + """ + output = """ + """ + with self.assertRaisesRegex(SyntaxError, + "Inputs must have equal sizes"): + self.run_cases_test(input, input2, output) if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_htmlparser.py b/Lib/test/test_htmlparser.py index 4fdba06cf4c..61fa24fab57 100644 --- a/Lib/test/test_htmlparser.py +++ b/Lib/test/test_htmlparser.py @@ -317,6 +317,16 @@ text ("endtag", element_lower)], collector=Collector(convert_charrefs=False)) + def test_EOF_in_cdata(self): + content = """<!-- not a comment --> ¬-an-entity-ref; + <a href="" /> </p><p> <span></span></style> + '</script' + '>'""" + s = f'<script>{content}' + self._run_check(s, [ + ("starttag", 'script', []), + ("data", content) + ]) + def test_comments(self): html = ("<!-- I'm a valid comment -->" '<!--me too!-->' @@ -566,12 +576,33 @@ text for html, expected in data: self._run_check(html, expected) - def test_broken_comments(self): + def test_EOF_in_comments_or_decls(self): + data = [ + ('<!', [('data', '<!')]), + ('<!-', [('data', '<!-')]), + ('<!--', [('data', '<!--')]), + ('<![', [('data', '<![')]), + ('<![CDATA[', [('data', '<![CDATA[')]), + ('<![CDATA[x', [('data', '<![CDATA[x')]), + ('<!DOCTYPE', [('data', '<!DOCTYPE')]), + ('<!DOCTYPE HTML', [('data', '<!DOCTYPE HTML')]), + ] + for html, expected in data: + self._run_check(html, expected) + def test_bogus_comments(self): html = ('<! not really a comment >' '<! not a comment either -->' '<! -- close enough -->' '<!><!<-- this was an empty comment>' - '<!!! another bogus comment !!!>') + '<!!! another bogus comment !!!>' + # see #32876 + '<![with square brackets]!>' + '<![\nmultiline\nbogusness\n]!>' + '<![more brackets]-[and a hyphen]!>' + '<![cdata[should be uppercase]]>' + '<![CDATA [whitespaces are not ignored]]>' + '<![CDATA]]>' # required '[' after CDATA + ) expected = [ ('comment', ' not really a comment '), ('comment', ' not a comment either --'), @@ -579,39 +610,65 @@ text ('comment', ''), ('comment', '<-- this was an empty comment'), ('comment', '!! another bogus comment !!!'), + ('comment', '[with square brackets]!'), + ('comment', '[\nmultiline\nbogusness\n]!'), + ('comment', '[more brackets]-[and a hyphen]!'), + ('comment', '[cdata[should be uppercase]]'), + ('comment', '[CDATA [whitespaces are not ignored]]'), + ('comment', '[CDATA]]'), ] self._run_check(html, expected) def test_broken_condcoms(self): # these condcoms are missing the '--' after '<!' and before the '>' + # and they are considered bogus comments according to + # "8.2.4.42. Markup declaration open state" html = ('<![if !(IE)]>broken condcom<![endif]>' '<![if ! IE]><link href="favicon.tiff"/><![endif]>' '<![if !IE 6]><img src="firefox.png" /><![endif]>' '<![if !ie 6]><b>foo</b><![endif]>' '<![if (!IE)|(lt IE 9)]><img src="mammoth.bmp" /><![endif]>') - # According to the HTML5 specs sections "8.2.4.44 Bogus comment state" - # and "8.2.4.45 Markup declaration open state", comment tokens should - # be emitted instead of 'unknown decl', but calling unknown_decl - # provides more flexibility. - # See also Lib/_markupbase.py:parse_declaration expected = [ - ('unknown decl', 'if !(IE)'), + ('comment', '[if !(IE)]'), ('data', 'broken condcom'), - ('unknown decl', 'endif'), - ('unknown decl', 'if ! IE'), + ('comment', '[endif]'), + ('comment', '[if ! IE]'), ('startendtag', 'link', [('href', 'favicon.tiff')]), - ('unknown decl', 'endif'), - ('unknown decl', 'if !IE 6'), + ('comment', '[endif]'), + ('comment', '[if !IE 6]'), ('startendtag', 'img', [('src', 'firefox.png')]), - ('unknown decl', 'endif'), - ('unknown decl', 'if !ie 6'), + ('comment', '[endif]'), + ('comment', '[if !ie 6]'), ('starttag', 'b', []), ('data', 'foo'), ('endtag', 'b'), - ('unknown decl', 'endif'), - ('unknown decl', 'if (!IE)|(lt IE 9)'), + ('comment', '[endif]'), + ('comment', '[if (!IE)|(lt IE 9)]'), ('startendtag', 'img', [('src', 'mammoth.bmp')]), - ('unknown decl', 'endif') + ('comment', '[endif]') + ] + self._run_check(html, expected) + + def test_cdata_declarations(self): + # More tests should be added. See also "8.2.4.42. Markup + # declaration open state", "8.2.4.69. CDATA section state", + # and issue 32876 + html = ('<![CDATA[just some plain text]]>') + expected = [('unknown decl', 'CDATA[just some plain text')] + self._run_check(html, expected) + + def test_cdata_declarations_multiline(self): + html = ('<code><![CDATA[' + ' if (a < b && a > b) {' + ' printf("[<marquee>How?</marquee>]");' + ' }' + ']]></code>') + expected = [ + ('starttag', 'code', []), + ('unknown decl', + 'CDATA[ if (a < b && a > b) { ' + 'printf("[<marquee>How?</marquee>]"); }'), + ('endtag', 'code') ] self._run_check(html, expected) diff --git a/Lib/test/test_importlib/test_threaded_import.py b/Lib/test/test_importlib/test_threaded_import.py index 9af1e4d505c..f78dc399720 100644 --- a/Lib/test/test_importlib/test_threaded_import.py +++ b/Lib/test/test_importlib/test_threaded_import.py @@ -135,10 +135,12 @@ class ThreadedImportTests(unittest.TestCase): if verbose: print("OK.") - def test_parallel_module_init(self): + @support.bigmemtest(size=50, memuse=76*2**20, dry_run=False) + def test_parallel_module_init(self, size): self.check_parallel_module_init() - def test_parallel_meta_path(self): + @support.bigmemtest(size=50, memuse=76*2**20, dry_run=False) + def test_parallel_meta_path(self, size): finder = Finder() sys.meta_path.insert(0, finder) try: @@ -148,7 +150,8 @@ class ThreadedImportTests(unittest.TestCase): finally: sys.meta_path.remove(finder) - def test_parallel_path_hooks(self): + @support.bigmemtest(size=50, memuse=76*2**20, dry_run=False) + def test_parallel_path_hooks(self, size): # Here the Finder instance is only used to check concurrent calls # to path_hook(). finder = Finder() @@ -242,13 +245,15 @@ class ThreadedImportTests(unittest.TestCase): __import__(TESTFN) del sys.modules[TESTFN] - def test_concurrent_futures_circular_import(self): + @support.bigmemtest(size=1, memuse=1.8*2**30, dry_run=False) + def test_concurrent_futures_circular_import(self, size): # Regression test for bpo-43515 fn = os.path.join(os.path.dirname(__file__), 'partial', 'cfimport.py') script_helper.assert_python_ok(fn) - def test_multiprocessing_pool_circular_import(self): + @support.bigmemtest(size=1, memuse=1.8*2**30, dry_run=False) + def test_multiprocessing_pool_circular_import(self, size): # Regression test for bpo-41567 fn = os.path.join(os.path.dirname(__file__), 'partial', 'pool_in_threads.py') diff --git a/Lib/test/test_inspect/test_inspect.py b/Lib/test/test_inspect/test_inspect.py index c9b37fcd8f6..1c325c0d507 100644 --- a/Lib/test/test_inspect/test_inspect.py +++ b/Lib/test/test_inspect/test_inspect.py @@ -4997,6 +4997,37 @@ class TestSignatureObject(unittest.TestCase): with self.assertRaisesRegex(NameError, "undefined"): signature_func(ida.f) + def test_signature_deferred_annotations(self): + def f(x: undef): + pass + + class C: + x: undef + + def __init__(self, x: undef): + self.x = x + + sig = inspect.signature(f, annotation_format=Format.FORWARDREF) + self.assertEqual(list(sig.parameters), ['x']) + sig = inspect.signature(C, annotation_format=Format.FORWARDREF) + self.assertEqual(list(sig.parameters), ['x']) + + class CallableWrapper: + def __init__(self, func): + self.func = func + self.__annotate__ = func.__annotate__ + + def __call__(self, *args, **kwargs): + return self.func(*args, **kwargs) + + @property + def __annotations__(self): + return self.__annotate__(Format.VALUE) + + cw = CallableWrapper(f) + sig = inspect.signature(cw, annotation_format=Format.FORWARDREF) + self.assertEqual(list(sig.parameters), ['args', 'kwargs']) + def test_signature_none_annotation(self): class funclike: # Has to be callable, and have correct @@ -6146,12 +6177,14 @@ class TestRepl(unittest.TestCase): object. """ + # TODO(picnixz): refactor this as it's used by test_repl.py + # To run the REPL without using a terminal, spawn python with the command # line option '-i' and the process name set to '<stdin>'. # The directory of argv[0] must match the directory of the Python # executable for the Popen() call to python to succeed as the directory - # path may be used by Py_GetPath() to build the default module search - # path. + # path may be used by PyConfig_Get("module_search_paths") to build the + # default module search path. stdin_fname = os.path.join(os.path.dirname(sys.executable), "<stdin>") cmd_line = [stdin_fname, '-E', '-i'] cmd_line.extend(args) diff --git a/Lib/test/test_linecache.py b/Lib/test/test_linecache.py index e4aa41ebb43..02f65338428 100644 --- a/Lib/test/test_linecache.py +++ b/Lib/test/test_linecache.py @@ -4,10 +4,12 @@ import linecache import unittest import os.path import tempfile +import threading import tokenize from importlib.machinery import ModuleSpec from test import support from test.support import os_helper +from test.support import threading_helper from test.support.script_helper import assert_python_ok @@ -374,5 +376,40 @@ class LineCacheInvalidationTests(unittest.TestCase): self.assertIn(self.unchanged_file, linecache.cache) +class MultiThreadingTest(unittest.TestCase): + @threading_helper.reap_threads + @threading_helper.requires_working_threading() + def test_read_write_safety(self): + + with tempfile.TemporaryDirectory() as tmpdirname: + filenames = [] + for i in range(10): + name = os.path.join(tmpdirname, f"test_{i}.py") + with open(name, "w") as h: + h.write("import time\n") + h.write("import system\n") + filenames.append(name) + + def linecache_get_line(b): + b.wait() + for _ in range(100): + for name in filenames: + linecache.getline(name, 1) + + def check(funcs): + barrier = threading.Barrier(len(funcs)) + threads = [] + + for func in funcs: + thread = threading.Thread(target=func, args=(barrier,)) + + threads.append(thread) + + with threading_helper.start_threads(threads): + pass + + check([linecache_get_line] * 20) + + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 3f113ec1be4..1e5adcc8db1 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -4214,6 +4214,89 @@ class ConfigDictTest(BaseTest): handler = logging.getHandlerByName('custom') self.assertEqual(handler.custom_kwargs, custom_kwargs) + # See gh-91555 and gh-90321 + @support.requires_subprocess() + def test_deadlock_in_queue(self): + queue = multiprocessing.Queue() + handler = logging.handlers.QueueHandler(queue) + logger = multiprocessing.get_logger() + level = logger.level + try: + logger.setLevel(logging.DEBUG) + logger.addHandler(handler) + logger.debug("deadlock") + finally: + logger.setLevel(level) + logger.removeHandler(handler) + + def test_recursion_in_custom_handler(self): + class BadHandler(logging.Handler): + def __init__(self): + super().__init__() + def emit(self, record): + logger.debug("recurse") + logger = logging.getLogger("test_recursion_in_custom_handler") + logger.addHandler(BadHandler()) + logger.setLevel(logging.DEBUG) + logger.debug("boom") + + @threading_helper.requires_working_threading() + def test_thread_supression_noninterference(self): + lock = threading.Lock() + logger = logging.getLogger("test_thread_supression_noninterference") + + # Block on the first call, allow others through + # + # NOTE: We need to bypass the base class's lock, otherwise that will + # block multiple calls to the same handler itself. + class BlockOnceHandler(TestHandler): + def __init__(self, barrier): + super().__init__(support.Matcher()) + self.barrier = barrier + + def createLock(self): + self.lock = None + + def handle(self, record): + self.emit(record) + + def emit(self, record): + if self.barrier: + barrier = self.barrier + self.barrier = None + barrier.wait() + with lock: + pass + super().emit(record) + logger.info("blow up if not supressed") + + barrier = threading.Barrier(2) + handler = BlockOnceHandler(barrier) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + t1 = threading.Thread(target=logger.debug, args=("1",)) + with lock: + + # Ensure first thread is blocked in the handler, hence supressing logging... + t1.start() + barrier.wait() + + # ...but the second thread should still be able to log... + t2 = threading.Thread(target=logger.debug, args=("2",)) + t2.start() + t2.join(timeout=3) + + self.assertEqual(len(handler.buffer), 1) + self.assertTrue(handler.matches(levelno=logging.DEBUG, message='2')) + + # The first thread should still be blocked here + self.assertTrue(t1.is_alive()) + + # Now the lock has been released the first thread should complete + t1.join() + self.assertEqual(len(handler.buffer), 2) + self.assertTrue(handler.matches(levelno=logging.DEBUG, message='1')) class ManagerTest(BaseTest): def test_manager_loggerclass(self): diff --git a/Lib/test/test_pdb.py b/Lib/test/test_pdb.py index 68c2b508fa6..6b74e21ad73 100644 --- a/Lib/test/test_pdb.py +++ b/Lib/test/test_pdb.py @@ -4749,7 +4749,9 @@ class TestREPLSession(unittest.TestCase): @support.force_not_colorized_test_class @support.requires_subprocess() class PdbTestReadline(unittest.TestCase): - def setUpClass(): + + @classmethod + def setUpClass(cls): # Ensure that the readline module is loaded # If this fails, the test is skipped because SkipTest will be raised readline = import_module('readline') diff --git a/Lib/test/test_positional_only_arg.py b/Lib/test/test_positional_only_arg.py index eea0625012d..e412cb1d58d 100644 --- a/Lib/test/test_positional_only_arg.py +++ b/Lib/test/test_positional_only_arg.py @@ -37,8 +37,8 @@ class PositionalOnlyTestCase(unittest.TestCase): check_syntax_error(self, "def f(/): pass") check_syntax_error(self, "def f(*, a, /): pass") check_syntax_error(self, "def f(*, /, a): pass") - check_syntax_error(self, "def f(a, /, a): pass", "duplicate argument 'a' in function definition") - check_syntax_error(self, "def f(a, /, *, a): pass", "duplicate argument 'a' in function definition") + check_syntax_error(self, "def f(a, /, a): pass", "duplicate parameter 'a' in function definition") + check_syntax_error(self, "def f(a, /, *, a): pass", "duplicate parameter 'a' in function definition") check_syntax_error(self, "def f(a, b/2, c): pass") check_syntax_error(self, "def f(a, /, c, /): pass") check_syntax_error(self, "def f(a, /, c, /, d): pass") @@ -59,8 +59,8 @@ class PositionalOnlyTestCase(unittest.TestCase): check_syntax_error(self, "async def f(/): pass") check_syntax_error(self, "async def f(*, a, /): pass") check_syntax_error(self, "async def f(*, /, a): pass") - check_syntax_error(self, "async def f(a, /, a): pass", "duplicate argument 'a' in function definition") - check_syntax_error(self, "async def f(a, /, *, a): pass", "duplicate argument 'a' in function definition") + check_syntax_error(self, "async def f(a, /, a): pass", "duplicate parameter 'a' in function definition") + check_syntax_error(self, "async def f(a, /, *, a): pass", "duplicate parameter 'a' in function definition") check_syntax_error(self, "async def f(a, b/2, c): pass") check_syntax_error(self, "async def f(a, /, c, /): pass") check_syntax_error(self, "async def f(a, /, c, /, d): pass") @@ -247,8 +247,8 @@ class PositionalOnlyTestCase(unittest.TestCase): check_syntax_error(self, "lambda /: None") check_syntax_error(self, "lambda *, a, /: None") check_syntax_error(self, "lambda *, /, a: None") - check_syntax_error(self, "lambda a, /, a: None", "duplicate argument 'a' in function definition") - check_syntax_error(self, "lambda a, /, *, a: None", "duplicate argument 'a' in function definition") + check_syntax_error(self, "lambda a, /, a: None", "duplicate parameter 'a' in function definition") + check_syntax_error(self, "lambda a, /, *, a: None", "duplicate parameter 'a' in function definition") check_syntax_error(self, "lambda a, /, b, /: None") check_syntax_error(self, "lambda a, /, b, /, c: None") check_syntax_error(self, "lambda a, /, b, /, c, *, d: None") diff --git a/Lib/test/test_pyrepl/test_interact.py b/Lib/test/test_pyrepl/test_interact.py index a20719033fc..8c0eeab6dca 100644 --- a/Lib/test/test_pyrepl/test_interact.py +++ b/Lib/test/test_pyrepl/test_interact.py @@ -113,7 +113,7 @@ class TestSimpleInteract(unittest.TestCase): r = """ def f(x, x): ... ^ -SyntaxError: duplicate argument 'x' in function definition""" +SyntaxError: duplicate parameter 'x' in function definition""" self.assertIn(r, f.getvalue()) def test_runsource_shows_syntax_error_for_failed_compilation(self): diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py index 93029ab6e08..fc8114891d1 100644 --- a/Lib/test/test_pyrepl/test_pyrepl.py +++ b/Lib/test/test_pyrepl/test_pyrepl.py @@ -452,6 +452,11 @@ class TestPyReplAutoindent(TestCase): ) # fmt: on + events = code_to_events(input_code) + reader = self.prepare_reader(events) + output = multiline_input(reader) + self.assertEqual(output, output_code) + def test_auto_indent_continuation(self): # auto indenting according to previous user indentation # fmt: off diff --git a/Lib/test/test_pyrepl/test_reader.py b/Lib/test/test_pyrepl/test_reader.py index 4ee320a5a4d..57526f88f93 100644 --- a/Lib/test/test_pyrepl/test_reader.py +++ b/Lib/test/test_pyrepl/test_reader.py @@ -497,6 +497,26 @@ class TestReaderInColor(ScreenEqualMixin, TestCase): self.assert_screen_equal(reader, code, clean=True) self.assert_screen_equal(reader, expected) + def test_syntax_highlighting_indentation_error(self): + code = dedent( + """\ + def unfinished_function(): + var = 1 + oops + """ + ) + expected = dedent( + """\ + {k}def{z} {d}unfinished_function{z}{o}({z}{o}){z}{o}:{z} + var {o}={z} {n}1{z} + oops + """ + ).format(**colors) + events = code_to_events(code) + reader, _ = handle_all_events(events) + self.assert_screen_equal(reader, code, clean=True) + self.assert_screen_equal(reader, expected) + def test_control_characters(self): code = 'flag = "🏳️🌈"' events = code_to_events(code) diff --git a/Lib/test/test_repl.py b/Lib/test/test_repl.py index 228b326699e..f4a4634fc62 100644 --- a/Lib/test/test_repl.py +++ b/Lib/test/test_repl.py @@ -38,8 +38,8 @@ def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): # line option '-i' and the process name set to '<stdin>'. # The directory of argv[0] must match the directory of the Python # executable for the Popen() call to python to succeed as the directory - # path may be used by Py_GetPath() to build the default module search - # path. + # path may be used by PyConfig_Get("module_search_paths") to build the + # default module search path. stdin_fname = os.path.join(os.path.dirname(sys.executable), "<stdin>") cmd_line = [stdin_fname, '-I', '-i'] cmd_line.extend(args) @@ -197,7 +197,7 @@ class TestInteractiveInterpreter(unittest.TestCase): expected_lines = [ ' def f(x, x): ...', ' ^', - "SyntaxError: duplicate argument 'x' in function definition" + "SyntaxError: duplicate parameter 'x' in function definition" ] self.assertEqual(output.splitlines()[4:-1], expected_lines) diff --git a/Lib/test/test_sqlite3/test_cli.py b/Lib/test/test_sqlite3/test_cli.py index ad0dcb3cccb..37e0f74f688 100644 --- a/Lib/test/test_sqlite3/test_cli.py +++ b/Lib/test/test_sqlite3/test_cli.py @@ -8,10 +8,11 @@ from test.support import ( captured_stdout, captured_stderr, captured_stdin, - force_not_colorized, + force_not_colorized_test_class, ) +@force_not_colorized_test_class class CommandLineInterface(unittest.TestCase): def _do_test(self, *args, expect_success=True): @@ -37,7 +38,6 @@ class CommandLineInterface(unittest.TestCase): self.assertEqual(out, "") return err - @force_not_colorized def test_cli_help(self): out = self.expect_success("-h") self.assertIn("usage: ", out) @@ -69,6 +69,7 @@ class CommandLineInterface(unittest.TestCase): self.assertIn("(0,)", out) +@force_not_colorized_test_class class InteractiveSession(unittest.TestCase): MEMORY_DB_MSG = "Connected to a transient in-memory database" PS1 = "sqlite> " @@ -116,6 +117,38 @@ class InteractiveSession(unittest.TestCase): self.assertEqual(out.count(self.PS2), 0) self.assertIn(sqlite3.sqlite_version, out) + def test_interact_empty_source(self): + out, err = self.run_cli(commands=("", " ")) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertEndsWith(out, self.PS1) + self.assertEqual(out.count(self.PS1), 3) + self.assertEqual(out.count(self.PS2), 0) + + def test_interact_dot_commands_unknown(self): + out, err = self.run_cli(commands=(".unknown_command", )) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertEndsWith(out, self.PS1) + self.assertEqual(out.count(self.PS1), 2) + self.assertEqual(out.count(self.PS2), 0) + self.assertIn("Error", err) + # test "unknown_command" is pointed out in the error message + self.assertIn("unknown_command", err) + + def test_interact_dot_commands_empty(self): + out, err = self.run_cli(commands=(".")) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertEndsWith(out, self.PS1) + self.assertEqual(out.count(self.PS1), 2) + self.assertEqual(out.count(self.PS2), 0) + + def test_interact_dot_commands_with_whitespaces(self): + out, err = self.run_cli(commands=(".version ", ". version")) + self.assertIn(self.MEMORY_DB_MSG, err) + self.assertEqual(out.count(sqlite3.sqlite_version + "\n"), 2) + self.assertEndsWith(out, self.PS1) + self.assertEqual(out.count(self.PS1), 3) + self.assertEqual(out.count(self.PS2), 0) + def test_interact_valid_sql(self): out, err = self.run_cli(commands=("SELECT 1;",)) self.assertIn(self.MEMORY_DB_MSG, err) @@ -158,6 +191,14 @@ class InteractiveSession(unittest.TestCase): out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",)) self.assertIn("(0,)\n", out) + def test_color(self): + with unittest.mock.patch("_colorize.can_colorize", return_value=True): + out, err = self.run_cli(commands="TEXT\n") + self.assertIn("\x1b[1;35msqlite> \x1b[0m", out) + self.assertIn("\x1b[1;35m ... \x1b[0m\x1b", out) + out, err = self.run_cli(commands=("sel;",)) + self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: ' + '\x1b[35mnear "sel": syntax error\x1b[0m', err) if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_sqlite3/test_dbapi.py b/Lib/test/test_sqlite3/test_dbapi.py index c3aa3bf2d7b..291e0356253 100644 --- a/Lib/test/test_sqlite3/test_dbapi.py +++ b/Lib/test/test_sqlite3/test_dbapi.py @@ -550,17 +550,9 @@ class ConnectionTests(unittest.TestCase): cx.execute("insert into u values(0)") def test_connect_positional_arguments(self): - regex = ( - r"Passing more than 1 positional argument to sqlite3.connect\(\)" - " is deprecated. Parameters 'timeout', 'detect_types', " - "'isolation_level', 'check_same_thread', 'factory', " - "'cached_statements' and 'uri' will become keyword-only " - "parameters in Python 3.15." - ) - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: - cx = sqlite.connect(":memory:", 1.0) - cx.close() - self.assertEqual(cm.filename, __file__) + with self.assertRaisesRegex(TypeError, + r'connect\(\) takes at most 1 positional arguments'): + sqlite.connect(":memory:", 1.0) def test_connection_resource_warning(self): with self.assertWarns(ResourceWarning): diff --git a/Lib/test/test_sqlite3/test_factory.py b/Lib/test/test_sqlite3/test_factory.py index cc9f1ec5c4b..776659e3b16 100644 --- a/Lib/test/test_sqlite3/test_factory.py +++ b/Lib/test/test_sqlite3/test_factory.py @@ -71,18 +71,9 @@ class ConnectionFactoryTests(unittest.TestCase): def __init__(self, *args, **kwargs): super(Factory, self).__init__(*args, **kwargs) - regex = ( - r"Passing more than 1 positional argument to _sqlite3.Connection\(\) " - r"is deprecated. Parameters 'timeout', 'detect_types', " - r"'isolation_level', 'check_same_thread', 'factory', " - r"'cached_statements' and 'uri' will become keyword-only " - r"parameters in Python 3.15." - ) - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: - with memory_database(5.0, 0, None, True, Factory) as con: - self.assertIsNone(con.isolation_level) - self.assertIsInstance(con, Factory) - self.assertEqual(cm.filename, __file__) + with self.assertRaisesRegex(TypeError, + r'connect\(\) takes at most 1 positional arguments'): + memory_database(5.0, 0, None, True, Factory) class CursorFactoryTests(MemoryDatabaseMixin, unittest.TestCase): diff --git a/Lib/test/test_sqlite3/test_hooks.py b/Lib/test/test_sqlite3/test_hooks.py index 53b8a39bf29..2b907e35131 100644 --- a/Lib/test/test_sqlite3/test_hooks.py +++ b/Lib/test/test_sqlite3/test_hooks.py @@ -220,16 +220,9 @@ class ProgressTests(MemoryDatabaseMixin, unittest.TestCase): """) def test_progress_handler_keyword_args(self): - regex = ( - r"Passing keyword argument 'progress_handler' to " - r"_sqlite3.Connection.set_progress_handler\(\) is deprecated. " - r"Parameter 'progress_handler' will become positional-only in " - r"Python 3.15." - ) - - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: + with self.assertRaisesRegex(TypeError, + 'takes at least 1 positional argument'): self.con.set_progress_handler(progress_handler=lambda: None, n=1) - self.assertEqual(cm.filename, __file__) class TraceCallbackTests(MemoryDatabaseMixin, unittest.TestCase): @@ -353,16 +346,9 @@ class TraceCallbackTests(MemoryDatabaseMixin, unittest.TestCase): cx.execute("select 1") def test_trace_keyword_args(self): - regex = ( - r"Passing keyword argument 'trace_callback' to " - r"_sqlite3.Connection.set_trace_callback\(\) is deprecated. " - r"Parameter 'trace_callback' will become positional-only in " - r"Python 3.15." - ) - - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: + with self.assertRaisesRegex(TypeError, + 'takes exactly 1 positional argument'): self.con.set_trace_callback(trace_callback=lambda: None) - self.assertEqual(cm.filename, __file__) if __name__ == "__main__": diff --git a/Lib/test/test_sqlite3/test_userfunctions.py b/Lib/test/test_sqlite3/test_userfunctions.py index 3abc43a3b1a..11cf877a011 100644 --- a/Lib/test/test_sqlite3/test_userfunctions.py +++ b/Lib/test/test_sqlite3/test_userfunctions.py @@ -422,27 +422,9 @@ class FunctionTests(unittest.TestCase): self.con.execute, "select badreturn()") def test_func_keyword_args(self): - regex = ( - r"Passing keyword arguments 'name', 'narg' and 'func' to " - r"_sqlite3.Connection.create_function\(\) is deprecated. " - r"Parameters 'name', 'narg' and 'func' will become " - r"positional-only in Python 3.15." - ) - - def noop(): - return None - - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: - self.con.create_function("noop", 0, func=noop) - self.assertEqual(cm.filename, __file__) - - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: - self.con.create_function("noop", narg=0, func=noop) - self.assertEqual(cm.filename, __file__) - - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: - self.con.create_function(name="noop", narg=0, func=noop) - self.assertEqual(cm.filename, __file__) + with self.assertRaisesRegex(TypeError, + 'takes exactly 3 positional arguments'): + self.con.create_function("noop", 0, func=lambda: None) class WindowSumInt: @@ -737,25 +719,9 @@ class AggregateTests(unittest.TestCase): self.assertEqual(val, txt) def test_agg_keyword_args(self): - regex = ( - r"Passing keyword arguments 'name', 'n_arg' and 'aggregate_class' to " - r"_sqlite3.Connection.create_aggregate\(\) is deprecated. " - r"Parameters 'name', 'n_arg' and 'aggregate_class' will become " - r"positional-only in Python 3.15." - ) - - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: + with self.assertRaisesRegex(TypeError, + 'takes exactly 3 positional arguments'): self.con.create_aggregate("test", 1, aggregate_class=AggrText) - self.assertEqual(cm.filename, __file__) - - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: - self.con.create_aggregate("test", n_arg=1, aggregate_class=AggrText) - self.assertEqual(cm.filename, __file__) - - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: - self.con.create_aggregate(name="test", n_arg=0, - aggregate_class=AggrText) - self.assertEqual(cm.filename, __file__) class AuthorizerTests(unittest.TestCase): @@ -800,16 +766,9 @@ class AuthorizerTests(unittest.TestCase): self.con.execute("select c2 from t1") def test_authorizer_keyword_args(self): - regex = ( - r"Passing keyword argument 'authorizer_callback' to " - r"_sqlite3.Connection.set_authorizer\(\) is deprecated. " - r"Parameter 'authorizer_callback' will become positional-only in " - r"Python 3.15." - ) - - with self.assertWarnsRegex(DeprecationWarning, regex) as cm: + with self.assertRaisesRegex(TypeError, + 'takes exactly 1 positional argument'): self.con.set_authorizer(authorizer_callback=lambda: None) - self.assertEqual(cm.filename, __file__) class AuthorizerRaiseExceptionTests(AuthorizerTests): diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 395b2ef88ab..06460d6047c 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -4488,6 +4488,7 @@ class ThreadedTests(unittest.TestCase): @requires_tls_version('TLSv1_3') @unittest.skipUnless(ssl.HAS_PSK, 'TLS-PSK disabled on this OpenSSL build') + @unittest.skipUnless(ssl.HAS_PSK_TLS13, 'TLS 1.3 PSK disabled on this OpenSSL build') def test_psk_tls1_3(self): psk = bytes.fromhex('deadbeef') identity_hint = 'identity-hint' diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py index 0ee17849e28..0eccf03a1a9 100644 --- a/Lib/test/test_syntax.py +++ b/Lib/test/test_syntax.py @@ -419,7 +419,7 @@ SyntaxError: invalid syntax >>> def foo(/,a,b=,c): ... pass Traceback (most recent call last): -SyntaxError: at least one argument must precede / +SyntaxError: at least one parameter must precede / >>> def foo(a,/,/,b,c): ... pass @@ -454,67 +454,67 @@ SyntaxError: / must be ahead of * >>> def foo(a,*b=3,c): ... pass Traceback (most recent call last): -SyntaxError: var-positional argument cannot have default value +SyntaxError: var-positional parameter cannot have default value >>> def foo(a,*b: int=,c): ... pass Traceback (most recent call last): -SyntaxError: var-positional argument cannot have default value +SyntaxError: var-positional parameter cannot have default value >>> def foo(a,**b=3): ... pass Traceback (most recent call last): -SyntaxError: var-keyword argument cannot have default value +SyntaxError: var-keyword parameter cannot have default value >>> def foo(a,**b: int=3): ... pass Traceback (most recent call last): -SyntaxError: var-keyword argument cannot have default value +SyntaxError: var-keyword parameter cannot have default value >>> def foo(a,*a, b, **c, d): ... pass Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> def foo(a,*a, b, **c, d=4): ... pass Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> def foo(a,*a, b, **c, *d): ... pass Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> def foo(a,*a, b, **c, **d): ... pass Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> def foo(a=1,/,**b,/,c): ... pass Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> def foo(*b,*d): ... pass Traceback (most recent call last): -SyntaxError: * argument may appear only once +SyntaxError: * may appear only once >>> def foo(a,*b,c,*d,*e,c): ... pass Traceback (most recent call last): -SyntaxError: * argument may appear only once +SyntaxError: * may appear only once >>> def foo(a,b,/,c,*b,c,*d,*e,c): ... pass Traceback (most recent call last): -SyntaxError: * argument may appear only once +SyntaxError: * may appear only once >>> def foo(a,b,/,c,*b,c,*d,**e): ... pass Traceback (most recent call last): -SyntaxError: * argument may appear only once +SyntaxError: * may appear only once >>> def foo(a=1,/*,b,c): ... pass @@ -538,7 +538,7 @@ SyntaxError: expected default value expression >>> lambda /,a,b,c: None Traceback (most recent call last): -SyntaxError: at least one argument must precede / +SyntaxError: at least one parameter must precede / >>> lambda a,/,/,b,c: None Traceback (most recent call last): @@ -570,47 +570,47 @@ SyntaxError: expected comma between / and * >>> lambda a,*b=3,c: None Traceback (most recent call last): -SyntaxError: var-positional argument cannot have default value +SyntaxError: var-positional parameter cannot have default value >>> lambda a,**b=3: None Traceback (most recent call last): -SyntaxError: var-keyword argument cannot have default value +SyntaxError: var-keyword parameter cannot have default value >>> lambda a, *a, b, **c, d: None Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> lambda a,*a, b, **c, d=4: None Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> lambda a,*a, b, **c, *d: None Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> lambda a,*a, b, **c, **d: None Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> lambda a=1,/,**b,/,c: None Traceback (most recent call last): -SyntaxError: arguments cannot follow var-keyword argument +SyntaxError: parameters cannot follow var-keyword parameter >>> lambda *b,*d: None Traceback (most recent call last): -SyntaxError: * argument may appear only once +SyntaxError: * may appear only once >>> lambda a,*b,c,*d,*e,c: None Traceback (most recent call last): -SyntaxError: * argument may appear only once +SyntaxError: * may appear only once >>> lambda a,b,/,c,*b,c,*d,*e,c: None Traceback (most recent call last): -SyntaxError: * argument may appear only once +SyntaxError: * may appear only once >>> lambda a,b,/,c,*b,c,*d,**e: None Traceback (most recent call last): -SyntaxError: * argument may appear only once +SyntaxError: * may appear only once >>> lambda a=1,d=,c: None Traceback (most recent call last): @@ -1304,7 +1304,7 @@ Missing parens after function definition Traceback (most recent call last): SyntaxError: expected '(' -Parenthesized arguments in function definitions +Parenthesized parameters in function definitions >>> def f(x, (y, z), w): ... pass @@ -2178,7 +2178,7 @@ Corner-cases that used to fail to raise the correct error: >>> with (lambda *:0): pass Traceback (most recent call last): - SyntaxError: named arguments must follow bare * + SyntaxError: named parameters must follow bare * Corner-cases that used to crash: diff --git a/Lib/test/test_threadedtempfile.py b/Lib/test/test_threadedtempfile.py index 420fc6ec8be..acb427b0c78 100644 --- a/Lib/test/test_threadedtempfile.py +++ b/Lib/test/test_threadedtempfile.py @@ -15,6 +15,7 @@ provoking a 2.0 failure under Linux. import tempfile +from test import support from test.support import threading_helper import unittest import io @@ -49,7 +50,8 @@ class TempFileGreedy(threading.Thread): class ThreadedTempFileTest(unittest.TestCase): - def test_main(self): + @support.bigmemtest(size=NUM_THREADS, memuse=60*2**20, dry_run=False) + def test_main(self, size): threads = [TempFileGreedy() for i in range(NUM_THREADS)] with threading_helper.start_threads(threads, startEvent.set): pass diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 4ab38c2598b..abe63c10c0a 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -530,7 +530,8 @@ class ThreadTests(BaseTestCase): finally: sys.setswitchinterval(old_interval) - def test_join_from_multiple_threads(self): + @support.bigmemtest(size=20, memuse=72*2**20, dry_run=False) + def test_join_from_multiple_threads(self, size): # Thread.join() should be thread-safe errors = [] @@ -1431,7 +1432,8 @@ class ThreadJoinOnShutdown(BaseTestCase): self._run_and_join(script) @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_4_daemon_threads(self): + @support.bigmemtest(size=40, memuse=70*2**20, dry_run=False) + def test_4_daemon_threads(self, size): # Check that a daemon thread cannot crash the interpreter on shutdown # by manipulating internal structures that are being disposed of in # the main thread. diff --git a/Lib/test/test_unparse.py b/Lib/test/test_unparse.py index d3af7a8489e..d4db5e60af7 100644 --- a/Lib/test/test_unparse.py +++ b/Lib/test/test_unparse.py @@ -817,6 +817,15 @@ class CosmeticTestCase(ASTTestCase): self.check_ast_roundtrip("def f[T: int = int, **P = int, *Ts = *int]():\n pass") self.check_ast_roundtrip("class C[T: int = int, **P = int, *Ts = *int]():\n pass") + def test_tstr(self): + self.check_ast_roundtrip("t'{a + b}'") + self.check_ast_roundtrip("t'{a + b:x}'") + self.check_ast_roundtrip("t'{a + b!s}'") + self.check_ast_roundtrip("t'{ {a}}'") + self.check_ast_roundtrip("t'{ {a}=}'") + self.check_ast_roundtrip("t'{{a}}'") + self.check_ast_roundtrip("t''") + class ManualASTCreationTestCase(unittest.TestCase): """Test that AST nodes created without a type_params field unparse correctly.""" diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py index 5fe9d688410..8f277952007 100644 --- a/Lib/test/test_xml_etree.py +++ b/Lib/test/test_xml_etree.py @@ -2960,6 +2960,50 @@ class BadElementTest(ElementTestCase, unittest.TestCase): del b gc_collect() + def test_deepcopy_clear(self): + # Prevent crashes when __deepcopy__() clears the children list. + # See https://github.com/python/cpython/issues/133009. + class X(ET.Element): + def __deepcopy__(self, memo): + root.clear() + return self + + root = ET.Element('a') + evil = X('x') + root.extend([evil, ET.Element('y')]) + if is_python_implementation(): + # Mutating a list over which we iterate raises an error. + self.assertRaises(RuntimeError, copy.deepcopy, root) + else: + c = copy.deepcopy(root) + # In the C implementation, we can still copy the evil element. + self.assertListEqual(list(c), [evil]) + + def test_deepcopy_grow(self): + # Prevent crashes when __deepcopy__() mutates the children list. + # See https://github.com/python/cpython/issues/133009. + a = ET.Element('a') + b = ET.Element('b') + c = ET.Element('c') + + class X(ET.Element): + def __deepcopy__(self, memo): + root.append(a) + root.append(b) + return self + + root = ET.Element('top') + evil1, evil2 = X('1'), X('2') + root.extend([evil1, c, evil2]) + children = list(copy.deepcopy(root)) + # mock deep copies + self.assertIs(children[0], evil1) + self.assertIs(children[2], evil2) + # true deep copies + self.assertEqual(children[1].tag, c.tag) + self.assertEqual([c.tag for c in children[3:]], + [a.tag, b.tag, a.tag, b.tag]) + class MutationDeleteElementPath(str): def __new__(cls, elem, *args): diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index f4a25376e52..713294c4c27 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -281,7 +281,7 @@ class CompressorTestCase(unittest.TestCase): with self.assertRaisesRegex(ZstdError, (r'Error when setting zstd compression parameter "window_log", ' r'it should \d+ <= value <= \d+, provided value is 100\. ' - r'\(zstd v\d\.\d\.\d, (?:32|64)-bit build\)')): + r'\((?:32|64)-bit build\)')): compress(b'', options=option) def test_unknown_compression_parameter(self): @@ -413,7 +413,7 @@ class DecompressorTestCase(unittest.TestCase): with self.assertRaisesRegex(ZstdError, (r'Error when setting zstd decompression parameter "window_log_max", ' r'it should \d+ <= value <= \d+, provided value is 100\. ' - r'\(zstd v\d\.\d\.\d, (?:32|64)-bit build\)')): + r'\((?:32|64)-bit build\)')): decompress(b'', options=options) def test_unknown_decompression_parameter(self): @@ -1174,43 +1174,43 @@ class ZstdDictTestCase(unittest.TestCase): def test_train_dict_c(self): # argument wrong type with self.assertRaises(TypeError): - _zstd._train_dict({}, (), 100) + _zstd.train_dict({}, (), 100) with self.assertRaises(TypeError): - _zstd._train_dict(b'', 99, 100) + _zstd.train_dict(b'', 99, 100) with self.assertRaises(TypeError): - _zstd._train_dict(b'', (), 100.1) + _zstd.train_dict(b'', (), 100.1) # size > size_t with self.assertRaises(ValueError): - _zstd._train_dict(b'', (2**64+1,), 100) + _zstd.train_dict(b'', (2**64+1,), 100) # dict_size <= 0 with self.assertRaises(ValueError): - _zstd._train_dict(b'', (), 0) + _zstd.train_dict(b'', (), 0) def test_finalize_dict_c(self): with self.assertRaises(TypeError): - _zstd._finalize_dict(1, 2, 3, 4, 5) + _zstd.finalize_dict(1, 2, 3, 4, 5) # argument wrong type with self.assertRaises(TypeError): - _zstd._finalize_dict({}, b'', (), 100, 5) + _zstd.finalize_dict({}, b'', (), 100, 5) with self.assertRaises(TypeError): - _zstd._finalize_dict(TRAINED_DICT.dict_content, {}, (), 100, 5) + _zstd.finalize_dict(TRAINED_DICT.dict_content, {}, (), 100, 5) with self.assertRaises(TypeError): - _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', 99, 100, 5) + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', 99, 100, 5) with self.assertRaises(TypeError): - _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', (), 100.1, 5) + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100.1, 5) with self.assertRaises(TypeError): - _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5.1) + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5.1) # size > size_t with self.assertRaises(ValueError): - _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', (2**64+1,), 100, 5) + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**64+1,), 100, 5) # dict_size <= 0 with self.assertRaises(ValueError): - _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', (), 0, 5) + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 0, 5) def test_train_buffer_protocol_samples(self): def _nbytes(dat): @@ -1232,19 +1232,19 @@ class ZstdDictTestCase(unittest.TestCase): # wrong size list with self.assertRaisesRegex(ValueError, "The samples size tuple doesn't match the concatenation's size"): - _zstd._train_dict(concatenation, tuple(wrong_size_lst), 100*_1K) + _zstd.train_dict(concatenation, tuple(wrong_size_lst), 100*_1K) # correct size list - _zstd._train_dict(concatenation, tuple(correct_size_lst), 3*_1K) + _zstd.train_dict(concatenation, tuple(correct_size_lst), 3*_1K) # wrong size list with self.assertRaisesRegex(ValueError, "The samples size tuple doesn't match the concatenation's size"): - _zstd._finalize_dict(TRAINED_DICT.dict_content, + _zstd.finalize_dict(TRAINED_DICT.dict_content, concatenation, tuple(wrong_size_lst), 300*_1K, 5) # correct size list - _zstd._finalize_dict(TRAINED_DICT.dict_content, + _zstd.finalize_dict(TRAINED_DICT.dict_content, concatenation, tuple(correct_size_lst), 300*_1K, 5) def test_as_prefix(self): @@ -1682,10 +1682,10 @@ class FileTestCase(unittest.TestCase): # Trailing data isn't a valid compressed stream with ZstdFile(io.BytesIO(self.FRAME_42 + b'12345')) as f: - self.assertEqual(f.read(), self.DECOMPRESSED_42) + self.assertRaises(ZstdError, f.read) with ZstdFile(io.BytesIO(SKIPPABLE_FRAME + b'12345')) as f: - self.assertEqual(f.read(), b'') + self.assertRaises(ZstdError, f.read) def test_read_truncated(self): # Drop stream epilogue: 4 bytes checksum |