diff options
Diffstat (limited to 'Lib/test')
63 files changed, 1884 insertions, 547 deletions
diff --git a/Lib/test/_code_definitions.py b/Lib/test/_code_definitions.py index 06cf6a10231..0c7b7b03cb3 100644 --- a/Lib/test/_code_definitions.py +++ b/Lib/test/_code_definitions.py @@ -12,6 +12,40 @@ def spam_minimal(): return +def spam_with_builtins(): + x = 42 + values = (42,) + checks = tuple(callable(v) for v in values) + res = callable(values), tuple(values), list(values), checks + print(res) + + +def spam_with_globals_and_builtins(): + func1 = spam + func2 = spam_minimal + funcs = (func1, func2) + checks = tuple(callable(f) for f in funcs) + res = callable(funcs), tuple(funcs), list(funcs), checks + print(res) + + +def spam_returns_arg(x): + return x + + +def spam_with_inner_not_closure(): + def eggs(): + pass + eggs() + + +def spam_with_inner_closure(): + x = 42 + def eggs(): + print(x) + eggs() + + def spam_full(a, b, /, c, d:int=1, *args, e, f:object=None, **kwargs) -> tuple: # arg defaults, kwarg defaults # annotations @@ -98,6 +132,11 @@ ham_C_closure, *_ = eggs_closure_C(2) TOP_FUNCTIONS = [ # shallow spam_minimal, + spam_with_builtins, + spam_with_globals_and_builtins, + spam_returns_arg, + spam_with_inner_not_closure, + spam_with_inner_closure, spam_full, spam, # outer func @@ -127,6 +166,30 @@ FUNCTIONS = [ *NESTED_FUNCTIONS, ] +STATELESS_FUNCTIONS = [ + spam, + spam_minimal, + spam_with_builtins, + spam_returns_arg, + spam_with_inner_not_closure, + spam_with_inner_closure, + spam_N, + spam_C, + spam_NN, + spam_NC, + spam_CN, + spam_CC, + eggs_nested, + eggs_nested_N, + ham_nested, + ham_C_nested +] +STATELESS_CODE = [ + *STATELESS_FUNCTIONS, + spam_with_globals_and_builtins, + spam_full, +] + # generators diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py index c0346aa934d..c3d1f60a400 100644 --- a/Lib/test/libregrtest/setup.py +++ b/Lib/test/libregrtest/setup.py @@ -40,7 +40,7 @@ def setup_process() -> None: faulthandler.enable(all_threads=True, file=stderr_fd) # Display the Python traceback on SIGALRM or SIGUSR1 signal - signals = [] + signals: list[signal.Signals] = [] if hasattr(signal, 'SIGALRM'): signals.append(signal.SIGALRM) if hasattr(signal, 'SIGUSR1'): diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index c4a1506c9a7..63a2e427d18 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -335,43 +335,11 @@ def get_build_info(): build.append('with_assert') # --enable-experimental-jit - tier2 = re.search('-D_Py_TIER2=([0-9]+)', cflags) - if tier2: - tier2 = int(tier2.group(1)) - - if not sys.flags.ignore_environment: - PYTHON_JIT = os.environ.get('PYTHON_JIT', None) - if PYTHON_JIT: - PYTHON_JIT = (PYTHON_JIT != '0') - else: - PYTHON_JIT = None - - if tier2 == 1: # =yes - if PYTHON_JIT == False: - jit = 'JIT=off' - else: - jit = 'JIT' - elif tier2 == 3: # =yes-off - if PYTHON_JIT: - jit = 'JIT' + if sys._jit.is_available(): + if sys._jit.is_enabled(): + build.append("JIT") else: - jit = 'JIT=off' - elif tier2 == 4: # =interpreter - if PYTHON_JIT == False: - jit = 'JIT-interpreter=off' - else: - jit = 'JIT-interpreter' - elif tier2 == 6: # =interpreter-off (Secret option!) - if PYTHON_JIT: - jit = 'JIT-interpreter' - else: - jit = 'JIT-interpreter=off' - elif '-D_Py_JIT' in cflags: - jit = 'JIT' - else: - jit = None - if jit: - build.append(jit) + build.append("JIT (disabled)") # --enable-framework=name framework = sysconfig.get_config_var('PYTHONFRAMEWORK') diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 24984ad81ff..041f1250003 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -2648,13 +2648,9 @@ skip_on_s390x = unittest.skipIf(is_s390x, 'skipped on s390x') Py_TRACE_REFS = hasattr(sys, 'getobjects') -try: - from _testinternalcapi import jit_enabled -except ImportError: - requires_jit_enabled = requires_jit_disabled = unittest.skip("requires _testinternalcapi") -else: - requires_jit_enabled = unittest.skipUnless(jit_enabled(), "requires JIT enabled") - requires_jit_disabled = unittest.skipIf(jit_enabled(), "requires JIT disabled") +_JIT_ENABLED = sys._jit.is_enabled() +requires_jit_enabled = unittest.skipUnless(_JIT_ENABLED, "requires JIT enabled") +requires_jit_disabled = unittest.skipIf(_JIT_ENABLED, "requires JIT disabled") _BASE_COPY_SRC_DIR_IGNORED_NAMES = frozenset({ @@ -2855,36 +2851,59 @@ def iter_slot_wrappers(cls): @contextlib.contextmanager -def no_color(): +def force_color(color: bool): import _colorize from .os_helper import EnvironmentVarGuard with ( - swap_attr(_colorize, "can_colorize", lambda file=None: False), + swap_attr(_colorize, "can_colorize", lambda file=None: color), EnvironmentVarGuard() as env, ): env.unset("FORCE_COLOR", "NO_COLOR", "PYTHON_COLORS") - env.set("NO_COLOR", "1") + env.set("FORCE_COLOR" if color else "NO_COLOR", "1") yield +def force_colorized(func): + """Force the terminal to be colorized.""" + @functools.wraps(func) + def wrapper(*args, **kwargs): + with force_color(True): + return func(*args, **kwargs) + return wrapper + + def force_not_colorized(func): - """Force the terminal not to be colorized.""" + """Force the terminal NOT to be colorized.""" @functools.wraps(func) def wrapper(*args, **kwargs): - with no_color(): + with force_color(False): return func(*args, **kwargs) return wrapper +def force_colorized_test_class(cls): + """Force the terminal to be colorized for the entire test class.""" + original_setUpClass = cls.setUpClass + + @classmethod + @functools.wraps(cls.setUpClass) + def new_setUpClass(cls): + cls.enterClassContext(force_color(True)) + original_setUpClass() + + cls.setUpClass = new_setUpClass + return cls + + def force_not_colorized_test_class(cls): - """Force the terminal not to be colorized for the entire test class.""" + """Force the terminal NOT to be colorized for the entire test class.""" original_setUpClass = cls.setUpClass @classmethod @functools.wraps(cls.setUpClass) def new_setUpClass(cls): - cls.enterClassContext(no_color()) + cls.enterClassContext(force_color(False)) original_setUpClass() cls.setUpClass = new_setUpClass diff --git a/Lib/test/test_annotationlib.py b/Lib/test/test_annotationlib.py index 13c6a2a584b..c3c245ddaf8 100644 --- a/Lib/test/test_annotationlib.py +++ b/Lib/test/test_annotationlib.py @@ -1053,6 +1053,21 @@ class TestGetAnnotations(unittest.TestCase): }, ) + def test_partial_evaluation_error(self): + def f(x: range[1]): + pass + with self.assertRaisesRegex( + TypeError, "type 'range' is not subscriptable" + ): + f.__annotations__ + + self.assertEqual( + get_annotations(f, format=Format.FORWARDREF), + { + "x": support.EqualToForwardRef("range[1]", owner=f), + }, + ) + def test_partial_evaluation_cell(self): obj = object() diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py index c5a1f31aa52..5a6be1180c1 100644 --- a/Lib/test/test_argparse.py +++ b/Lib/test/test_argparse.py @@ -7058,7 +7058,7 @@ class TestColorized(TestCase): super().setUp() # Ensure color even if ran with NO_COLOR=1 _colorize.can_colorize = lambda *args, **kwargs: True - self.ansi = _colorize.ANSIColors() + self.theme = _colorize.get_theme(force_color=True).argparse def test_argparse_color(self): # Arrange: create a parser with a bit of everything @@ -7120,13 +7120,17 @@ class TestColorized(TestCase): sub2 = subparsers.add_parser("sub2", deprecated=True, help="sub2 help") sub2.add_argument("--baz", choices=("X", "Y", "Z"), help="baz help") - heading = self.ansi.BOLD_BLUE - label, label_b = self.ansi.YELLOW, self.ansi.BOLD_YELLOW - long, long_b = self.ansi.CYAN, self.ansi.BOLD_CYAN - pos, pos_b = short, short_b = self.ansi.GREEN, self.ansi.BOLD_GREEN - sub = self.ansi.BOLD_GREEN - prog = self.ansi.BOLD_MAGENTA - reset = self.ansi.RESET + prog = self.theme.prog + heading = self.theme.heading + long = self.theme.summary_long_option + short = self.theme.summary_short_option + label = self.theme.summary_label + pos = self.theme.summary_action + long_b = self.theme.long_option + short_b = self.theme.short_option + label_b = self.theme.label + pos_b = self.theme.action + reset = self.theme.reset # Act help_text = parser.format_help() @@ -7171,9 +7175,9 @@ class TestColorized(TestCase): {heading}subcommands:{reset} valid subcommands - {sub}{{sub1,sub2}}{reset} additional help - {sub}sub1{reset} sub1 help - {sub}sub2{reset} sub2 help + {pos_b}{{sub1,sub2}}{reset} additional help + {pos_b}sub1{reset} sub1 help + {pos_b}sub2{reset} sub2 help """ ), ) @@ -7187,10 +7191,10 @@ class TestColorized(TestCase): prog="PROG", usage="[prefix] %(prog)s [suffix]", ) - heading = self.ansi.BOLD_BLUE - prog = self.ansi.BOLD_MAGENTA - reset = self.ansi.RESET - usage = self.ansi.MAGENTA + heading = self.theme.heading + prog = self.theme.prog + reset = self.theme.reset + usage = self.theme.prog_extra # Act help_text = parser.format_help() diff --git a/Lib/test/test_asdl_parser.py b/Lib/test/test_asdl_parser.py index 2c198a6b8b2..b9df6568123 100644 --- a/Lib/test/test_asdl_parser.py +++ b/Lib/test/test_asdl_parser.py @@ -62,17 +62,17 @@ class TestAsdlParser(unittest.TestCase): alias = self.types['alias'] self.assertEqual( str(alias), - 'Product([Field(identifier, name), Field(identifier, asname, opt=True)], ' + 'Product([Field(identifier, name), Field(identifier, asname, quantifiers=[OPTIONAL])], ' '[Field(int, lineno), Field(int, col_offset), ' - 'Field(int, end_lineno, opt=True), Field(int, end_col_offset, opt=True)])') + 'Field(int, end_lineno, quantifiers=[OPTIONAL]), Field(int, end_col_offset, quantifiers=[OPTIONAL])])') def test_attributes(self): stmt = self.types['stmt'] self.assertEqual(len(stmt.attributes), 4) self.assertEqual(repr(stmt.attributes[0]), 'Field(int, lineno)') self.assertEqual(repr(stmt.attributes[1]), 'Field(int, col_offset)') - self.assertEqual(repr(stmt.attributes[2]), 'Field(int, end_lineno, opt=True)') - self.assertEqual(repr(stmt.attributes[3]), 'Field(int, end_col_offset, opt=True)') + self.assertEqual(repr(stmt.attributes[2]), 'Field(int, end_lineno, quantifiers=[OPTIONAL])') + self.assertEqual(repr(stmt.attributes[3]), 'Field(int, end_col_offset, quantifiers=[OPTIONAL])') def test_constructor_fields(self): ehandler = self.types['excepthandler'] diff --git a/Lib/test/test_ast/test_ast.py b/Lib/test/test_ast/test_ast.py index 6a9b7812ef6..09cf3186e05 100644 --- a/Lib/test/test_ast/test_ast.py +++ b/Lib/test/test_ast/test_ast.py @@ -26,6 +26,7 @@ from test import support from test.support import os_helper from test.support import skip_emscripten_stack_overflow, skip_wasi_stack_overflow from test.support.ast_helper import ASTTestMixin +from test.support.import_helper import ensure_lazy_imports from test.test_ast.utils import to_tuple from test.test_ast.snippets import ( eval_tests, eval_results, exec_tests, exec_results, single_tests, single_results @@ -47,6 +48,12 @@ def ast_repr_update_snapshots() -> None: AST_REPR_DATA_FILE.write_text("\n".join(data)) +class LazyImportTest(unittest.TestCase): + @support.cpython_only + def test_lazy_import(self): + ensure_lazy_imports("ast", {"contextlib", "enum", "inspect", "re", "collections", "argparse"}) + + class AST_Tests(unittest.TestCase): maxDiff = None @@ -3272,6 +3279,9 @@ class CommandLineTests(unittest.TestCase): ('--no-type-comments', '--no-type-comments'), ('-a', '--include-attributes'), ('-i=4', '--indent=4'), + ('--feature-version=3.13', '--feature-version=3.13'), + ('-O=-1', '--optimize=-1'), + ('--show-empty', '--show-empty'), ) self.set_source(''' print(1, 2, 3) @@ -3286,6 +3296,7 @@ class CommandLineTests(unittest.TestCase): with self.subTest(flags=args): self.invoke_ast(*args) + @support.force_not_colorized def test_help_message(self): for flag in ('-h', '--help', '--unknown'): with self.subTest(flag=flag): @@ -3389,7 +3400,7 @@ class CommandLineTests(unittest.TestCase): self.check_output(source, expect, flag) def test_indent_flag(self): - # test 'python -m ast -i/--indent' + # test 'python -m ast -i/--indent 0' source = 'pass' expect = ''' Module( @@ -3400,6 +3411,96 @@ class CommandLineTests(unittest.TestCase): with self.subTest(flag=flag): self.check_output(source, expect, flag) + def test_feature_version_flag(self): + # test 'python -m ast --feature-version 3.9/3.10' + source = ''' + match x: + case 1: + pass + ''' + expect = ''' + Module( + body=[ + Match( + subject=Name(id='x', ctx=Load()), + cases=[ + match_case( + pattern=MatchValue( + value=Constant(value=1)), + body=[ + Pass()])])]) + ''' + self.check_output(source, expect, '--feature-version=3.10') + with self.assertRaises(SyntaxError): + self.invoke_ast('--feature-version=3.9') + + def test_no_optimize_flag(self): + # test 'python -m ast -O/--optimize -1/0' + source = ''' + match a: + case 1+2j: + pass + ''' + expect = ''' + Module( + body=[ + Match( + subject=Name(id='a', ctx=Load()), + cases=[ + match_case( + pattern=MatchValue( + value=BinOp( + left=Constant(value=1), + op=Add(), + right=Constant(value=2j))), + body=[ + Pass()])])]) + ''' + for flag in ('-O=-1', '--optimize=-1', '-O=0', '--optimize=0'): + with self.subTest(flag=flag): + self.check_output(source, expect, flag) + + def test_optimize_flag(self): + # test 'python -m ast -O/--optimize 1/2' + source = ''' + match a: + case 1+2j: + pass + ''' + expect = ''' + Module( + body=[ + Match( + subject=Name(id='a', ctx=Load()), + cases=[ + match_case( + pattern=MatchValue( + value=Constant(value=(1+2j))), + body=[ + Pass()])])]) + ''' + for flag in ('-O=1', '--optimize=1', '-O=2', '--optimize=2'): + with self.subTest(flag=flag): + self.check_output(source, expect, flag) + + def test_show_empty_flag(self): + # test 'python -m ast --show-empty' + source = 'print(1, 2, 3)' + expect = ''' + Module( + body=[ + Expr( + value=Call( + func=Name(id='print', ctx=Load()), + args=[ + Constant(value=1), + Constant(value=2), + Constant(value=3)], + keywords=[]))], + type_ignores=[]) + ''' + self.check_output(source, expect, '--show-empty') + class ASTOptimiziationTests(unittest.TestCase): def wrap_expr(self, expr): diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index a2fb1022ae4..9f3b6f9acef 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -263,6 +263,24 @@ class EagerTaskFactoryLoopTests: self.run_coro(run()) + def test_eager_start_false(self): + name = None + + async def asyncfn(): + nonlocal name + name = asyncio.current_task().get_name() + + async def main(): + t = asyncio.get_running_loop().create_task( + asyncfn(), eager_start=False, name="example" + ) + self.assertFalse(t.done()) + self.assertIsNone(name) + await t + self.assertEqual(name, "example") + + self.run_coro(main()) + class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase): Task = tasks._PyTask @@ -505,5 +523,24 @@ class EagerCTaskTests(BaseEagerTaskFactoryTests, test_utils.TestCase): asyncio.current_task = asyncio.tasks.current_task = self._current_task return super().tearDown() + +class DefaultTaskFactoryEagerStart(test_utils.TestCase): + def test_eager_start_true_with_default_factory(self): + name = None + + async def asyncfn(): + nonlocal name + name = asyncio.current_task().get_name() + + async def main(): + t = asyncio.get_running_loop().create_task( + asyncfn(), eager_start=True, name="example" + ) + self.assertTrue(t.done()) + self.assertEqual(name, "example") + await t + + asyncio.run(main(), loop_factory=asyncio.EventLoop) + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 8d7f1733454..44498ef790e 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -89,8 +89,8 @@ class BaseTaskTests: Future = None all_tasks = None - def new_task(self, loop, coro, name='TestTask', context=None): - return self.__class__.Task(coro, loop=loop, name=name, context=context) + def new_task(self, loop, coro, name='TestTask', context=None, eager_start=None): + return self.__class__.Task(coro, loop=loop, name=name, context=context, eager_start=eager_start) def new_future(self, loop): return self.__class__.Future(loop=loop) @@ -2686,6 +2686,35 @@ class BaseTaskTests: self.assertEqual([None, 1, 2], ret) + def test_eager_start_true(self): + name = None + + async def asyncfn(): + nonlocal name + name = self.current_task().get_name() + + async def main(): + t = self.new_task(coro=asyncfn(), loop=asyncio.get_running_loop(), eager_start=True, name="example") + self.assertTrue(t.done()) + self.assertEqual(name, "example") + await t + + def test_eager_start_false(self): + name = None + + async def asyncfn(): + nonlocal name + name = self.current_task().get_name() + + async def main(): + t = self.new_task(coro=asyncfn(), loop=asyncio.get_running_loop(), eager_start=False, name="example") + self.assertFalse(t.done()) + self.assertIsNone(name) + await t + self.assertEqual(name, "example") + + asyncio.run(main(), loop_factory=asyncio.EventLoop) + def test_get_coro(self): loop = asyncio.new_event_loop() coro = coroutine_function() diff --git a/Lib/test/test_base64.py b/Lib/test/test_base64.py index 409c8c109e8..9efebc43d91 100644 --- a/Lib/test/test_base64.py +++ b/Lib/test/test_base64.py @@ -3,8 +3,16 @@ import base64 import binascii import os from array import array +from test.support import cpython_only from test.support import os_helper from test.support import script_helper +from test.support.import_helper import ensure_lazy_imports + + +class LazyImportTest(unittest.TestCase): + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("base64", {"re", "getopt"}) class LegacyBase64TestCase(unittest.TestCase): diff --git a/Lib/test/test_calendar.py b/Lib/test/test_calendar.py index 073df310bb4..cbfee604b7a 100644 --- a/Lib/test/test_calendar.py +++ b/Lib/test/test_calendar.py @@ -987,6 +987,7 @@ class CommandLineTestCase(unittest.TestCase): self.assertCLIFails(*args) self.assertCmdFails(*args) + @support.force_not_colorized def test_help(self): stdout = self.run_cmd_ok('-h') self.assertIn(b'usage:', stdout) diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index 98dc3b42ef0..a597f23a992 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -306,7 +306,7 @@ class CAPITest(unittest.TestCase): CURRENT_THREAD_REGEX + r' File .*, line 6 in <module>\n' r'\n' - r'Extension modules: _testcapi, _testinternalcapi \(total: 2\)\n') + r'Extension modules: _testcapi \(total: 1\)\n') else: # Python built with NDEBUG macro defined: # test _Py_CheckFunctionResult() instead. diff --git a/Lib/test/test_capi/test_object.py b/Lib/test/test_capi/test_object.py index 54a01ac7c4a..127862546b1 100644 --- a/Lib/test/test_capi/test_object.py +++ b/Lib/test/test_capi/test_object.py @@ -174,6 +174,16 @@ class EnableDeferredRefcountingTest(unittest.TestCase): self.assertTrue(_testinternalcapi.has_deferred_refcount(silly_list)) +class IsUniquelyReferencedTest(unittest.TestCase): + """Test PyUnstable_Object_IsUniquelyReferenced""" + def test_is_uniquely_referenced(self): + self.assertTrue(_testcapi.is_uniquely_referenced(object())) + self.assertTrue(_testcapi.is_uniquely_referenced([])) + # Immortals + self.assertFalse(_testcapi.is_uniquely_referenced("spanish inquisition")) + self.assertFalse(_testcapi.is_uniquely_referenced(42)) + # CRASHES is_uniquely_referenced(NULL) + class CAPITest(unittest.TestCase): def check_negative_refcount(self, code): # bpo-35059: Check that Py_DECREF() reports the correct filename diff --git a/Lib/test/test_capi/test_opt.py b/Lib/test/test_capi/test_opt.py index 7e0c60d5522..ba7bcb4540a 100644 --- a/Lib/test/test_capi/test_opt.py +++ b/Lib/test/test_capi/test_opt.py @@ -1919,9 +1919,11 @@ class TestUopsOptimization(unittest.TestCase): _, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) uops = get_opnames(ex) + self.assertNotIn("_GUARD_NOS_NULL", uops) + self.assertNotIn("_GUARD_CALLABLE_LEN", uops) + self.assertIn("_CALL_LEN", uops) self.assertNotIn("_GUARD_NOS_INT", uops) self.assertNotIn("_GUARD_TOS_INT", uops) - self.assertIn("_CALL_LEN", uops) def test_binary_op_subscr_tuple_int(self): def testfunc(n): diff --git a/Lib/test/test_cmd.py b/Lib/test/test_cmd.py index 0ae44f3987d..dbfec42fc21 100644 --- a/Lib/test/test_cmd.py +++ b/Lib/test/test_cmd.py @@ -11,9 +11,15 @@ import unittest import io import textwrap from test import support -from test.support.import_helper import import_module +from test.support.import_helper import ensure_lazy_imports, import_module from test.support.pty_helper import run_pty +class LazyImportTest(unittest.TestCase): + @support.cpython_only + def test_lazy_import(self): + ensure_lazy_imports("cmd", {"inspect", "string"}) + + class samplecmdclass(cmd.Cmd): """ Instance the sampleclass: diff --git a/Lib/test/test_code.py b/Lib/test/test_code.py index 7cf09ee7847..e66d7283b18 100644 --- a/Lib/test/test_code.py +++ b/Lib/test/test_code.py @@ -220,6 +220,7 @@ try: import _testinternalcapi except ModuleNotFoundError: _testinternalcapi = None +import test._code_definitions as defs COPY_FREE_VARS = opmap['COPY_FREE_VARS'] @@ -671,9 +672,31 @@ class CodeTest(unittest.TestCase): VARARGS = CO_FAST_LOCAL | CO_FAST_ARG_VAR | CO_FAST_ARG_POS VARKWARGS = CO_FAST_LOCAL | CO_FAST_ARG_VAR | CO_FAST_ARG_KW - import test._code_definitions as defs funcs = { defs.spam_minimal: {}, + defs.spam_with_builtins: { + 'x': CO_FAST_LOCAL, + 'values': CO_FAST_LOCAL, + 'checks': CO_FAST_LOCAL, + 'res': CO_FAST_LOCAL, + }, + defs.spam_with_globals_and_builtins: { + 'func1': CO_FAST_LOCAL, + 'func2': CO_FAST_LOCAL, + 'funcs': CO_FAST_LOCAL, + 'checks': CO_FAST_LOCAL, + 'res': CO_FAST_LOCAL, + }, + defs.spam_returns_arg: { + 'x': POSORKW, + }, + defs.spam_with_inner_not_closure: { + 'eggs': CO_FAST_LOCAL, + }, + defs.spam_with_inner_closure: { + 'x': CO_FAST_CELL, + 'eggs': CO_FAST_LOCAL, + }, defs.spam_full: { 'a': POSONLY, 'b': POSONLY, @@ -777,6 +800,268 @@ class CodeTest(unittest.TestCase): kinds = _testinternalcapi.get_co_localskinds(func.__code__) self.assertEqual(kinds, expected) + @unittest.skipIf(_testinternalcapi is None, "missing _testinternalcapi") + def test_var_counts(self): + self.maxDiff = None + def new_var_counts(*, + posonly=0, + posorkw=0, + kwonly=0, + varargs=0, + varkwargs=0, + purelocals=0, + argcells=0, + othercells=0, + freevars=0, + globalvars=0, + attrs=0, + unknown=0, + ): + nargvars = posonly + posorkw + kwonly + varargs + varkwargs + nlocals = nargvars + purelocals + othercells + if isinstance(globalvars, int): + globalvars = { + 'total': globalvars, + 'numglobal': 0, + 'numbuiltin': 0, + 'numunknown': globalvars, + } + else: + g_numunknown = 0 + if isinstance(globalvars, dict): + numglobal = globalvars['numglobal'] + numbuiltin = globalvars['numbuiltin'] + size = 2 + if 'numunknown' in globalvars: + g_numunknown = globalvars['numunknown'] + size += 1 + assert len(globalvars) == size, globalvars + else: + assert not isinstance(globalvars, str), repr(globalvars) + try: + numglobal, numbuiltin = globalvars + except ValueError: + numglobal, numbuiltin, g_numunknown = globalvars + globalvars = { + 'total': numglobal + numbuiltin + g_numunknown, + 'numglobal': numglobal, + 'numbuiltin': numbuiltin, + 'numunknown': g_numunknown, + } + unbound = globalvars['total'] + attrs + unknown + return { + 'total': nlocals + freevars + unbound, + 'locals': { + 'total': nlocals, + 'args': { + 'total': nargvars, + 'numposonly': posonly, + 'numposorkw': posorkw, + 'numkwonly': kwonly, + 'varargs': varargs, + 'varkwargs': varkwargs, + }, + 'numpure': purelocals, + 'cells': { + 'total': argcells + othercells, + 'numargs': argcells, + 'numothers': othercells, + }, + 'hidden': { + 'total': 0, + 'numpure': 0, + 'numcells': 0, + }, + }, + 'numfree': freevars, + 'unbound': { + 'total': unbound, + 'globals': globalvars, + 'numattrs': attrs, + 'numunknown': unknown, + }, + } + + funcs = { + defs.spam_minimal: new_var_counts(), + defs.spam_with_builtins: new_var_counts( + purelocals=4, + globalvars=4, + ), + defs.spam_with_globals_and_builtins: new_var_counts( + purelocals=5, + globalvars=6, + ), + defs.spam_returns_arg: new_var_counts( + posorkw=1, + ), + defs.spam_with_inner_not_closure: new_var_counts( + purelocals=1, + ), + defs.spam_with_inner_closure: new_var_counts( + othercells=1, + purelocals=1, + ), + defs.spam_full: new_var_counts( + posonly=2, + posorkw=2, + kwonly=2, + varargs=1, + varkwargs=1, + purelocals=4, + globalvars=3, + attrs=1, + ), + defs.spam: new_var_counts( + posorkw=1, + ), + defs.spam_N: new_var_counts( + posorkw=1, + purelocals=1, + ), + defs.spam_C: new_var_counts( + posorkw=1, + purelocals=1, + argcells=1, + othercells=1, + ), + defs.spam_NN: new_var_counts( + posorkw=1, + purelocals=1, + ), + defs.spam_NC: new_var_counts( + posorkw=1, + purelocals=1, + argcells=1, + othercells=1, + ), + defs.spam_CN: new_var_counts( + posorkw=1, + purelocals=1, + argcells=1, + othercells=1, + ), + defs.spam_CC: new_var_counts( + posorkw=1, + purelocals=1, + argcells=1, + othercells=1, + ), + defs.eggs_nested: new_var_counts( + posorkw=1, + ), + defs.eggs_closure: new_var_counts( + posorkw=1, + freevars=2, + ), + defs.eggs_nested_N: new_var_counts( + posorkw=1, + purelocals=1, + ), + defs.eggs_nested_C: new_var_counts( + posorkw=1, + purelocals=1, + argcells=1, + freevars=2, + ), + defs.eggs_closure_N: new_var_counts( + posorkw=1, + purelocals=1, + freevars=2, + ), + defs.eggs_closure_C: new_var_counts( + posorkw=1, + purelocals=1, + argcells=1, + othercells=1, + freevars=2, + ), + defs.ham_nested: new_var_counts( + posorkw=1, + ), + defs.ham_closure: new_var_counts( + posorkw=1, + freevars=3, + ), + defs.ham_C_nested: new_var_counts( + posorkw=1, + ), + defs.ham_C_closure: new_var_counts( + posorkw=1, + freevars=4, + ), + } + assert len(funcs) == len(defs.FUNCTIONS), (len(funcs), len(defs.FUNCTIONS)) + for func in defs.FUNCTIONS: + with self.subTest(func): + expected = funcs[func] + counts = _testinternalcapi.get_code_var_counts(func.__code__) + self.assertEqual(counts, expected) + + func = defs.spam_with_globals_and_builtins + with self.subTest(f'{func} code'): + expected = new_var_counts( + purelocals=5, + globalvars=6, + ) + counts = _testinternalcapi.get_code_var_counts(func.__code__) + self.assertEqual(counts, expected) + + with self.subTest(f'{func} with own globals and builtins'): + expected = new_var_counts( + purelocals=5, + globalvars=(2, 4), + ) + counts = _testinternalcapi.get_code_var_counts(func) + self.assertEqual(counts, expected) + + with self.subTest(f'{func} without globals'): + expected = new_var_counts( + purelocals=5, + globalvars=(0, 4, 2), + ) + counts = _testinternalcapi.get_code_var_counts(func, globalsns={}) + self.assertEqual(counts, expected) + + with self.subTest(f'{func} without both'): + expected = new_var_counts( + purelocals=5, + globalvars=6, + ) + counts = _testinternalcapi.get_code_var_counts(func, globalsns={}, + builtinsns={}) + self.assertEqual(counts, expected) + + with self.subTest(f'{func} without builtins'): + expected = new_var_counts( + purelocals=5, + globalvars=(2, 0, 4), + ) + counts = _testinternalcapi.get_code_var_counts(func, builtinsns={}) + self.assertEqual(counts, expected) + + @unittest.skipIf(_testinternalcapi is None, "missing _testinternalcapi") + def test_stateless(self): + self.maxDiff = None + + for func in defs.STATELESS_CODE: + with self.subTest((func, '(code)')): + _testinternalcapi.verify_stateless_code(func.__code__) + for func in defs.STATELESS_FUNCTIONS: + with self.subTest((func, '(func)')): + _testinternalcapi.verify_stateless_code(func) + + for func in defs.FUNCTIONS: + if func not in defs.STATELESS_CODE: + with self.subTest((func, '(code)')): + with self.assertRaises(Exception): + _testinternalcapi.verify_stateless_code(func.__code__) + + if func not in defs.STATELESS_FUNCTIONS: + with self.subTest((func, '(func)')): + with self.assertRaises(Exception): + _testinternalcapi.verify_stateless_code(func) + def isinterned(s): return s is sys.intern(('_' + s + '_')[1:-1]) diff --git a/Lib/test/test_csv.py b/Lib/test/test_csv.py index 4af8f7f480e..9aace57633b 100644 --- a/Lib/test/test_csv.py +++ b/Lib/test/test_csv.py @@ -10,7 +10,8 @@ import csv import gc import pickle from test import support -from test.support import import_helper, check_disallow_instantiation +from test.support import cpython_only, import_helper, check_disallow_instantiation +from test.support.import_helper import ensure_lazy_imports from itertools import permutations from textwrap import dedent from collections import OrderedDict @@ -1565,6 +1566,10 @@ class MiscTestCase(unittest.TestCase): def test__all__(self): support.check__all__(self, csv, ('csv', '_csv')) + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("csv", {"re"}) + def test_subclassable(self): # issue 44089 class Foo(csv.Error): ... diff --git a/Lib/test/test_ctypes/test_aligned_structures.py b/Lib/test/test_ctypes/test_aligned_structures.py index 0c563ab8055..50b4d729b9d 100644 --- a/Lib/test/test_ctypes/test_aligned_structures.py +++ b/Lib/test/test_ctypes/test_aligned_structures.py @@ -316,6 +316,7 @@ class TestAlignedStructures(unittest.TestCase, StructCheckMixin): class Main(sbase): _pack_ = 1 + _layout_ = "ms" _fields_ = [ ("a", c_ubyte), ("b", Inner), diff --git a/Lib/test/test_ctypes/test_bitfields.py b/Lib/test/test_ctypes/test_bitfields.py index dc81e752567..518f838219e 100644 --- a/Lib/test/test_ctypes/test_bitfields.py +++ b/Lib/test/test_ctypes/test_bitfields.py @@ -430,6 +430,7 @@ class BitFieldTest(unittest.TestCase, StructCheckMixin): def test_gh_84039(self): class Bad(Structure): _pack_ = 1 + _layout_ = "ms" _fields_ = [ ("a0", c_uint8, 1), ("a1", c_uint8, 1), @@ -443,9 +444,9 @@ class BitFieldTest(unittest.TestCase, StructCheckMixin): ("b1", c_uint16, 12), ] - class GoodA(Structure): _pack_ = 1 + _layout_ = "ms" _fields_ = [ ("a0", c_uint8, 1), ("a1", c_uint8, 1), @@ -460,6 +461,7 @@ class BitFieldTest(unittest.TestCase, StructCheckMixin): class Good(Structure): _pack_ = 1 + _layout_ = "ms" _fields_ = [ ("a", GoodA), ("b0", c_uint16, 4), @@ -475,6 +477,7 @@ class BitFieldTest(unittest.TestCase, StructCheckMixin): def test_gh_73939(self): class MyStructure(Structure): _pack_ = 1 + _layout_ = "ms" _fields_ = [ ("P", c_uint16), ("L", c_uint16, 9), diff --git a/Lib/test/test_ctypes/test_byteswap.py b/Lib/test/test_ctypes/test_byteswap.py index 9f9904282e4..ea5951603f9 100644 --- a/Lib/test/test_ctypes/test_byteswap.py +++ b/Lib/test/test_ctypes/test_byteswap.py @@ -269,6 +269,7 @@ class Test(unittest.TestCase, StructCheckMixin): class S(base): _pack_ = 1 + _layout_ = "ms" _fields_ = [("b", c_byte), ("h", c_short), @@ -296,6 +297,7 @@ class Test(unittest.TestCase, StructCheckMixin): class S(Structure): _pack_ = 1 + _layout_ = "ms" _fields_ = [("b", c_byte), ("h", c_short), diff --git a/Lib/test/test_ctypes/test_generated_structs.py b/Lib/test/test_ctypes/test_generated_structs.py index 9a8102219d8..aa448fad5bb 100644 --- a/Lib/test/test_ctypes/test_generated_structs.py +++ b/Lib/test/test_ctypes/test_generated_structs.py @@ -125,18 +125,21 @@ class Nested(Structure): class Packed1(Structure): _fields_ = [('a', c_int8), ('b', c_int64)] _pack_ = 1 + _layout_ = 'ms' @register() class Packed2(Structure): _fields_ = [('a', c_int8), ('b', c_int64)] _pack_ = 2 + _layout_ = 'ms' @register() class Packed3(Structure): _fields_ = [('a', c_int8), ('b', c_int64)] _pack_ = 4 + _layout_ = 'ms' @register() @@ -155,6 +158,7 @@ class Packed4(Structure): _fields_ = [('a', c_int8), ('b', c_int64)] _pack_ = 8 + _layout_ = 'ms' @register() class X86_32EdgeCase(Structure): @@ -366,6 +370,7 @@ class Example_gh_95496(Structure): @register() class Example_gh_84039_bad(Structure): _pack_ = 1 + _layout_ = 'ms' _fields_ = [("a0", c_uint8, 1), ("a1", c_uint8, 1), ("a2", c_uint8, 1), @@ -380,6 +385,7 @@ class Example_gh_84039_bad(Structure): @register() class Example_gh_84039_good_a(Structure): _pack_ = 1 + _layout_ = 'ms' _fields_ = [("a0", c_uint8, 1), ("a1", c_uint8, 1), ("a2", c_uint8, 1), @@ -392,6 +398,7 @@ class Example_gh_84039_good_a(Structure): @register() class Example_gh_84039_good(Structure): _pack_ = 1 + _layout_ = 'ms' _fields_ = [("a", Example_gh_84039_good_a), ("b0", c_uint16, 4), ("b1", c_uint16, 12)] @@ -399,6 +406,7 @@ class Example_gh_84039_good(Structure): @register() class Example_gh_73939(Structure): _pack_ = 1 + _layout_ = 'ms' _fields_ = [("P", c_uint16), ("L", c_uint16, 9), ("Pro", c_uint16, 1), @@ -419,6 +427,7 @@ class Example_gh_86098(Structure): @register() class Example_gh_86098_pack(Structure): _pack_ = 1 + _layout_ = 'ms' _fields_ = [("a", c_uint8, 8), ("b", c_uint8, 8), ("c", c_uint32, 16)] @@ -528,7 +537,7 @@ def dump_ctype(tp, struct_or_union_tag='', variable_name='', semi=''): pushes.append(f'#pragma pack(push, {pack})') pops.append(f'#pragma pack(pop)') layout = getattr(tp, '_layout_', None) - if layout == 'ms' or pack: + if layout == 'ms': # The 'ms_struct' attribute only works on x86 and PowerPC requires.add( 'defined(MS_WIN32) || (' diff --git a/Lib/test/test_ctypes/test_pep3118.py b/Lib/test/test_ctypes/test_pep3118.py index 06b2ccecade..11a0744f5a8 100644 --- a/Lib/test/test_ctypes/test_pep3118.py +++ b/Lib/test/test_ctypes/test_pep3118.py @@ -81,6 +81,7 @@ class Point(Structure): class PackedPoint(Structure): _pack_ = 2 + _layout_ = 'ms' _fields_ = [("x", c_long), ("y", c_long)] class PointMidPad(Structure): @@ -88,6 +89,7 @@ class PointMidPad(Structure): class PackedPointMidPad(Structure): _pack_ = 2 + _layout_ = 'ms' _fields_ = [("x", c_byte), ("y", c_uint64)] class PointEndPad(Structure): @@ -95,6 +97,7 @@ class PointEndPad(Structure): class PackedPointEndPad(Structure): _pack_ = 2 + _layout_ = 'ms' _fields_ = [("x", c_uint64), ("y", c_byte)] class Point2(Structure): diff --git a/Lib/test/test_ctypes/test_structunion.py b/Lib/test/test_ctypes/test_structunion.py index 8d8b7e5e995..5b21d48d99c 100644 --- a/Lib/test/test_ctypes/test_structunion.py +++ b/Lib/test/test_ctypes/test_structunion.py @@ -11,6 +11,8 @@ from ._support import (_CData, PyCStructType, UnionType, Py_TPFLAGS_DISALLOW_INSTANTIATION, Py_TPFLAGS_IMMUTABLETYPE) from struct import calcsize +import contextlib +from test.support import MS_WINDOWS class StructUnionTestBase: @@ -335,6 +337,22 @@ class StructUnionTestBase: self.assertIn("from_address", dir(type(self.cls))) self.assertIn("in_dll", dir(type(self.cls))) + def test_pack_layout_switch(self): + # Setting _pack_ implicitly sets default layout to MSVC; + # this is deprecated on non-Windows platforms. + if MS_WINDOWS: + warn_context = contextlib.nullcontext() + else: + warn_context = self.assertWarns(DeprecationWarning) + with warn_context: + class X(self.cls): + _pack_ = 1 + # _layout_ missing + _fields_ = [('a', c_int8, 1), ('b', c_int16, 2)] + + # Check MSVC layout (bitfields of different types aren't combined) + self.check_sizeof(X, struct_size=3, union_size=2) + class StructureTestCase(unittest.TestCase, StructUnionTestBase): cls = Structure diff --git a/Lib/test/test_ctypes/test_structures.py b/Lib/test/test_ctypes/test_structures.py index 221319642e8..92d4851d739 100644 --- a/Lib/test/test_ctypes/test_structures.py +++ b/Lib/test/test_ctypes/test_structures.py @@ -25,6 +25,7 @@ class StructureTestCase(unittest.TestCase, StructCheckMixin): _fields_ = [("a", c_byte), ("b", c_longlong)] _pack_ = 1 + _layout_ = 'ms' self.check_struct(X) self.assertEqual(sizeof(X), 9) @@ -34,6 +35,7 @@ class StructureTestCase(unittest.TestCase, StructCheckMixin): _fields_ = [("a", c_byte), ("b", c_longlong)] _pack_ = 2 + _layout_ = 'ms' self.check_struct(X) self.assertEqual(sizeof(X), 10) self.assertEqual(X.b.offset, 2) @@ -45,6 +47,7 @@ class StructureTestCase(unittest.TestCase, StructCheckMixin): _fields_ = [("a", c_byte), ("b", c_longlong)] _pack_ = 4 + _layout_ = 'ms' self.check_struct(X) self.assertEqual(sizeof(X), min(4, longlong_align) + longlong_size) self.assertEqual(X.b.offset, min(4, longlong_align)) @@ -53,27 +56,33 @@ class StructureTestCase(unittest.TestCase, StructCheckMixin): _fields_ = [("a", c_byte), ("b", c_longlong)] _pack_ = 8 + _layout_ = 'ms' self.check_struct(X) self.assertEqual(sizeof(X), min(8, longlong_align) + longlong_size) self.assertEqual(X.b.offset, min(8, longlong_align)) - - d = {"_fields_": [("a", "b"), - ("b", "q")], - "_pack_": -1} - self.assertRaises(ValueError, type(Structure), "X", (Structure,), d) + with self.assertRaises(ValueError): + class X(Structure): + _fields_ = [("a", "b"), ("b", "q")] + _pack_ = -1 + _layout_ = "ms" @support.cpython_only def test_packed_c_limits(self): # Issue 15989 import _testcapi - d = {"_fields_": [("a", c_byte)], - "_pack_": _testcapi.INT_MAX + 1} - self.assertRaises(ValueError, type(Structure), "X", (Structure,), d) - d = {"_fields_": [("a", c_byte)], - "_pack_": _testcapi.UINT_MAX + 2} - self.assertRaises(ValueError, type(Structure), "X", (Structure,), d) + with self.assertRaises(ValueError): + class X(Structure): + _fields_ = [("a", c_byte)] + _pack_ = _testcapi.INT_MAX + 1 + _layout_ = "ms" + + with self.assertRaises(ValueError): + class X(Structure): + _fields_ = [("a", c_byte)] + _pack_ = _testcapi.UINT_MAX + 2 + _layout_ = "ms" def test_initializers(self): class Person(Structure): diff --git a/Lib/test/test_ctypes/test_unaligned_structures.py b/Lib/test/test_ctypes/test_unaligned_structures.py index 58a00597ef5..b5fb4c0df77 100644 --- a/Lib/test/test_ctypes/test_unaligned_structures.py +++ b/Lib/test/test_ctypes/test_unaligned_structures.py @@ -19,10 +19,12 @@ for typ in [c_short, c_int, c_long, c_longlong, c_ushort, c_uint, c_ulong, c_ulonglong]: class X(Structure): _pack_ = 1 + _layout_ = 'ms' _fields_ = [("pad", c_byte), ("value", typ)] class Y(SwappedStructure): _pack_ = 1 + _layout_ = 'ms' _fields_ = [("pad", c_byte), ("value", typ)] structures.append(X) diff --git a/Lib/test/test_dataclasses/__init__.py b/Lib/test/test_dataclasses/__init__.py index 99fefb57fd0..ac78f8327b8 100644 --- a/Lib/test/test_dataclasses/__init__.py +++ b/Lib/test/test_dataclasses/__init__.py @@ -5,6 +5,7 @@ from dataclasses import * import abc +import annotationlib import io import pickle import inspect @@ -12,6 +13,7 @@ import builtins import types import weakref import traceback +import sys import textwrap import unittest from unittest.mock import Mock @@ -25,6 +27,7 @@ import typing # Needed for the string "typing.ClassVar[int]" to work as an import dataclasses # Needed for the string "dataclasses.InitVar[int]" to work as an annotation. from test import support +from test.support import import_helper # Just any custom exception we can catch. class CustomError(Exception): pass @@ -3754,7 +3757,6 @@ class TestSlots(unittest.TestCase): @support.cpython_only def test_dataclass_slot_dict_ctype(self): # https://github.com/python/cpython/issues/123935 - from test.support import import_helper # Skips test if `_testcapi` is not present: _testcapi = import_helper.import_module('_testcapi') @@ -4246,16 +4248,56 @@ class TestMakeDataclass(unittest.TestCase): C = make_dataclass('Point', ['x', 'y', 'z']) c = C(1, 2, 3) self.assertEqual(vars(c), {'x': 1, 'y': 2, 'z': 3}) - self.assertEqual(C.__annotations__, {'x': 'typing.Any', - 'y': 'typing.Any', - 'z': 'typing.Any'}) + self.assertEqual(C.__annotations__, {'x': typing.Any, + 'y': typing.Any, + 'z': typing.Any}) C = make_dataclass('Point', ['x', ('y', int), 'z']) c = C(1, 2, 3) self.assertEqual(vars(c), {'x': 1, 'y': 2, 'z': 3}) - self.assertEqual(C.__annotations__, {'x': 'typing.Any', + self.assertEqual(C.__annotations__, {'x': typing.Any, 'y': int, - 'z': 'typing.Any'}) + 'z': typing.Any}) + + def test_no_types_get_annotations(self): + C = make_dataclass('C', ['x', ('y', int), 'z']) + + self.assertEqual( + annotationlib.get_annotations(C, format=annotationlib.Format.VALUE), + {'x': typing.Any, 'y': int, 'z': typing.Any}, + ) + self.assertEqual( + annotationlib.get_annotations( + C, format=annotationlib.Format.FORWARDREF), + {'x': typing.Any, 'y': int, 'z': typing.Any}, + ) + self.assertEqual( + annotationlib.get_annotations( + C, format=annotationlib.Format.STRING), + {'x': 'typing.Any', 'y': 'int', 'z': 'typing.Any'}, + ) + + def test_no_types_no_typing_import(self): + with import_helper.CleanImport('typing'): + self.assertNotIn('typing', sys.modules) + C = make_dataclass('C', ['x', ('y', int)]) + + self.assertNotIn('typing', sys.modules) + self.assertEqual( + C.__annotate__(annotationlib.Format.FORWARDREF), + { + 'x': annotationlib.ForwardRef('Any', module='typing'), + 'y': int, + }, + ) + self.assertNotIn('typing', sys.modules) + + for field in fields(C): + if field.name == "x": + self.assertEqual(field.type, annotationlib.ForwardRef('Any', module='typing')) + else: + self.assertEqual(field.name, "y") + self.assertIs(field.type, int) def test_module_attr(self): self.assertEqual(ByMakeDataClass.__module__, __name__) diff --git a/Lib/test/test_dis.py b/Lib/test/test_dis.py index f2586fcee57..ae68c1dd75c 100644 --- a/Lib/test/test_dis.py +++ b/Lib/test/test_dis.py @@ -1336,7 +1336,7 @@ class DisTests(DisTestBase): # Loop can trigger a quicken where the loop is located self.code_quicken(loop_test) got = self.get_disassembly(loop_test, adaptive=True) - jit = import_helper.import_module("_testinternalcapi").jit_enabled() + jit = sys._jit.is_enabled() expected = dis_loop_test_quickened_code.format("JIT" if jit else "NO_JIT") self.do_disassembly_compare(got, expected) diff --git a/Lib/test/test_email/test_utils.py b/Lib/test/test_email/test_utils.py index 4e6201e13c8..c9d09098b50 100644 --- a/Lib/test/test_email/test_utils.py +++ b/Lib/test/test_email/test_utils.py @@ -4,6 +4,16 @@ import test.support import time import unittest +from test.support import cpython_only +from test.support.import_helper import ensure_lazy_imports + + +class TestImportTime(unittest.TestCase): + + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("email.utils", {"random", "socket"}) + class DateTimeTests(unittest.TestCase): diff --git a/Lib/test/test_enum.py b/Lib/test/test_enum.py index 68cedc666a5..d8cb5261244 100644 --- a/Lib/test/test_enum.py +++ b/Lib/test/test_enum.py @@ -19,7 +19,8 @@ from io import StringIO from pickle import dumps, loads, PicklingError, HIGHEST_PROTOCOL from test import support from test.support import ALWAYS_EQ, REPO_ROOT -from test.support import threading_helper +from test.support import threading_helper, cpython_only +from test.support.import_helper import ensure_lazy_imports from datetime import timedelta python_version = sys.version_info[:2] @@ -5288,6 +5289,10 @@ class MiscTestCase(unittest.TestCase): def test__all__(self): support.check__all__(self, enum, not_exported={'bin', 'show_flag_values'}) + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("enum", {"functools", "warnings", "inspect", "re"}) + def test_doc_1(self): class Single(Enum): ONE = 1 diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py index f787190b1ae..ad3f669a030 100644 --- a/Lib/test/test_external_inspection.py +++ b/Lib/test/test_external_inspection.py @@ -15,13 +15,12 @@ import subprocess PROCESS_VM_READV_SUPPORTED = False try: - from _remotedebugging import PROCESS_VM_READV_SUPPORTED - from _remotedebugging import get_stack_trace - from _remotedebugging import get_async_stack_trace - from _remotedebugging import get_all_awaited_by + from _remote_debugging import PROCESS_VM_READV_SUPPORTED + from _remote_debugging import get_stack_trace + from _remote_debugging import get_async_stack_trace + from _remote_debugging import get_all_awaited_by except ImportError: - raise unittest.SkipTest("Test only runs when _remotedebuggingmodule is available") - + raise unittest.SkipTest("Test only runs when _remote_debugging is available") def _make_test_script(script_dir, script_basename, source): to_return = make_script(script_dir, script_basename, source) @@ -60,8 +59,7 @@ class TestGetStackTrace(unittest.TestCase): foo() def foo(): - sock.sendall(b"ready") - time.sleep(1000) + sock.sendall(b"ready"); time.sleep(10_000) # same line number bar() """ @@ -97,10 +95,10 @@ class TestGetStackTrace(unittest.TestCase): p.wait(timeout=SHORT_TIMEOUT) expected_stack_trace = [ - ("foo", script_name, 15), + ("foo", script_name, 14), ("baz", script_name, 11), ("bar", script_name, 9), - ("<module>", script_name, 17), + ("<module>", script_name, 16), ] self.assertEqual(stack_trace, expected_stack_trace) @@ -123,8 +121,7 @@ class TestGetStackTrace(unittest.TestCase): sock.connect(('localhost', {port})) def c5(): - sock.sendall(b"ready") - time.sleep(10000) + sock.sendall(b"ready"); time.sleep(10_000) # same line number async def c4(): await asyncio.sleep(0) @@ -196,10 +193,10 @@ class TestGetStackTrace(unittest.TestCase): root_task = "Task-1" expected_stack_trace = [ [ - ("c5", script_name, 11), - ("c4", script_name, 15), - ("c3", script_name, 18), - ("c2", script_name, 21), + ("c5", script_name, 10), + ("c4", script_name, 14), + ("c3", script_name, 17), + ("c2", script_name, 20), ], "c2_root", [ @@ -215,13 +212,13 @@ class TestGetStackTrace(unittest.TestCase): taskgroups.__file__, ANY, ), - ("main", script_name, 27), + ("main", script_name, 26), ], "Task-1", [], ], [ - [("c1", script_name, 24)], + [("c1", script_name, 23)], "sub_main_1", [ [ @@ -236,7 +233,7 @@ class TestGetStackTrace(unittest.TestCase): taskgroups.__file__, ANY, ), - ("main", script_name, 27), + ("main", script_name, 26), ], "Task-1", [], @@ -244,7 +241,7 @@ class TestGetStackTrace(unittest.TestCase): ], ], [ - [("c1", script_name, 24)], + [("c1", script_name, 23)], "sub_main_2", [ [ @@ -259,7 +256,7 @@ class TestGetStackTrace(unittest.TestCase): taskgroups.__file__, ANY, ), - ("main", script_name, 27), + ("main", script_name, 26), ], "Task-1", [], @@ -289,8 +286,7 @@ class TestGetStackTrace(unittest.TestCase): sock.connect(('localhost', {port})) async def gen_nested_call(): - sock.sendall(b"ready") - time.sleep(10000) + sock.sendall(b"ready"); time.sleep(10_000) # same line number async def gen(): for num in range(2): @@ -338,9 +334,9 @@ class TestGetStackTrace(unittest.TestCase): expected_stack_trace = [ [ - ("gen_nested_call", script_name, 11), - ("gen", script_name, 17), - ("main", script_name, 20), + ("gen_nested_call", script_name, 10), + ("gen", script_name, 16), + ("main", script_name, 19), ], "Task-1", [], @@ -367,8 +363,7 @@ class TestGetStackTrace(unittest.TestCase): async def deep(): await asyncio.sleep(0) - sock.sendall(b"ready") - time.sleep(10000) + sock.sendall(b"ready"); time.sleep(10_000) # same line number async def c1(): await asyncio.sleep(0) @@ -415,9 +410,9 @@ class TestGetStackTrace(unittest.TestCase): stack_trace[2].sort(key=lambda x: x[1]) expected_stack_trace = [ - [("deep", script_name, ANY), ("c1", script_name, 16)], + [("deep", script_name, 11), ("c1", script_name, 15)], "Task-2", - [[[("main", script_name, 22)], "Task-1", []]], + [[[("main", script_name, 21)], "Task-1", []]], ] self.assertEqual(stack_trace, expected_stack_trace) @@ -441,15 +436,14 @@ class TestGetStackTrace(unittest.TestCase): async def deep(): await asyncio.sleep(0) - sock.sendall(b"ready") - time.sleep(10000) + sock.sendall(b"ready"); time.sleep(10_000) # same line number async def c1(): await asyncio.sleep(0) await deep() async def c2(): - await asyncio.sleep(10000) + await asyncio.sleep(10_000) async def main(): await asyncio.staggered.staggered_race( @@ -492,8 +486,8 @@ class TestGetStackTrace(unittest.TestCase): stack_trace[2].sort(key=lambda x: x[1]) expected_stack_trace = [ [ - ("deep", script_name, ANY), - ("c1", script_name, 16), + ("deep", script_name, 11), + ("c1", script_name, 15), ("staggered_race.<locals>.run_one_coro", staggered.__file__, ANY), ], "Task-2", @@ -501,7 +495,7 @@ class TestGetStackTrace(unittest.TestCase): [ [ ("staggered_race", staggered.__file__, ANY), - ("main", script_name, 22), + ("main", script_name, 21), ], "Task-1", [], diff --git a/Lib/test/test_functools.py b/Lib/test/test_functools.py index e3b449f2d24..2e794b0fc95 100644 --- a/Lib/test/test_functools.py +++ b/Lib/test/test_functools.py @@ -23,6 +23,7 @@ from inspect import Signature from test.support import import_helper from test.support import threading_helper +from test.support import cpython_only from test.support import EqualToForwardRef import functools @@ -63,6 +64,14 @@ class BadTuple(tuple): class MyDict(dict): pass +class TestImportTime(unittest.TestCase): + + @cpython_only + def test_lazy_import(self): + import_helper.ensure_lazy_imports( + "functools", {"os", "weakref", "typing", "annotationlib", "warnings"} + ) + class TestPartial: diff --git a/Lib/test/test_gettext.py b/Lib/test/test_gettext.py index 585ed08ea14..33b7d75e3ff 100644 --- a/Lib/test/test_gettext.py +++ b/Lib/test/test_gettext.py @@ -6,7 +6,8 @@ import unittest.mock from functools import partial from test import support -from test.support import os_helper +from test.support import cpython_only, os_helper +from test.support.import_helper import ensure_lazy_imports # TODO: @@ -931,6 +932,10 @@ class MiscTestCase(unittest.TestCase): support.check__all__(self, gettext, not_exported={'c2py', 'ENOENT'}) + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("gettext", {"re", "warnings", "locale"}) + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_heapq.py b/Lib/test/test_heapq.py index 1aa8e4e2897..d6623fee9bb 100644 --- a/Lib/test/test_heapq.py +++ b/Lib/test/test_heapq.py @@ -13,8 +13,9 @@ c_heapq = import_helper.import_fresh_module('heapq', fresh=['_heapq']) # _heapq.nlargest/nsmallest are saved in heapq._nlargest/_smallest when # _heapq is imported, so check them there -func_names = ['heapify', 'heappop', 'heappush', 'heappushpop', 'heapreplace', - '_heappop_max', '_heapreplace_max', '_heapify_max'] +func_names = ['heapify', 'heappop', 'heappush', 'heappushpop', 'heapreplace'] +# Add max-heap variants +func_names += [func + '_max' for func in func_names] class TestModules(TestCase): def test_py_functions(self): @@ -24,7 +25,7 @@ class TestModules(TestCase): @skipUnless(c_heapq, 'requires _heapq') def test_c_functions(self): for fname in func_names: - self.assertEqual(getattr(c_heapq, fname).__module__, '_heapq') + self.assertEqual(getattr(c_heapq, fname).__module__, '_heapq', fname) def load_tests(loader, tests, ignore): @@ -74,6 +75,34 @@ class TestHeap: except AttributeError: pass + def test_max_push_pop(self): + # 1) Push 256 random numbers and pop them off, verifying all's OK. + heap = [] + data = [] + self.check_max_invariant(heap) + for i in range(256): + item = random.random() + data.append(item) + self.module.heappush_max(heap, item) + self.check_max_invariant(heap) + results = [] + while heap: + item = self.module.heappop_max(heap) + self.check_max_invariant(heap) + results.append(item) + data_sorted = data[:] + data_sorted.sort(reverse=True) + + self.assertEqual(data_sorted, results) + # 2) Check that the invariant holds for a sorted array + self.check_max_invariant(results) + + self.assertRaises(TypeError, self.module.heappush_max, []) + + exc_types = (AttributeError, TypeError) + self.assertRaises(exc_types, self.module.heappush_max, None, None) + self.assertRaises(exc_types, self.module.heappop_max, None) + def check_invariant(self, heap): # Check the heap invariant. for pos, item in enumerate(heap): @@ -81,6 +110,11 @@ class TestHeap: parentpos = (pos-1) >> 1 self.assertTrue(heap[parentpos] <= item) + def check_max_invariant(self, heap): + for pos, item in enumerate(heap[1:], start=1): + parentpos = (pos - 1) >> 1 + self.assertGreaterEqual(heap[parentpos], item) + def test_heapify(self): for size in list(range(30)) + [20000]: heap = [random.random() for dummy in range(size)] @@ -89,6 +123,14 @@ class TestHeap: self.assertRaises(TypeError, self.module.heapify, None) + def test_heapify_max(self): + for size in list(range(30)) + [20000]: + heap = [random.random() for dummy in range(size)] + self.module.heapify_max(heap) + self.check_max_invariant(heap) + + self.assertRaises(TypeError, self.module.heapify_max, None) + def test_naive_nbest(self): data = [random.randrange(2000) for i in range(1000)] heap = [] @@ -109,10 +151,7 @@ class TestHeap: def test_nbest(self): # Less-naive "N-best" algorithm, much faster (if len(data) is big - # enough <wink>) than sorting all of data. However, if we had a max - # heap instead of a min heap, it could go faster still via - # heapify'ing all of data (linear time), then doing 10 heappops - # (10 log-time steps). + # enough <wink>) than sorting all of data. data = [random.randrange(2000) for i in range(1000)] heap = data[:10] self.module.heapify(heap) @@ -125,6 +164,17 @@ class TestHeap: self.assertRaises(TypeError, self.module.heapreplace, None, None) self.assertRaises(IndexError, self.module.heapreplace, [], None) + def test_nbest_maxheap(self): + # With a max heap instead of a min heap, the "N-best" algorithm can + # go even faster still via heapify'ing all of data (linear time), then + # doing 10 heappops (10 log-time steps). + data = [random.randrange(2000) for i in range(1000)] + heap = data[:] + self.module.heapify_max(heap) + result = [self.module.heappop_max(heap) for _ in range(10)] + result.reverse() + self.assertEqual(result, sorted(data)[-10:]) + def test_nbest_with_pushpop(self): data = [random.randrange(2000) for i in range(1000)] heap = data[:10] @@ -134,6 +184,62 @@ class TestHeap: self.assertEqual(list(self.heapiter(heap)), sorted(data)[-10:]) self.assertEqual(self.module.heappushpop([], 'x'), 'x') + def test_naive_nworst(self): + # Max-heap variant of "test_naive_nbest" + data = [random.randrange(2000) for i in range(1000)] + heap = [] + for item in data: + self.module.heappush_max(heap, item) + if len(heap) > 10: + self.module.heappop_max(heap) + heap.sort() + expected = sorted(data)[:10] + self.assertEqual(heap, expected) + + def heapiter_max(self, heap): + # An iterator returning a max-heap's elements, largest-first. + try: + while 1: + yield self.module.heappop_max(heap) + except IndexError: + pass + + def test_nworst(self): + # Max-heap variant of "test_nbest" + data = [random.randrange(2000) for i in range(1000)] + heap = data[:10] + self.module.heapify_max(heap) + for item in data[10:]: + if item < heap[0]: # this gets rarer the longer we run + self.module.heapreplace_max(heap, item) + expected = sorted(data, reverse=True)[-10:] + self.assertEqual(list(self.heapiter_max(heap)), expected) + + self.assertRaises(TypeError, self.module.heapreplace_max, None) + self.assertRaises(TypeError, self.module.heapreplace_max, None, None) + self.assertRaises(IndexError, self.module.heapreplace_max, [], None) + + def test_nworst_minheap(self): + # Min-heap variant of "test_nbest_maxheap" + data = [random.randrange(2000) for i in range(1000)] + heap = data[:] + self.module.heapify(heap) + result = [self.module.heappop(heap) for _ in range(10)] + result.reverse() + expected = sorted(data, reverse=True)[-10:] + self.assertEqual(result, expected) + + def test_nworst_with_pushpop(self): + # Max-heap variant of "test_nbest_with_pushpop" + data = [random.randrange(2000) for i in range(1000)] + heap = data[:10] + self.module.heapify_max(heap) + for item in data[10:]: + self.module.heappushpop_max(heap, item) + expected = sorted(data, reverse=True)[-10:] + self.assertEqual(list(self.heapiter_max(heap)), expected) + self.assertEqual(self.module.heappushpop_max([], 'x'), 'x') + def test_heappushpop(self): h = [] x = self.module.heappushpop(h, 10) @@ -153,12 +259,31 @@ class TestHeap: x = self.module.heappushpop(h, 11) self.assertEqual((h, x), ([11], 10)) + def test_heappushpop_max(self): + h = [] + x = self.module.heappushpop_max(h, 10) + self.assertTupleEqual((h, x), ([], 10)) + + h = [10] + x = self.module.heappushpop_max(h, 10.0) + self.assertTupleEqual((h, x), ([10], 10.0)) + self.assertIsInstance(h[0], int) + self.assertIsInstance(x, float) + + h = [10] + x = self.module.heappushpop_max(h, 11) + self.assertTupleEqual((h, x), ([10], 11)) + + h = [10] + x = self.module.heappushpop_max(h, 9) + self.assertTupleEqual((h, x), ([9], 10)) + def test_heappop_max(self): - # _heapop_max has an optimization for one-item lists which isn't + # heapop_max has an optimization for one-item lists which isn't # covered in other tests, so test that case explicitly here h = [3, 2] - self.assertEqual(self.module._heappop_max(h), 3) - self.assertEqual(self.module._heappop_max(h), 2) + self.assertEqual(self.module.heappop_max(h), 3) + self.assertEqual(self.module.heappop_max(h), 2) def test_heapsort(self): # Exercise everything with repeated heapsort checks @@ -175,6 +300,20 @@ class TestHeap: heap_sorted = [self.module.heappop(heap) for i in range(size)] self.assertEqual(heap_sorted, sorted(data)) + def test_heapsort_max(self): + for trial in range(100): + size = random.randrange(50) + data = [random.randrange(25) for i in range(size)] + if trial & 1: # Half of the time, use heapify_max + heap = data[:] + self.module.heapify_max(heap) + else: # The rest of the time, use heappush_max + heap = [] + for item in data: + self.module.heappush_max(heap, item) + heap_sorted = [self.module.heappop_max(heap) for i in range(size)] + self.assertEqual(heap_sorted, sorted(data, reverse=True)) + def test_merge(self): inputs = [] for i in range(random.randrange(25)): @@ -377,16 +516,20 @@ class SideEffectLT: class TestErrorHandling: def test_non_sequence(self): - for f in (self.module.heapify, self.module.heappop): + for f in (self.module.heapify, self.module.heappop, + self.module.heapify_max, self.module.heappop_max): self.assertRaises((TypeError, AttributeError), f, 10) for f in (self.module.heappush, self.module.heapreplace, + self.module.heappush_max, self.module.heapreplace_max, self.module.nlargest, self.module.nsmallest): self.assertRaises((TypeError, AttributeError), f, 10, 10) def test_len_only(self): - for f in (self.module.heapify, self.module.heappop): + for f in (self.module.heapify, self.module.heappop, + self.module.heapify_max, self.module.heappop_max): self.assertRaises((TypeError, AttributeError), f, LenOnly()) - for f in (self.module.heappush, self.module.heapreplace): + for f in (self.module.heappush, self.module.heapreplace, + self.module.heappush_max, self.module.heapreplace_max): self.assertRaises((TypeError, AttributeError), f, LenOnly(), 10) for f in (self.module.nlargest, self.module.nsmallest): self.assertRaises(TypeError, f, 2, LenOnly()) @@ -395,7 +538,8 @@ class TestErrorHandling: seq = [CmpErr(), CmpErr(), CmpErr()] for f in (self.module.heapify, self.module.heappop): self.assertRaises(ZeroDivisionError, f, seq) - for f in (self.module.heappush, self.module.heapreplace): + for f in (self.module.heappush, self.module.heapreplace, + self.module.heappush_max, self.module.heapreplace_max): self.assertRaises(ZeroDivisionError, f, seq, 10) for f in (self.module.nlargest, self.module.nsmallest): self.assertRaises(ZeroDivisionError, f, 2, seq) @@ -403,6 +547,8 @@ class TestErrorHandling: def test_arg_parsing(self): for f in (self.module.heapify, self.module.heappop, self.module.heappush, self.module.heapreplace, + self.module.heapify_max, self.module.heappop_max, + self.module.heappush_max, self.module.heapreplace_max, self.module.nlargest, self.module.nsmallest): self.assertRaises((TypeError, AttributeError), f, 10) @@ -424,6 +570,10 @@ class TestErrorHandling: # Python version raises IndexError, C version RuntimeError with self.assertRaises((IndexError, RuntimeError)): self.module.heappush(heap, SideEffectLT(5, heap)) + heap = [] + heap.extend(SideEffectLT(i, heap) for i in range(200)) + with self.assertRaises((IndexError, RuntimeError)): + self.module.heappush_max(heap, SideEffectLT(5, heap)) def test_heappop_mutating_heap(self): heap = [] @@ -431,8 +581,12 @@ class TestErrorHandling: # Python version raises IndexError, C version RuntimeError with self.assertRaises((IndexError, RuntimeError)): self.module.heappop(heap) + heap = [] + heap.extend(SideEffectLT(i, heap) for i in range(200)) + with self.assertRaises((IndexError, RuntimeError)): + self.module.heappop_max(heap) - def test_comparison_operator_modifiying_heap(self): + def test_comparison_operator_modifying_heap(self): # See bpo-39421: Strong references need to be taken # when comparing objects as they can alter the heap class EvilClass(int): @@ -444,7 +598,7 @@ class TestErrorHandling: self.module.heappush(heap, EvilClass(0)) self.assertRaises(IndexError, self.module.heappushpop, heap, 1) - def test_comparison_operator_modifiying_heap_two_heaps(self): + def test_comparison_operator_modifying_heap_two_heaps(self): class h(int): def __lt__(self, o): @@ -464,6 +618,17 @@ class TestErrorHandling: self.assertRaises((IndexError, RuntimeError), self.module.heappush, list1, g(1)) self.assertRaises((IndexError, RuntimeError), self.module.heappush, list2, h(1)) + list1, list2 = [], [] + + self.module.heappush_max(list1, h(0)) + self.module.heappush_max(list2, g(0)) + self.module.heappush_max(list1, g(1)) + self.module.heappush_max(list2, h(1)) + + self.assertRaises((IndexError, RuntimeError), self.module.heappush_max, list1, g(1)) + self.assertRaises((IndexError, RuntimeError), self.module.heappush_max, list2, h(1)) + + class TestErrorHandlingPython(TestErrorHandling, TestCase): module = py_heapq diff --git a/Lib/test/test_json/test_tool.py b/Lib/test/test_json/test_tool.py index ba9c42f758e..72cde3f0d6c 100644 --- a/Lib/test/test_json/test_tool.py +++ b/Lib/test/test_json/test_tool.py @@ -6,9 +6,11 @@ import unittest import subprocess from test import support -from test.support import force_not_colorized, os_helper +from test.support import force_colorized, force_not_colorized, os_helper from test.support.script_helper import assert_python_ok +from _colorize import get_theme + @support.requires_subprocess() class TestMain(unittest.TestCase): @@ -246,34 +248,39 @@ class TestMain(unittest.TestCase): proc.communicate(b'"{}"') self.assertEqual(proc.returncode, errno.EPIPE) + @force_colorized def test_colors(self): infile = os_helper.TESTFN self.addCleanup(os.remove, infile) + t = get_theme().syntax + ob = "{" + cb = "}" + cases = ( - ('{}', b'{}'), - ('[]', b'[]'), - ('null', b'\x1b[1;36mnull\x1b[0m'), - ('true', b'\x1b[1;36mtrue\x1b[0m'), - ('false', b'\x1b[1;36mfalse\x1b[0m'), - ('NaN', b'NaN'), - ('Infinity', b'Infinity'), - ('-Infinity', b'-Infinity'), - ('"foo"', b'\x1b[1;32m"foo"\x1b[0m'), - (r'" \"foo\" "', b'\x1b[1;32m" \\"foo\\" "\x1b[0m'), - ('"α"', b'\x1b[1;32m"\\u03b1"\x1b[0m'), - ('123', b'123'), - ('-1.2345e+23', b'-1.2345e+23'), + ('{}', '{}'), + ('[]', '[]'), + ('null', f'{t.keyword}null{t.reset}'), + ('true', f'{t.keyword}true{t.reset}'), + ('false', f'{t.keyword}false{t.reset}'), + ('NaN', f'{t.number}NaN{t.reset}'), + ('Infinity', f'{t.number}Infinity{t.reset}'), + ('-Infinity', f'{t.number}-Infinity{t.reset}'), + ('"foo"', f'{t.string}"foo"{t.reset}'), + (r'" \"foo\" "', f'{t.string}" \\"foo\\" "{t.reset}'), + ('"α"', f'{t.string}"\\u03b1"{t.reset}'), + ('123', f'{t.number}123{t.reset}'), + ('-1.2345e+23', f'{t.number}-1.2345e+23{t.reset}'), (r'{"\\": ""}', - b'''\ -{ - \x1b[94m"\\\\"\x1b[0m: \x1b[1;32m""\x1b[0m -}'''), + f'''\ +{ob} + {t.definition}"\\\\"{t.reset}: {t.string}""{t.reset} +{cb}'''), (r'{"\\\\": ""}', - b'''\ -{ - \x1b[94m"\\\\\\\\"\x1b[0m: \x1b[1;32m""\x1b[0m -}'''), + f'''\ +{ob} + {t.definition}"\\\\\\\\"{t.reset}: {t.string}""{t.reset} +{cb}'''), ('''\ { "foo": "bar", @@ -281,30 +288,32 @@ class TestMain(unittest.TestCase): "qux": [true, false, null], "xyz": [NaN, -Infinity, Infinity] }''', - b'''\ -{ - \x1b[94m"foo"\x1b[0m: \x1b[1;32m"bar"\x1b[0m, - \x1b[94m"baz"\x1b[0m: 1234, - \x1b[94m"qux"\x1b[0m: [ - \x1b[1;36mtrue\x1b[0m, - \x1b[1;36mfalse\x1b[0m, - \x1b[1;36mnull\x1b[0m + f'''\ +{ob} + {t.definition}"foo"{t.reset}: {t.string}"bar"{t.reset}, + {t.definition}"baz"{t.reset}: {t.number}1234{t.reset}, + {t.definition}"qux"{t.reset}: [ + {t.keyword}true{t.reset}, + {t.keyword}false{t.reset}, + {t.keyword}null{t.reset} ], - \x1b[94m"xyz"\x1b[0m: [ - NaN, - -Infinity, - Infinity + {t.definition}"xyz"{t.reset}: [ + {t.number}NaN{t.reset}, + {t.number}-Infinity{t.reset}, + {t.number}Infinity{t.reset} ] -}'''), +{cb}'''), ) for input_, expected in cases: with self.subTest(input=input_): with open(infile, "w", encoding="utf-8") as fp: fp.write(input_) - _, stdout, _ = assert_python_ok('-m', self.module, infile, - PYTHON_COLORS='1') - stdout = stdout.replace(b'\r\n', b'\n') # normalize line endings + _, stdout_b, _ = assert_python_ok( + '-m', self.module, infile, FORCE_COLOR='1', __isolated='1' + ) + stdout = stdout_b.decode() + stdout = stdout.replace('\r\n', '\n') # normalize line endings stdout = stdout.strip() self.assertEqual(stdout, expected) diff --git a/Lib/test/test_locale.py b/Lib/test/test_locale.py index 528ceef5281..455d2af37ef 100644 --- a/Lib/test/test_locale.py +++ b/Lib/test/test_locale.py @@ -1,13 +1,18 @@ from decimal import Decimal -from test.support import verbose, is_android, linked_to_musl, os_helper +from test.support import cpython_only, verbose, is_android, linked_to_musl, os_helper from test.support.warnings_helper import check_warnings -from test.support.import_helper import import_fresh_module +from test.support.import_helper import ensure_lazy_imports, import_fresh_module from unittest import mock import unittest import locale import sys import codecs +class LazyImportTest(unittest.TestCase): + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("locale", {"re", "warnings"}) + class BaseLocalizedTest(unittest.TestCase): # diff --git a/Lib/test/test_mimetypes.py b/Lib/test/test_mimetypes.py index dad5dbde7cd..fb57d5e5544 100644 --- a/Lib/test/test_mimetypes.py +++ b/Lib/test/test_mimetypes.py @@ -6,7 +6,8 @@ import sys import unittest.mock from platform import win32_edition from test import support -from test.support import os_helper +from test.support import cpython_only, force_not_colorized, os_helper +from test.support.import_helper import ensure_lazy_imports try: import _winapi @@ -435,8 +436,13 @@ class MiscTestCase(unittest.TestCase): def test__all__(self): support.check__all__(self, mimetypes) + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("mimetypes", {"os", "posixpath", "urllib.parse", "argparse"}) + class CommandLineTest(unittest.TestCase): + @force_not_colorized def test_parse_args(self): args, help_text = mimetypes._parse_args("-h") self.assertTrue(help_text.startswith("usage: ")) diff --git a/Lib/test/test_minidom.py b/Lib/test/test_minidom.py index 6679c0a4fbe..4f25e9c2a03 100644 --- a/Lib/test/test_minidom.py +++ b/Lib/test/test_minidom.py @@ -102,41 +102,38 @@ class MinidomTest(unittest.TestCase): elem = root.childNodes[0] nelem = dom.createElement("element") root.insertBefore(nelem, elem) - self.confirm(len(root.childNodes) == 2 - and root.childNodes.length == 2 - and root.childNodes[0] is nelem - and root.childNodes.item(0) is nelem - and root.childNodes[1] is elem - and root.childNodes.item(1) is elem - and root.firstChild is nelem - and root.lastChild is elem - and root.toxml() == "<doc><element/><foo/></doc>" - , "testInsertBefore -- node properly placed in tree") + self.assertEqual(len(root.childNodes), 2) + self.assertEqual(root.childNodes.length, 2) + self.assertIs(root.childNodes[0], nelem) + self.assertIs(root.childNodes.item(0), nelem) + self.assertIs(root.childNodes[1], elem) + self.assertIs(root.childNodes.item(1), elem) + self.assertIs(root.firstChild, nelem) + self.assertIs(root.lastChild, elem) + self.assertEqual(root.toxml(), "<doc><element/><foo/></doc>") nelem = dom.createElement("element") root.insertBefore(nelem, None) - self.confirm(len(root.childNodes) == 3 - and root.childNodes.length == 3 - and root.childNodes[1] is elem - and root.childNodes.item(1) is elem - and root.childNodes[2] is nelem - and root.childNodes.item(2) is nelem - and root.lastChild is nelem - and nelem.previousSibling is elem - and root.toxml() == "<doc><element/><foo/><element/></doc>" - , "testInsertBefore -- node properly placed in tree") + self.assertEqual(len(root.childNodes), 3) + self.assertEqual(root.childNodes.length, 3) + self.assertIs(root.childNodes[1], elem) + self.assertIs(root.childNodes.item(1), elem) + self.assertIs(root.childNodes[2], nelem) + self.assertIs(root.childNodes.item(2), nelem) + self.assertIs(root.lastChild, nelem) + self.assertIs(nelem.previousSibling, elem) + self.assertEqual(root.toxml(), "<doc><element/><foo/><element/></doc>") nelem2 = dom.createElement("bar") root.insertBefore(nelem2, nelem) - self.confirm(len(root.childNodes) == 4 - and root.childNodes.length == 4 - and root.childNodes[2] is nelem2 - and root.childNodes.item(2) is nelem2 - and root.childNodes[3] is nelem - and root.childNodes.item(3) is nelem - and nelem2.nextSibling is nelem - and nelem.previousSibling is nelem2 - and root.toxml() == - "<doc><element/><foo/><bar/><element/></doc>" - , "testInsertBefore -- node properly placed in tree") + self.assertEqual(len(root.childNodes), 4) + self.assertEqual(root.childNodes.length, 4) + self.assertIs(root.childNodes[2], nelem2) + self.assertIs(root.childNodes.item(2), nelem2) + self.assertIs(root.childNodes[3], nelem) + self.assertIs(root.childNodes.item(3), nelem) + self.assertIs(nelem2.nextSibling, nelem) + self.assertIs(nelem.previousSibling, nelem2) + self.assertEqual(root.toxml(), + "<doc><element/><foo/><bar/><element/></doc>") dom.unlink() def _create_fragment_test_nodes(self): @@ -342,8 +339,8 @@ class MinidomTest(unittest.TestCase): self.assertRaises(xml.dom.NotFoundErr, child.removeAttributeNode, None) self.assertIs(node, child.removeAttributeNode(node)) - self.confirm(len(child.attributes) == 0 - and child.getAttributeNode("spam") is None) + self.assertEqual(len(child.attributes), 0) + self.assertIsNone(child.getAttributeNode("spam")) dom2 = Document() child2 = dom2.appendChild(dom2.createElement("foo")) node2 = child2.getAttributeNode("spam") @@ -366,33 +363,34 @@ class MinidomTest(unittest.TestCase): # Set this attribute to be an ID and make sure that doesn't change # when changing the value: el.setIdAttribute("spam") - self.confirm(len(el.attributes) == 1 - and el.attributes["spam"].value == "bam" - and el.attributes["spam"].nodeValue == "bam" - and el.getAttribute("spam") == "bam" - and el.getAttributeNode("spam").isId) + self.assertEqual(len(el.attributes), 1) + self.assertEqual(el.attributes["spam"].value, "bam") + self.assertEqual(el.attributes["spam"].nodeValue, "bam") + self.assertEqual(el.getAttribute("spam"), "bam") + self.assertTrue(el.getAttributeNode("spam").isId) el.attributes["spam"] = "ham" - self.confirm(len(el.attributes) == 1 - and el.attributes["spam"].value == "ham" - and el.attributes["spam"].nodeValue == "ham" - and el.getAttribute("spam") == "ham" - and el.attributes["spam"].isId) + self.assertEqual(len(el.attributes), 1) + self.assertEqual(el.attributes["spam"].value, "ham") + self.assertEqual(el.attributes["spam"].nodeValue, "ham") + self.assertEqual(el.getAttribute("spam"), "ham") + self.assertTrue(el.attributes["spam"].isId) el.setAttribute("spam2", "bam") - self.confirm(len(el.attributes) == 2 - and el.attributes["spam"].value == "ham" - and el.attributes["spam"].nodeValue == "ham" - and el.getAttribute("spam") == "ham" - and el.attributes["spam2"].value == "bam" - and el.attributes["spam2"].nodeValue == "bam" - and el.getAttribute("spam2") == "bam") + self.assertEqual(len(el.attributes), 2) + self.assertEqual(el.attributes["spam"].value, "ham") + self.assertEqual(el.attributes["spam"].nodeValue, "ham") + self.assertEqual(el.getAttribute("spam"), "ham") + self.assertEqual(el.attributes["spam2"].value, "bam") + self.assertEqual(el.attributes["spam2"].nodeValue, "bam") + self.assertEqual(el.getAttribute("spam2"), "bam") el.attributes["spam2"] = "bam2" - self.confirm(len(el.attributes) == 2 - and el.attributes["spam"].value == "ham" - and el.attributes["spam"].nodeValue == "ham" - and el.getAttribute("spam") == "ham" - and el.attributes["spam2"].value == "bam2" - and el.attributes["spam2"].nodeValue == "bam2" - and el.getAttribute("spam2") == "bam2") + + self.assertEqual(len(el.attributes), 2) + self.assertEqual(el.attributes["spam"].value, "ham") + self.assertEqual(el.attributes["spam"].nodeValue, "ham") + self.assertEqual(el.getAttribute("spam"), "ham") + self.assertEqual(el.attributes["spam2"].value, "bam2") + self.assertEqual(el.attributes["spam2"].nodeValue, "bam2") + self.assertEqual(el.getAttribute("spam2"), "bam2") dom.unlink() def testGetAttrList(self): @@ -448,12 +446,12 @@ class MinidomTest(unittest.TestCase): dom = parseString(d) elems = dom.getElementsByTagNameNS("http://pyxml.sf.net/minidom", "myelem") - self.confirm(len(elems) == 1 - and elems[0].namespaceURI == "http://pyxml.sf.net/minidom" - and elems[0].localName == "myelem" - and elems[0].prefix == "minidom" - and elems[0].tagName == "minidom:myelem" - and elems[0].nodeName == "minidom:myelem") + self.assertEqual(len(elems), 1) + self.assertEqual(elems[0].namespaceURI, "http://pyxml.sf.net/minidom") + self.assertEqual(elems[0].localName, "myelem") + self.assertEqual(elems[0].prefix, "minidom") + self.assertEqual(elems[0].tagName, "minidom:myelem") + self.assertEqual(elems[0].nodeName, "minidom:myelem") dom.unlink() def get_empty_nodelist_from_elements_by_tagName_ns_helper(self, doc, nsuri, @@ -602,17 +600,17 @@ class MinidomTest(unittest.TestCase): def testProcessingInstruction(self): dom = parseString('<e><?mypi \t\n data \t\n ?></e>') pi = dom.documentElement.firstChild - self.confirm(pi.target == "mypi" - and pi.data == "data \t\n " - and pi.nodeName == "mypi" - and pi.nodeType == Node.PROCESSING_INSTRUCTION_NODE - and pi.attributes is None - and not pi.hasChildNodes() - and len(pi.childNodes) == 0 - and pi.firstChild is None - and pi.lastChild is None - and pi.localName is None - and pi.namespaceURI == xml.dom.EMPTY_NAMESPACE) + self.assertEqual(pi.target, "mypi") + self.assertEqual(pi.data, "data \t\n ") + self.assertEqual(pi.nodeName, "mypi") + self.assertEqual(pi.nodeType, Node.PROCESSING_INSTRUCTION_NODE) + self.assertIsNone(pi.attributes) + self.assertFalse(pi.hasChildNodes()) + self.assertEqual(len(pi.childNodes), 0) + self.assertIsNone(pi.firstChild) + self.assertIsNone(pi.lastChild) + self.assertIsNone(pi.localName) + self.assertEqual(pi.namespaceURI, xml.dom.EMPTY_NAMESPACE) def testProcessingInstructionRepr(self): dom = parseString('<e><?mypi \t\n data \t\n ?></e>') @@ -718,19 +716,16 @@ class MinidomTest(unittest.TestCase): keys2 = list(attrs2.keys()) keys1.sort() keys2.sort() - self.assertEqual(keys1, keys2, - "clone of element has same attribute keys") + self.assertEqual(keys1, keys2) for i in range(len(keys1)): a1 = attrs1.item(i) a2 = attrs2.item(i) - self.confirm(a1 is not a2 - and a1.value == a2.value - and a1.nodeValue == a2.nodeValue - and a1.namespaceURI == a2.namespaceURI - and a1.localName == a2.localName - , "clone of attribute node has proper attribute values") - self.assertIs(a2.ownerElement, e2, - "clone of attribute node correctly owned") + self.assertIsNot(a1, a2) + self.assertEqual(a1.value, a2.value) + self.assertEqual(a1.nodeValue, a2.nodeValue) + self.assertEqual(a1.namespaceURI,a2.namespaceURI) + self.assertEqual(a1.localName, a2.localName) + self.assertIs(a2.ownerElement, e2) def _setupCloneElement(self, deep): dom = parseString("<doc attr='value'><foo/></doc>") @@ -746,20 +741,19 @@ class MinidomTest(unittest.TestCase): def testCloneElementShallow(self): dom, clone = self._setupCloneElement(0) - self.confirm(len(clone.childNodes) == 0 - and clone.childNodes.length == 0 - and clone.parentNode is None - and clone.toxml() == '<doc attr="value"/>' - , "testCloneElementShallow") + self.assertEqual(len(clone.childNodes), 0) + self.assertEqual(clone.childNodes.length, 0) + self.assertIsNone(clone.parentNode) + self.assertEqual(clone.toxml(), '<doc attr="value"/>') + dom.unlink() def testCloneElementDeep(self): dom, clone = self._setupCloneElement(1) - self.confirm(len(clone.childNodes) == 1 - and clone.childNodes.length == 1 - and clone.parentNode is None - and clone.toxml() == '<doc attr="value"><foo/></doc>' - , "testCloneElementDeep") + self.assertEqual(len(clone.childNodes), 1) + self.assertEqual(clone.childNodes.length, 1) + self.assertIsNone(clone.parentNode) + self.assertTrue(clone.toxml(), '<doc attr="value"><foo/></doc>') dom.unlink() def testCloneDocumentShallow(self): diff --git a/Lib/test/test_optparse.py b/Lib/test/test_optparse.py index 8655a0537a5..e6ffd2b0ffe 100644 --- a/Lib/test/test_optparse.py +++ b/Lib/test/test_optparse.py @@ -14,8 +14,9 @@ import unittest from io import StringIO from test import support -from test.support import os_helper +from test.support import cpython_only, os_helper from test.support.i18n_helper import TestTranslationsBase, update_translation_snapshots +from test.support.import_helper import ensure_lazy_imports import optparse from optparse import make_option, Option, \ @@ -1655,6 +1656,10 @@ class MiscTestCase(unittest.TestCase): not_exported = {'check_builtin', 'AmbiguousOptionError', 'NO_DEFAULT'} support.check__all__(self, optparse, not_exported=not_exported) + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("optparse", {"textwrap"}) + class TestTranslations(TestTranslationsBase): def test_translations(self): diff --git a/Lib/test/test_pathlib/test_pathlib.py b/Lib/test/test_pathlib/test_pathlib.py index 41a79d0dceb..8a313cc4292 100644 --- a/Lib/test/test_pathlib/test_pathlib.py +++ b/Lib/test/test_pathlib/test_pathlib.py @@ -16,6 +16,7 @@ from unittest import mock from urllib.request import pathname2url from test.support import import_helper +from test.support import cpython_only from test.support import is_emscripten, is_wasi from test.support import infinite_recursion from test.support import os_helper @@ -80,6 +81,12 @@ class UnsupportedOperationTest(unittest.TestCase): self.assertTrue(isinstance(pathlib.UnsupportedOperation(), NotImplementedError)) +class LazyImportTest(unittest.TestCase): + @cpython_only + def test_lazy_import(self): + import_helper.ensure_lazy_imports("pathlib", {"shutil"}) + + # # Tests for the pure classes. # @@ -3290,7 +3297,6 @@ class PathTest(PurePathTest): self.assertEqual(P.from_uri('file:////foo/bar'), P('//foo/bar')) self.assertEqual(P.from_uri('file://localhost/foo/bar'), P('/foo/bar')) if not is_wasi: - self.assertEqual(P.from_uri('file://127.0.0.1/foo/bar'), P('/foo/bar')) self.assertEqual(P.from_uri(f'file://{socket.gethostname()}/foo/bar'), P('/foo/bar')) self.assertRaises(ValueError, P.from_uri, 'foo/bar') diff --git a/Lib/test/test_pdb.py b/Lib/test/test_pdb.py index be365a5a3dd..54797d7898f 100644 --- a/Lib/test/test_pdb.py +++ b/Lib/test/test_pdb.py @@ -1,7 +1,9 @@ # A test suite for pdb; not very comprehensive at the moment. +import _colorize import doctest import gc +import io import os import pdb import sys @@ -18,7 +20,7 @@ from asyncio.events import _set_event_loop_policy from contextlib import ExitStack, redirect_stdout from io import StringIO from test import support -from test.support import force_not_colorized, has_socket_support, os_helper +from test.support import has_socket_support, os_helper from test.support.import_helper import import_module from test.support.pty_helper import run_pty, FakeInput from test.support.script_helper import kill_python @@ -3446,6 +3448,7 @@ def test_pdb_issue_gh_65052(): """ +@support.force_not_colorized_test_class @support.requires_subprocess() class PdbTestCase(unittest.TestCase): def tearDown(self): @@ -3740,7 +3743,6 @@ def bœr(): self.assertNotIn(b'Error', stdout, "Got an error running test script under PDB") - @force_not_colorized def test_issue16180(self): # A syntax error in the debuggee. script = "def f: pass\n" @@ -3754,7 +3756,6 @@ def bœr(): 'Fail to handle a syntax error in the debuggee.' .format(expected, stderr)) - @force_not_colorized def test_issue84583(self): # A syntax error from ast.literal_eval should not make pdb exit. script = "import ast; ast.literal_eval('')\n" @@ -4688,6 +4689,40 @@ class PdbTestInline(unittest.TestCase): self.assertIn("42", stdout) +@support.force_colorized_test_class +class PdbTestColorize(unittest.TestCase): + def setUp(self): + self._original_can_colorize = _colorize.can_colorize + # Force colorize to be enabled because we are sending data + # to a StringIO + _colorize.can_colorize = lambda *args, **kwargs: True + + def tearDown(self): + _colorize.can_colorize = self._original_can_colorize + + def test_code_display(self): + output = io.StringIO() + p = pdb.Pdb(stdout=output, colorize=True) + p.set_trace(commands=['ll', 'c']) + self.assertIn("\x1b", output.getvalue()) + + output = io.StringIO() + p = pdb.Pdb(stdout=output, colorize=False) + p.set_trace(commands=['ll', 'c']) + self.assertNotIn("\x1b", output.getvalue()) + + output = io.StringIO() + p = pdb.Pdb(stdout=output) + p.set_trace(commands=['ll', 'c']) + self.assertNotIn("\x1b", output.getvalue()) + + def test_stack_entry(self): + output = io.StringIO() + p = pdb.Pdb(stdout=output, colorize=True) + p.set_trace(commands=['w', 'c']) + self.assertIn("\x1b", output.getvalue()) + + @support.force_not_colorized_test_class @support.requires_subprocess() class TestREPLSession(unittest.TestCase): @@ -4711,6 +4746,7 @@ class TestREPLSession(unittest.TestCase): self.assertEqual(p.returncode, 0) +@support.force_not_colorized_test_class @support.requires_subprocess() class PdbTestReadline(unittest.TestCase): def setUpClass(): @@ -4812,14 +4848,35 @@ class PdbTestReadline(unittest.TestCase): self.assertIn(b'I love Python', output) + def test_multiline_auto_indent(self): + script = textwrap.dedent(""" + import pdb; pdb.Pdb().set_trace() + """) + + input = b"def f(x):\n" + input += b"if x > 0:\n" + input += b"x += 1\n" + input += b"return x\n" + # We need to do backspaces to remove the auto-indentation + input += b"\x08\x08\x08\x08else:\n" + input += b"return -x\n" + input += b"\n" + input += b"f(-21-21)\n" + input += b"c\n" + + output = run_pty(script, input) + + self.assertIn(b'42', output) + def test_multiline_completion(self): script = textwrap.dedent(""" import pdb; pdb.Pdb().set_trace() """) input = b"def func():\n" - # Complete: \treturn 40 + 2 - input += b"\tret\t 40 + 2\n" + # Auto-indent + # Complete: return 40 + 2 + input += b"ret\t 40 + 2\n" input += b"\n" # Complete: func() input += b"fun\t()\n" @@ -4839,12 +4896,13 @@ class PdbTestReadline(unittest.TestCase): # if the completion is not working as expected input = textwrap.dedent("""\ def func(): - \ta = 1 - \ta += 1 - \ta += 1 - \tif a > 0: - a += 1 - \t\treturn a + a = 1 + \x08\ta += 1 + \x08\x08\ta += 1 + \x08\x08\x08\ta += 1 + \x08\x08\x08\x08\tif a > 0: + a += 1 + \x08\x08\x08\x08return a func() c @@ -4852,7 +4910,7 @@ class PdbTestReadline(unittest.TestCase): output = run_pty(script, input) - self.assertIn(b'4', output) + self.assertIn(b'5', output) self.assertNotIn(b'Error', output) def test_interact_completion(self): diff --git a/Lib/test/test_peepholer.py b/Lib/test/test_peepholer.py index 565e42b04a6..47f51f1979f 100644 --- a/Lib/test/test_peepholer.py +++ b/Lib/test/test_peepholer.py @@ -1,4 +1,5 @@ import dis +import gc from itertools import combinations, product import opcode import sys @@ -2472,6 +2473,13 @@ class OptimizeLoadFastTestCase(DirectCfgOptimizerTests): ] self.check(insts, insts) + insts = [ + ("LOAD_FAST", 0, 1), + ("DELETE_FAST", 0, 2), + ("POP_TOP", None, 3), + ] + self.check(insts, insts) + def test_unoptimized_if_aliased(self): insts = [ ("LOAD_FAST", 0, 1), @@ -2606,6 +2614,22 @@ class OptimizeLoadFastTestCase(DirectCfgOptimizerTests): ] self.cfg_optimization_test(insts, expected, consts=[None]) + def test_del_in_finally(self): + # This loads `obj` onto the stack, executes `del obj`, then returns the + # `obj` from the stack. See gh-133371 for more details. + def create_obj(): + obj = [42] + try: + return obj + finally: + del obj + + obj = create_obj() + # The crash in the linked issue happens while running GC during + # interpreter finalization, so run it here manually. + gc.collect() + self.assertEqual(obj, [42]) + if __name__ == "__main__": diff --git a/Lib/test/test_pickle.py b/Lib/test/test_pickle.py index 296d4b882e1..742ca8de1be 100644 --- a/Lib/test/test_pickle.py +++ b/Lib/test/test_pickle.py @@ -15,7 +15,8 @@ from textwrap import dedent import doctest import unittest from test import support -from test.support import import_helper, os_helper +from test.support import cpython_only, import_helper, os_helper +from test.support.import_helper import ensure_lazy_imports from test.pickletester import AbstractHookTests from test.pickletester import AbstractUnpickleTests @@ -36,6 +37,12 @@ except ImportError: has_c_implementation = False +class LazyImportTest(unittest.TestCase): + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("pickle", {"re"}) + + class PyPickleTests(AbstractPickleModuleTests, unittest.TestCase): dump = staticmethod(pickle._dump) dumps = staticmethod(pickle._dumps) @@ -745,6 +752,7 @@ class CommandLineTest(unittest.TestCase): expect = self.text_normalize(expect) self.assertListEqual(res.splitlines(), expect.splitlines()) + @support.force_not_colorized def test_unknown_flag(self): stderr = io.StringIO() with self.assertRaises(SystemExit): diff --git a/Lib/test/test_platform.py b/Lib/test/test_platform.py index b90edc05e04..818e807dd3a 100644 --- a/Lib/test/test_platform.py +++ b/Lib/test/test_platform.py @@ -794,6 +794,7 @@ class CommandLineTest(unittest.TestCase): self.invoke_platform(*flags) obj.assert_called_once_with(aliased, terse) + @support.force_not_colorized def test_help(self): output = io.StringIO() diff --git a/Lib/test/test_pprint.py b/Lib/test/test_pprint.py index dfbc2a06e73..f68996f72b1 100644 --- a/Lib/test/test_pprint.py +++ b/Lib/test/test_pprint.py @@ -11,6 +11,9 @@ import re import types import unittest +from test.support import cpython_only +from test.support.import_helper import ensure_lazy_imports + # list, tuple and dict subclasses that do or don't overwrite __repr__ class list2(list): pass @@ -129,6 +132,10 @@ class QueryTestCase(unittest.TestCase): self.b = list(range(200)) self.a[-12] = self.b + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("pprint", {"dataclasses", "re"}) + def test_init(self): pp = pprint.PrettyPrinter() pp = pprint.PrettyPrinter(indent=4, width=40, depth=5, diff --git a/Lib/test/test_pstats.py b/Lib/test/test_pstats.py index d5a5a9738c2..a26a8c1d522 100644 --- a/Lib/test/test_pstats.py +++ b/Lib/test/test_pstats.py @@ -1,6 +1,7 @@ import unittest from test import support +from test.support.import_helper import ensure_lazy_imports from io import StringIO from pstats import SortKey from enum import StrEnum, _test_simple_enum @@ -10,6 +11,12 @@ import pstats import tempfile import cProfile +class LazyImportTest(unittest.TestCase): + @support.cpython_only + def test_lazy_import(self): + ensure_lazy_imports("pstats", {"typing"}) + + class AddCallersTestCase(unittest.TestCase): """Tests for pstats.add_callers helper.""" diff --git a/Lib/test/test_pyrepl/support.py b/Lib/test/test_pyrepl/support.py index 3692e164cb9..4f7f9d77933 100644 --- a/Lib/test/test_pyrepl/support.py +++ b/Lib/test/test_pyrepl/support.py @@ -113,9 +113,6 @@ handle_events_narrow_console = partial( prepare_console=partial(prepare_console, width=10), ) -reader_no_colors = partial(prepare_reader, can_colorize=False) -reader_force_colors = partial(prepare_reader, can_colorize=True) - class FakeConsole(Console): def __init__(self, events, encoding="utf-8") -> None: diff --git a/Lib/test/test_pyrepl/test_eventqueue.py b/Lib/test/test_pyrepl/test_eventqueue.py index afb55710342..edfe6ac4748 100644 --- a/Lib/test/test_pyrepl/test_eventqueue.py +++ b/Lib/test/test_pyrepl/test_eventqueue.py @@ -53,7 +53,7 @@ class EventQueueTestBase: mock_keymap.compile_keymap.return_value = {"a": "b"} eq = self.make_eventqueue() eq.keymap = {b"a": "b"} - eq.push("a") + eq.push(b"a") mock_keymap.compile_keymap.assert_called() self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "b") @@ -63,7 +63,7 @@ class EventQueueTestBase: mock_keymap.compile_keymap.return_value = {"a": "b"} eq = self.make_eventqueue() eq.keymap = {b"c": "d"} - eq.push("a") + eq.push(b"a") mock_keymap.compile_keymap.assert_called() self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "a") @@ -73,13 +73,13 @@ class EventQueueTestBase: mock_keymap.compile_keymap.return_value = {"a": "b"} eq = self.make_eventqueue() eq.keymap = {b"a": {b"b": "c"}} - eq.push("a") + eq.push(b"a") mock_keymap.compile_keymap.assert_called() self.assertTrue(eq.empty()) - eq.push("b") + eq.push(b"b") self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "c") - eq.push("d") + eq.push(b"d") self.assertEqual(eq.events[1].evt, "key") self.assertEqual(eq.events[1].data, "d") @@ -88,32 +88,32 @@ class EventQueueTestBase: mock_keymap.compile_keymap.return_value = {"a": "b"} eq = self.make_eventqueue() eq.keymap = {b"a": {b"b": "c"}} - eq.push("a") + eq.push(b"a") mock_keymap.compile_keymap.assert_called() self.assertTrue(eq.empty()) eq.flush_buf() - eq.push("\033") + eq.push(b"\033") self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "\033") - eq.push("b") + eq.push(b"b") self.assertEqual(eq.events[1].evt, "key") self.assertEqual(eq.events[1].data, "b") def test_push_special_key(self): eq = self.make_eventqueue() eq.keymap = {} - eq.push("\x1b") - eq.push("[") - eq.push("A") + eq.push(b"\x1b") + eq.push(b"[") + eq.push(b"A") self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "\x1b") def test_push_unrecognized_escape_sequence(self): eq = self.make_eventqueue() eq.keymap = {} - eq.push("\x1b") - eq.push("[") - eq.push("Z") + eq.push(b"\x1b") + eq.push(b"[") + eq.push(b"Z") self.assertEqual(len(eq.events), 3) self.assertEqual(eq.events[0].evt, "key") self.assertEqual(eq.events[0].data, "\x1b") @@ -122,12 +122,54 @@ class EventQueueTestBase: self.assertEqual(eq.events[2].evt, "key") self.assertEqual(eq.events[2].data, "Z") - def test_push_unicode_character(self): + def test_push_unicode_character_as_str(self): eq = self.make_eventqueue() eq.keymap = {} - eq.push("ч") - self.assertEqual(eq.events[0].evt, "key") - self.assertEqual(eq.events[0].data, "ч") + with self.assertRaises(AssertionError): + eq.push("ч") + with self.assertRaises(AssertionError): + eq.push("ñ") + + def test_push_unicode_character_two_bytes(self): + eq = self.make_eventqueue() + eq.keymap = {} + + encoded = "ч".encode(eq.encoding, "replace") + self.assertEqual(len(encoded), 2) + + eq.push(encoded[0]) + e = eq.get() + self.assertIsNone(e) + + eq.push(encoded[1]) + e = eq.get() + self.assertEqual(e.evt, "key") + self.assertEqual(e.data, "ч") + + def test_push_single_chars_and_unicode_character_as_str(self): + eq = self.make_eventqueue() + eq.keymap = {} + + def _event(evt, data, raw=None): + r = raw if raw is not None else data.encode(eq.encoding) + e = Event(evt, data, r) + return e + + def _push(keys): + for k in keys: + eq.push(k) + + self.assertIsInstance("ñ", str) + + # If an exception happens during push, the existing events must be + # preserved and we can continue to push. + _push(b"b") + with self.assertRaises(AssertionError): + _push("ñ") + _push(b"a") + + self.assertEqual(eq.get(), _event("key", "b")) + self.assertEqual(eq.get(), _event("key", "a")) @unittest.skipIf(support.MS_WINDOWS, "No Unix event queue on Windows") diff --git a/Lib/test/test_pyrepl/test_reader.py b/Lib/test/test_pyrepl/test_reader.py index 8d7fcf538d2..4ee320a5a4d 100644 --- a/Lib/test/test_pyrepl/test_reader.py +++ b/Lib/test/test_pyrepl/test_reader.py @@ -4,20 +4,21 @@ import rlcompleter from textwrap import dedent from unittest import TestCase from unittest.mock import MagicMock +from test.support import force_colorized_test_class, force_not_colorized_test_class from .support import handle_all_events, handle_events_narrow_console from .support import ScreenEqualMixin, code_to_events -from .support import prepare_console, reader_force_colors -from .support import reader_no_colors as prepare_reader +from .support import prepare_reader, prepare_console from _pyrepl.console import Event from _pyrepl.reader import Reader -from _colorize import theme +from _colorize import default_theme -overrides = {"RESET": "z", "SOFT_KEYWORD": "K"} -colors = {overrides.get(k, k[0].lower()): v for k, v in theme.items()} +overrides = {"reset": "z", "soft_keyword": "K"} +colors = {overrides.get(k, k[0].lower()): v for k, v in default_theme.syntax.items()} +@force_not_colorized_test_class class TestReader(ScreenEqualMixin, TestCase): def test_calc_screen_wrap_simple(self): events = code_to_events(10 * "a") @@ -127,13 +128,6 @@ class TestReader(ScreenEqualMixin, TestCase): reader.setpos_from_xy(0, 0) self.assertEqual(reader.pos, 0) - def test_control_characters(self): - code = 'flag = "🏳️🌈"' - events = code_to_events(code) - reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) - self.assert_screen_equal(reader, 'flag = "🏳️\\u200d🌈"', clean=True) - self.assert_screen_equal(reader, 'flag {o}={z} {s}"🏳️\\u200d🌈"{z}'.format(**colors)) - def test_setpos_from_xy_multiple_lines(self): # fmt: off code = ( @@ -364,6 +358,8 @@ class TestReader(ScreenEqualMixin, TestCase): reader.setpos_from_xy(8, 0) self.assertEqual(reader.pos, 7) +@force_colorized_test_class +class TestReaderInColor(ScreenEqualMixin, TestCase): def test_syntax_highlighting_basic(self): code = dedent( """\ @@ -403,7 +399,7 @@ class TestReader(ScreenEqualMixin, TestCase): ) expected_sync = expected.format(a="", **colors) events = code_to_events(code) - reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) + reader, _ = handle_all_events(events) self.assert_screen_equal(reader, code, clean=True) self.assert_screen_equal(reader, expected_sync) self.assertEqual(reader.pos, 2**7 + 2**8) @@ -416,7 +412,7 @@ class TestReader(ScreenEqualMixin, TestCase): [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))] * 13, code_to_events("async "), ) - reader, _ = handle_all_events(more_events, prepare_reader=reader_force_colors) + reader, _ = handle_all_events(more_events) self.assert_screen_equal(reader, expected_async) self.assertEqual(reader.pos, 21) self.assertEqual(reader.cxy, (6, 1)) @@ -433,7 +429,7 @@ class TestReader(ScreenEqualMixin, TestCase): """ ).format(**colors) events = code_to_events(code) - reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) + reader, _ = handle_all_events(events) self.assert_screen_equal(reader, code, clean=True) self.assert_screen_equal(reader, expected) @@ -451,7 +447,7 @@ class TestReader(ScreenEqualMixin, TestCase): """ ).format(**colors) events = code_to_events(code) - reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) + reader, _ = handle_all_events(events) self.assert_screen_equal(reader, code, clean=True) self.assert_screen_equal(reader, expected) @@ -471,7 +467,7 @@ class TestReader(ScreenEqualMixin, TestCase): """ ).format(**colors) events = code_to_events(code) - reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) + reader, _ = handle_all_events(events) self.assert_screen_equal(reader, code, clean=True) self.assert_screen_equal(reader, expected) @@ -497,6 +493,13 @@ class TestReader(ScreenEqualMixin, TestCase): """ ).format(OB="{", CB="}", **colors) events = code_to_events(code) - reader, _ = handle_all_events(events, prepare_reader=reader_force_colors) + reader, _ = handle_all_events(events) self.assert_screen_equal(reader, code, clean=True) self.assert_screen_equal(reader, expected) + + def test_control_characters(self): + code = 'flag = "🏳️🌈"' + events = code_to_events(code) + reader, _ = handle_all_events(events) + self.assert_screen_equal(reader, 'flag = "🏳️\\u200d🌈"', clean=True) + self.assert_screen_equal(reader, 'flag {o}={z} {s}"🏳️\\u200d🌈"{z}'.format(**colors)) diff --git a/Lib/test/test_pyrepl/test_unix_console.py b/Lib/test/test_pyrepl/test_unix_console.py index 7acb84a94f7..c447b310c49 100644 --- a/Lib/test/test_pyrepl/test_unix_console.py +++ b/Lib/test/test_pyrepl/test_unix_console.py @@ -3,11 +3,12 @@ import os import sys import unittest from functools import partial -from test.support import os_helper +from test.support import os_helper, force_not_colorized_test_class + from unittest import TestCase from unittest.mock import MagicMock, call, patch, ANY -from .support import handle_all_events, code_to_events, reader_no_colors +from .support import handle_all_events, code_to_events try: from _pyrepl.console import Event @@ -33,12 +34,10 @@ def unix_console(events, **kwargs): handle_events_unix_console = partial( handle_all_events, - prepare_reader=reader_no_colors, prepare_console=unix_console, ) handle_events_narrow_unix_console = partial( handle_all_events, - prepare_reader=reader_no_colors, prepare_console=partial(unix_console, width=5), ) handle_events_short_unix_console = partial( @@ -120,6 +119,7 @@ TERM_CAPABILITIES = { ) @patch("termios.tcsetattr", lambda a, b, c: None) @patch("os.write") +@force_not_colorized_test_class class TestConsole(TestCase): def test_simple_addition(self, _os_write): code = "12+34" @@ -255,9 +255,7 @@ class TestConsole(TestCase): # fmt: on events = itertools.chain(code_to_events(code)) - reader, console = handle_events_short_unix_console( - events, prepare_reader=reader_no_colors - ) + reader, console = handle_events_short_unix_console(events) console.height = 2 console.getheightwidth = MagicMock(lambda _: (2, 80)) diff --git a/Lib/test/test_pyrepl/test_windows_console.py b/Lib/test/test_pyrepl/test_windows_console.py index e95fec46a85..e7bab226b31 100644 --- a/Lib/test/test_pyrepl/test_windows_console.py +++ b/Lib/test/test_pyrepl/test_windows_console.py @@ -7,12 +7,13 @@ if sys.platform != "win32": import itertools from functools import partial +from test.support import force_not_colorized_test_class from typing import Iterable from unittest import TestCase from unittest.mock import MagicMock, call from .support import handle_all_events, code_to_events -from .support import reader_no_colors as default_prepare_reader +from .support import prepare_reader as default_prepare_reader try: from _pyrepl.console import Event, Console @@ -24,10 +25,12 @@ try: MOVE_DOWN, ERASE_IN_LINE, ) + import _pyrepl.windows_console as wc except ImportError: pass +@force_not_colorized_test_class class WindowsConsoleTests(TestCase): def console(self, events, **kwargs) -> Console: console = WindowsConsole() @@ -350,8 +353,226 @@ class WindowsConsoleTests(TestCase): Event(evt="key", data='\x1a', raw=bytearray(b'\x1a')), ], ) - reader, _ = self.handle_events_narrow(events) + reader, con = self.handle_events_narrow(events) self.assertEqual(reader.cxy, (2, 3)) + con.restore() + + +class WindowsConsoleGetEventTests(TestCase): + # Virtual-Key Codes: https://learn.microsoft.com/en-us/windows/win32/inputdev/virtual-key-codes + VK_BACK = 0x08 + VK_RETURN = 0x0D + VK_LEFT = 0x25 + VK_7 = 0x37 + VK_M = 0x4D + # Used for miscellaneous characters; it can vary by keyboard. + # For the US standard keyboard, the '" key. + # For the German keyboard, the Ä key. + VK_OEM_7 = 0xDE + + # State of control keys: https://learn.microsoft.com/en-us/windows/console/key-event-record-str + RIGHT_ALT_PRESSED = 0x0001 + RIGHT_CTRL_PRESSED = 0x0004 + LEFT_ALT_PRESSED = 0x0002 + LEFT_CTRL_PRESSED = 0x0008 + ENHANCED_KEY = 0x0100 + SHIFT_PRESSED = 0x0010 + + + def get_event(self, input_records, **kwargs) -> Console: + self.console = WindowsConsole(encoding='utf-8') + self.mock = MagicMock(side_effect=input_records) + self.console._read_input = self.mock + self.console._WindowsConsole__vt_support = kwargs.get("vt_support", + False) + event = self.console.get_event(block=False) + return event + + def get_input_record(self, unicode_char, vcode=0, control=0): + return wc.INPUT_RECORD( + wc.KEY_EVENT, + wc.ConsoleEvent(KeyEvent= + wc.KeyEvent( + bKeyDown=True, + wRepeatCount=1, + wVirtualKeyCode=vcode, + wVirtualScanCode=0, # not used + uChar=wc.Char(unicode_char), + dwControlKeyState=control + ))) + + def test_EmptyBuffer(self): + self.assertEqual(self.get_event([None]), None) + self.assertEqual(self.mock.call_count, 1) + + def test_WINDOW_BUFFER_SIZE_EVENT(self): + ir = wc.INPUT_RECORD( + wc.WINDOW_BUFFER_SIZE_EVENT, + wc.ConsoleEvent(WindowsBufferSizeEvent= + wc.WindowsBufferSizeEvent( + wc._COORD(0, 0)))) + self.assertEqual(self.get_event([ir]), Event("resize", "")) + self.assertEqual(self.mock.call_count, 1) + + def test_KEY_EVENT_up_ignored(self): + ir = wc.INPUT_RECORD( + wc.KEY_EVENT, + wc.ConsoleEvent(KeyEvent= + wc.KeyEvent(bKeyDown=False))) + self.assertEqual(self.get_event([ir]), None) + self.assertEqual(self.mock.call_count, 1) + + def test_unhandled_events(self): + for event in (wc.FOCUS_EVENT, wc.MENU_EVENT, wc.MOUSE_EVENT): + ir = wc.INPUT_RECORD( + event, + # fake data, nothing is read except bKeyDown + wc.ConsoleEvent(KeyEvent= + wc.KeyEvent(bKeyDown=False))) + self.assertEqual(self.get_event([ir]), None) + self.assertEqual(self.mock.call_count, 1) + + def test_enter(self): + ir = self.get_input_record("\r", self.VK_RETURN) + self.assertEqual(self.get_event([ir]), Event("key", "\n")) + self.assertEqual(self.mock.call_count, 1) + + def test_backspace(self): + ir = self.get_input_record("\x08", self.VK_BACK) + self.assertEqual( + self.get_event([ir]), Event("key", "backspace")) + self.assertEqual(self.mock.call_count, 1) + + def test_m(self): + ir = self.get_input_record("m", self.VK_M) + self.assertEqual(self.get_event([ir]), Event("key", "m")) + self.assertEqual(self.mock.call_count, 1) + + def test_M(self): + ir = self.get_input_record("M", self.VK_M, self.SHIFT_PRESSED) + self.assertEqual(self.get_event([ir]), Event("key", "M")) + self.assertEqual(self.mock.call_count, 1) + + def test_left(self): + # VK_LEFT is sent as ENHANCED_KEY + ir = self.get_input_record("\x00", self.VK_LEFT, self.ENHANCED_KEY) + self.assertEqual(self.get_event([ir]), Event("key", "left")) + self.assertEqual(self.mock.call_count, 1) + + def test_left_RIGHT_CTRL_PRESSED(self): + ir = self.get_input_record( + "\x00", self.VK_LEFT, self.RIGHT_CTRL_PRESSED | self.ENHANCED_KEY) + self.assertEqual( + self.get_event([ir]), Event("key", "ctrl left")) + self.assertEqual(self.mock.call_count, 1) + + def test_left_LEFT_CTRL_PRESSED(self): + ir = self.get_input_record( + "\x00", self.VK_LEFT, self.LEFT_CTRL_PRESSED | self.ENHANCED_KEY) + self.assertEqual( + self.get_event([ir]), Event("key", "ctrl left")) + self.assertEqual(self.mock.call_count, 1) + + def test_left_RIGHT_ALT_PRESSED(self): + ir = self.get_input_record( + "\x00", self.VK_LEFT, self.RIGHT_ALT_PRESSED | self.ENHANCED_KEY) + self.assertEqual(self.get_event([ir]), Event(evt="key", data="\033")) + self.assertEqual( + self.console.get_event(), Event("key", "left")) + # self.mock is not called again, since the second time we read from the + # command queue + self.assertEqual(self.mock.call_count, 1) + + def test_left_LEFT_ALT_PRESSED(self): + ir = self.get_input_record( + "\x00", self.VK_LEFT, self.LEFT_ALT_PRESSED | self.ENHANCED_KEY) + self.assertEqual(self.get_event([ir]), Event(evt="key", data="\033")) + self.assertEqual( + self.console.get_event(), Event("key", "left")) + self.assertEqual(self.mock.call_count, 1) + + def test_m_LEFT_ALT_PRESSED_and_LEFT_CTRL_PRESSED(self): + # For the shift keys, Windows does not send anything when + # ALT and CTRL are both pressed, so let's test with VK_M. + # get_event() receives this input, but does not + # generate an event. + # This is for e.g. an English keyboard layout, for a + # German layout this returns `µ`, see test_AltGr_m. + ir = self.get_input_record( + "\x00", self.VK_M, self.LEFT_ALT_PRESSED | self.LEFT_CTRL_PRESSED) + self.assertEqual(self.get_event([ir]), None) + self.assertEqual(self.mock.call_count, 1) + + def test_m_LEFT_ALT_PRESSED(self): + ir = self.get_input_record( + "m", vcode=self.VK_M, control=self.LEFT_ALT_PRESSED) + self.assertEqual(self.get_event([ir]), Event(evt="key", data="\033")) + self.assertEqual(self.console.get_event(), Event("key", "m")) + self.assertEqual(self.mock.call_count, 1) + + def test_m_RIGHT_ALT_PRESSED(self): + ir = self.get_input_record( + "m", vcode=self.VK_M, control=self.RIGHT_ALT_PRESSED) + self.assertEqual(self.get_event([ir]), Event(evt="key", data="\033")) + self.assertEqual(self.console.get_event(), Event("key", "m")) + self.assertEqual(self.mock.call_count, 1) + + def test_AltGr_7(self): + # E.g. on a German keyboard layout, '{' is entered via + # AltGr + 7, where AltGr is the right Alt key on the keyboard. + # In this case, Windows automatically sets + # RIGHT_ALT_PRESSED = 0x0001 + LEFT_CTRL_PRESSED = 0x0008 + # This can also be entered like + # LeftAlt + LeftCtrl + 7 or + # LeftAlt + RightCtrl + 7 + # See https://learn.microsoft.com/en-us/windows/console/key-event-record-str + # https://learn.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-vkkeyscanw + ir = self.get_input_record( + "{", vcode=self.VK_7, + control=self.RIGHT_ALT_PRESSED | self.LEFT_CTRL_PRESSED) + self.assertEqual(self.get_event([ir]), Event("key", "{")) + self.assertEqual(self.mock.call_count, 1) + + def test_AltGr_m(self): + # E.g. on a German keyboard layout, this yields 'µ' + # Let's use LEFT_ALT_PRESSED and RIGHT_CTRL_PRESSED this + # time, to cover that, too. See above in test_AltGr_7. + ir = self.get_input_record( + "µ", vcode=self.VK_M, control=self.LEFT_ALT_PRESSED | self.RIGHT_CTRL_PRESSED) + self.assertEqual(self.get_event([ir]), Event("key", "µ")) + self.assertEqual(self.mock.call_count, 1) + + def test_umlaut_a_german(self): + ir = self.get_input_record("ä", self.VK_OEM_7) + self.assertEqual(self.get_event([ir]), Event("key", "ä")) + self.assertEqual(self.mock.call_count, 1) + + # virtual terminal tests + # Note: wVirtualKeyCode, wVirtualScanCode and dwControlKeyState + # are always zero in this case. + # "\r" and backspace are handled specially, everything else + # is handled in "elif self.__vt_support:" in WindowsConsole.get_event(). + # Hence, only one regular key ("m") and a terminal sequence + # are sufficient to test here, the real tests happen in test_eventqueue + # and test_keymap. + + def test_enter_vt(self): + ir = self.get_input_record("\r") + self.assertEqual(self.get_event([ir], vt_support=True), + Event("key", "\n")) + self.assertEqual(self.mock.call_count, 1) + + def test_backspace_vt(self): + ir = self.get_input_record("\x7f") + self.assertEqual(self.get_event([ir], vt_support=True), + Event("key", "backspace", b"\x7f")) + self.assertEqual(self.mock.call_count, 1) + + def test_up_vt(self): + irs = [self.get_input_record(x) for x in "\x1b[A"] + self.assertEqual(self.get_event(irs, vt_support=True), + Event(evt='key', data='up', raw=bytearray(b'\x1b[A'))) + self.assertEqual(self.mock.call_count, 3) if __name__ == "__main__": diff --git a/Lib/test/test_random.py b/Lib/test/test_random.py index 96f6cc86219..43957f525f1 100644 --- a/Lib/test/test_random.py +++ b/Lib/test/test_random.py @@ -1411,6 +1411,7 @@ class TestModule(unittest.TestCase): class CommandLineTest(unittest.TestCase): + @support.force_not_colorized def test_parse_args(self): args, help_text = random._parse_args(shlex.split("--choice a b c")) self.assertEqual(args.choice, ["a", "b", "c"]) diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py index 9fbe94fcdd6..9c794991dd5 100644 --- a/Lib/test/test_remote_pdb.py +++ b/Lib/test/test_remote_pdb.py @@ -12,7 +12,7 @@ import textwrap import threading import unittest import unittest.mock -from contextlib import contextmanager, redirect_stdout, ExitStack +from contextlib import closing, contextmanager, redirect_stdout, ExitStack from pathlib import Path from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT from test.support.os_helper import temp_dir, TESTFN, unlink @@ -79,44 +79,6 @@ class MockSocketFile: return results -class MockDebuggerSocket: - """Mock file-like simulating a connection to a _RemotePdb instance""" - - def __init__(self, incoming): - self.incoming = iter(incoming) - self.outgoing = [] - self.buffered = bytearray() - - def write(self, data: bytes) -> None: - """Simulate write to socket.""" - self.buffered += data - - def flush(self) -> None: - """Ensure each line is valid JSON.""" - lines = self.buffered.splitlines(keepends=True) - self.buffered.clear() - for line in lines: - assert line.endswith(b"\n") - self.outgoing.append(json.loads(line)) - - def readline(self) -> bytes: - """Read a line from the prepared input queue.""" - # Anything written must be flushed before trying to read, - # since the read will be dependent upon the last write. - assert not self.buffered - try: - item = next(self.incoming) - if not isinstance(item, bytes): - item = json.dumps(item).encode() - return item + b"\n" - except StopIteration: - return b"" - - def close(self) -> None: - """No-op close implementation.""" - pass - - class PdbClientTestCase(unittest.TestCase): """Tests for the _PdbClient class.""" @@ -124,8 +86,11 @@ class PdbClientTestCase(unittest.TestCase): self, *, incoming, - simulate_failure=None, + simulate_send_failure=False, + simulate_sigint_during_stdout_write=False, + use_interrupt_socket=False, expected_outgoing=None, + expected_outgoing_signals=None, expected_completions=None, expected_exception=None, expected_stdout="", @@ -134,6 +99,8 @@ class PdbClientTestCase(unittest.TestCase): ): if expected_outgoing is None: expected_outgoing = [] + if expected_outgoing_signals is None: + expected_outgoing_signals = [] if expected_completions is None: expected_completions = [] if expected_state is None: @@ -142,16 +109,6 @@ class PdbClientTestCase(unittest.TestCase): expected_state.setdefault("write_failed", False) messages = [m for source, m in incoming if source == "server"] prompts = [m["prompt"] for source, m in incoming if source == "user"] - sockfile = MockDebuggerSocket(messages) - stdout = io.StringIO() - - if simulate_failure: - sockfile.write = unittest.mock.Mock() - sockfile.flush = unittest.mock.Mock() - if simulate_failure == "write": - sockfile.write.side_effect = OSError("write failed") - elif simulate_failure == "flush": - sockfile.flush.side_effect = OSError("flush failed") input_iter = (m for source, m in incoming if source == "user") completions = [] @@ -178,18 +135,60 @@ class PdbClientTestCase(unittest.TestCase): reply = message["input"] if isinstance(reply, BaseException): raise reply - return reply + if isinstance(reply, str): + return reply + return reply() with ExitStack() as stack: + client_sock, server_sock = socket.socketpair() + stack.enter_context(closing(client_sock)) + stack.enter_context(closing(server_sock)) + + server_sock = unittest.mock.Mock(wraps=server_sock) + + client_sock.sendall( + b"".join( + (m if isinstance(m, bytes) else json.dumps(m).encode()) + b"\n" + for m in messages + ) + ) + client_sock.shutdown(socket.SHUT_WR) + + if simulate_send_failure: + server_sock.sendall = unittest.mock.Mock( + side_effect=OSError("sendall failed") + ) + client_sock.shutdown(socket.SHUT_RD) + + stdout = io.StringIO() + + if simulate_sigint_during_stdout_write: + orig_stdout_write = stdout.write + + def sigint_stdout_write(s): + signal.raise_signal(signal.SIGINT) + return orig_stdout_write(s) + + stdout.write = sigint_stdout_write + input_mock = stack.enter_context( unittest.mock.patch("pdb.input", side_effect=mock_input) ) stack.enter_context(redirect_stdout(stdout)) + if use_interrupt_socket: + interrupt_sock = unittest.mock.Mock(spec=socket.socket) + mock_kill = None + else: + interrupt_sock = None + mock_kill = stack.enter_context( + unittest.mock.patch("os.kill", spec=os.kill) + ) + client = _PdbClient( - pid=0, - sockfile=sockfile, - interrupt_script="/a/b.py", + pid=12345, + server_socket=server_sock, + interrupt_sock=interrupt_sock, ) if expected_exception is not None: @@ -199,13 +198,12 @@ class PdbClientTestCase(unittest.TestCase): client.cmdloop() - actual_outgoing = sockfile.outgoing - if simulate_failure: - actual_outgoing += [ - json.loads(msg.args[0]) for msg in sockfile.write.mock_calls - ] + sent_msgs = [msg.args[0] for msg in server_sock.sendall.mock_calls] + for msg in sent_msgs: + assert msg.endswith(b"\n") + actual_outgoing = [json.loads(msg) for msg in sent_msgs] - self.assertEqual(sockfile.outgoing, expected_outgoing) + self.assertEqual(actual_outgoing, expected_outgoing) self.assertEqual(completions, expected_completions) if expected_stdout_substring and not expected_stdout: self.assertIn(expected_stdout_substring, stdout.getvalue()) @@ -215,6 +213,20 @@ class PdbClientTestCase(unittest.TestCase): actual_state = {k: getattr(client, k) for k in expected_state} self.assertEqual(actual_state, expected_state) + if use_interrupt_socket: + outgoing_signals = [ + signal.Signals(int.from_bytes(call.args[0])) + for call in interrupt_sock.sendall.call_args_list + ] + else: + assert mock_kill is not None + outgoing_signals = [] + for call in mock_kill.call_args_list: + pid, signum = call.args + self.assertEqual(pid, 12345) + outgoing_signals.append(signal.Signals(signum)) + self.assertEqual(outgoing_signals, expected_outgoing_signals) + def test_remote_immediately_closing_the_connection(self): """Test the behavior when the remote closes the connection immediately.""" incoming = [] @@ -409,11 +421,38 @@ class PdbClientTestCase(unittest.TestCase): expected_state={"state": "dumb"}, ) - def test_keyboard_interrupt_at_prompt(self): - """Test signaling when a prompt gets a KeyboardInterrupt.""" + def test_sigint_at_prompt(self): + """Test signaling when a prompt gets interrupted.""" incoming = [ ("server", {"prompt": "(Pdb) ", "state": "pdb"}), - ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}), + ( + "user", + { + "prompt": "(Pdb) ", + "input": lambda: signal.raise_signal(signal.SIGINT), + }, + ), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + {"signal": "INT"}, + ], + expected_state={"state": "pdb"}, + ) + + def test_sigint_at_continuation_prompt(self): + """Test signaling when a continuation prompt gets interrupted.""" + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": "if True:"}), + ( + "user", + { + "prompt": "... ", + "input": lambda: signal.raise_signal(signal.SIGINT), + }, + ), ] self.do_test( incoming=incoming, @@ -423,6 +462,22 @@ class PdbClientTestCase(unittest.TestCase): expected_state={"state": "pdb"}, ) + def test_sigint_when_writing(self): + """Test siginaling when sys.stdout.write() gets interrupted.""" + incoming = [ + ("server", {"message": "Some message or other\n", "type": "info"}), + ] + for use_interrupt_socket in [False, True]: + with self.subTest(use_interrupt_socket=use_interrupt_socket): + self.do_test( + incoming=incoming, + simulate_sigint_during_stdout_write=True, + use_interrupt_socket=use_interrupt_socket, + expected_outgoing=[], + expected_outgoing_signals=[signal.SIGINT], + expected_stdout="Some message or other\n", + ) + def test_eof_at_prompt(self): """Test signaling when a prompt gets an EOFError.""" incoming = [ @@ -478,20 +533,7 @@ class PdbClientTestCase(unittest.TestCase): self.do_test( incoming=incoming, expected_outgoing=[{"signal": "INT"}], - simulate_failure="write", - expected_state={"write_failed": True}, - ) - - def test_flush_failing(self): - """Test terminating if flush fails due to a half closed socket.""" - incoming = [ - ("server", {"prompt": "(Pdb) ", "state": "pdb"}), - ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}), - ] - self.do_test( - incoming=incoming, - expected_outgoing=[{"signal": "INT"}], - simulate_failure="flush", + simulate_send_failure=True, expected_state={"write_failed": True}, ) @@ -660,42 +702,7 @@ class PdbClientTestCase(unittest.TestCase): }, {"reply": "xyz"}, ], - simulate_failure="write", - expected_completions=[], - expected_state={"state": "interact", "write_failed": True}, - ) - - def test_flush_failure_during_completion(self): - """Test failing to flush to the socket to request tab completions.""" - incoming = [ - ("server", {"prompt": ">>> ", "state": "interact"}), - ( - "user", - { - "prompt": ">>> ", - "completion_request": { - "line": "xy", - "begidx": 0, - "endidx": 2, - }, - "input": "xyz", - }, - ), - ] - self.do_test( - incoming=incoming, - expected_outgoing=[ - { - "complete": { - "text": "xy", - "line": "xy", - "begidx": 0, - "endidx": 2, - } - }, - {"reply": "xyz"}, - ], - simulate_failure="flush", + simulate_send_failure=True, expected_completions=[], expected_state={"state": "interact", "write_failed": True}, ) @@ -1032,6 +1039,7 @@ class PdbConnectTestCase(unittest.TestCase): frame=frame, commands="", version=pdb._PdbServer.protocol_version(), + signal_raising_thread=False, ) return x # This line won't be reached in debugging @@ -1089,23 +1097,6 @@ class PdbConnectTestCase(unittest.TestCase): client_file.write(json.dumps({"reply": command}).encode() + b"\n") client_file.flush() - def _send_interrupt(self, pid): - """Helper to send an interrupt signal to the debugger.""" - # with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script: - interrupt_script = TESTFN + "_interrupt_script.py" - with open(interrupt_script, 'w') as f: - f.write( - 'import pdb, sys\n' - 'print("Hello, world!")\n' - 'if inst := pdb.Pdb._last_pdb_instance:\n' - ' inst.set_trace(sys._getframe(1))\n' - ) - self.addCleanup(unlink, interrupt_script) - try: - sys.remote_exec(pid, interrupt_script) - except PermissionError: - self.skipTest("Insufficient permissions to execute code in remote process") - def test_connect_and_basic_commands(self): """Test connecting to a remote debugger and sending basic commands.""" self._create_script() @@ -1218,6 +1209,7 @@ class PdbConnectTestCase(unittest.TestCase): frame=frame, commands="", version=pdb._PdbServer.protocol_version(), + signal_raising_thread=True, ) print("Connected to debugger") iterations = 50 @@ -1233,6 +1225,10 @@ class PdbConnectTestCase(unittest.TestCase): self._create_script(script=script) process, client_file = self._connect_and_get_client_file() + # Accept a 2nd connection from the subprocess to tell it about signals + signal_sock, _ = self.server_sock.accept() + self.addCleanup(signal_sock.close) + with kill_on_error(process): # Skip initial messages until we get to the prompt self._read_until_prompt(client_file) @@ -1248,7 +1244,7 @@ class PdbConnectTestCase(unittest.TestCase): break # Inject a script to interrupt the running process - self._send_interrupt(process.pid) + signal_sock.sendall(signal.SIGINT.to_bytes()) messages = self._read_until_prompt(client_file) # Verify we got the keyboard interrupt message. @@ -1304,6 +1300,7 @@ class PdbConnectTestCase(unittest.TestCase): frame=frame, commands="", version=fake_version, + signal_raising_thread=False, ) # This should print if the debugger detaches correctly diff --git a/Lib/test/test_shlex.py b/Lib/test/test_shlex.py index f35571ea886..a13ddcb76b7 100644 --- a/Lib/test/test_shlex.py +++ b/Lib/test/test_shlex.py @@ -3,6 +3,7 @@ import itertools import shlex import string import unittest +from test.support import cpython_only from test.support import import_helper @@ -364,6 +365,7 @@ class ShlexTest(unittest.TestCase): with self.assertRaises(AttributeError): shlex_instance.punctuation_chars = False + @cpython_only def test_lazy_imports(self): import_helper.ensure_lazy_imports('shlex', {'collections', 're', 'os'}) diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index ace97ce0cbe..03c54151a22 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -2,8 +2,9 @@ import unittest from unittest import mock from test import support from test.support import ( - is_apple, os_helper, refleak_helper, socket_helper, threading_helper + cpython_only, is_apple, os_helper, refleak_helper, socket_helper, threading_helper ) +from test.support.import_helper import ensure_lazy_imports import _thread as thread import array import contextlib @@ -257,6 +258,12 @@ HAVE_SOCKET_HYPERV = _have_socket_hyperv() # Size in bytes of the int type SIZEOF_INT = array.array("i").itemsize +class TestLazyImport(unittest.TestCase): + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("socket", {"array", "selectors"}) + + class SocketTCPTest(unittest.TestCase): def setUp(self): diff --git a/Lib/test/test_sqlite3/test_cli.py b/Lib/test/test_sqlite3/test_cli.py index dcd90d11d46..ad0dcb3cccb 100644 --- a/Lib/test/test_sqlite3/test_cli.py +++ b/Lib/test/test_sqlite3/test_cli.py @@ -4,7 +4,12 @@ import unittest from sqlite3.__main__ import main as cli from test.support.os_helper import TESTFN, unlink -from test.support import captured_stdout, captured_stderr, captured_stdin +from test.support import ( + captured_stdout, + captured_stderr, + captured_stdin, + force_not_colorized, +) class CommandLineInterface(unittest.TestCase): @@ -32,6 +37,7 @@ class CommandLineInterface(unittest.TestCase): self.assertEqual(out, "") return err + @force_not_colorized def test_cli_help(self): out = self.expect_success("-h") self.assertIn("usage: ", out) diff --git a/Lib/test/test_string/test_string.py b/Lib/test/test_string/test_string.py index f6d112d8a93..5394fe4e12c 100644 --- a/Lib/test/test_string/test_string.py +++ b/Lib/test/test_string/test_string.py @@ -2,6 +2,14 @@ import unittest import string from string import Template import types +from test.support import cpython_only +from test.support.import_helper import ensure_lazy_imports + + +class LazyImportTest(unittest.TestCase): + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("base64", {"re", "collections"}) class ModuleTest(unittest.TestCase): diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 3cb755cd56c..ca35804fb36 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -162,6 +162,20 @@ class ProcessTestCase(BaseTestCase): [sys.executable, "-c", "while True: pass"], timeout=0.1) + def test_timeout_exception(self): + try: + subprocess.run([sys.executable, '-c', 'import time;time.sleep(9)'], timeout = -1) + except subprocess.TimeoutExpired as e: + self.assertIn("-1 seconds", str(e)) + else: + self.fail("Expected TimeoutExpired exception not raised") + try: + subprocess.run([sys.executable, '-c', 'import time;time.sleep(9)'], timeout = 0) + except subprocess.TimeoutExpired as e: + self.assertIn("0 seconds", str(e)) + else: + self.fail("Expected TimeoutExpired exception not raised") + def test_check_call_zero(self): # check_call() function with zero return code rc = subprocess.check_call(ZERO_RETURN_CMD) diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 10c3e0e9a1d..bcc7b4de048 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -1776,7 +1776,7 @@ class SizeofTest(unittest.TestCase): check((1,2,3), vsize('') + self.P + 3*self.P) # type # static type: PyTypeObject - fmt = 'P2nPI13Pl4Pn9Pn12PIPc' + fmt = 'P2nPI13Pl4Pn9Pn12PI3Pc' s = vsize(fmt) check(int, s) typeid = 'n' if support.Py_GIL_DISABLED else '' @@ -1960,7 +1960,7 @@ def _supports_remote_attaching(): PROCESS_VM_READV_SUPPORTED = False try: - from _remotedebuggingmodule import PROCESS_VM_READV_SUPPORTED + from _remote_debugging import PROCESS_VM_READV_SUPPORTED except ImportError: pass @@ -2101,7 +2101,7 @@ print("Remote script executed successfully!") prologue = '''\ import sys def audit_hook(event, arg): - print(f"Audit event: {event}, arg: {arg}") + print(f"Audit event: {event}, arg: {arg}".encode("ascii", errors="replace")) sys.addaudithook(audit_hook) ''' script = ''' @@ -2196,6 +2196,64 @@ this is invalid python code self.assertIn(b"Remote debugging is not enabled", err) self.assertEqual(out, b"") +class TestSysJIT(unittest.TestCase): + + def test_jit_is_available(self): + available = sys._jit.is_available() + script = f"import sys; assert sys._jit.is_available() is {available}" + assert_python_ok("-c", script, PYTHON_JIT="0") + assert_python_ok("-c", script, PYTHON_JIT="1") + + def test_jit_is_enabled(self): + available = sys._jit.is_available() + script = "import sys; assert sys._jit.is_enabled() is {enabled}" + assert_python_ok("-c", script.format(enabled=False), PYTHON_JIT="0") + assert_python_ok("-c", script.format(enabled=available), PYTHON_JIT="1") + + def test_jit_is_active(self): + available = sys._jit.is_available() + script = textwrap.dedent( + """ + import _testcapi + import _testinternalcapi + import sys + + def frame_0_interpreter() -> None: + assert sys._jit.is_active() is False + + def frame_1_interpreter() -> None: + assert sys._jit.is_active() is False + frame_0_interpreter() + assert sys._jit.is_active() is False + + def frame_2_jit(expected: bool) -> None: + # Inlined into the last loop of frame_3_jit: + assert sys._jit.is_active() is expected + # Insert C frame: + _testcapi.pyobject_vectorcall(frame_1_interpreter, None, None) + assert sys._jit.is_active() is expected + + def frame_3_jit() -> None: + # JITs just before the last loop: + for i in range(_testinternalcapi.TIER2_THRESHOLD + 1): + # Careful, doing this in the reverse order breaks tracing: + expected = {enabled} and i == _testinternalcapi.TIER2_THRESHOLD + assert sys._jit.is_active() is expected + frame_2_jit(expected) + assert sys._jit.is_active() is expected + + def frame_4_interpreter() -> None: + assert sys._jit.is_active() is False + frame_3_jit() + assert sys._jit.is_active() is False + + assert sys._jit.is_active() is False + frame_4_interpreter() + assert sys._jit.is_active() is False + """ + ) + assert_python_ok("-c", script.format(enabled=False), PYTHON_JIT="0") + assert_python_ok("-c", script.format(enabled=available), PYTHON_JIT="1") if __name__ == "__main__": diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 814c00ca0fd..4ab38c2598b 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -5,7 +5,7 @@ Tests for the threading module. import test.support from test.support import threading_helper, requires_subprocess, requires_gil_enabled from test.support import verbose, cpython_only, os_helper -from test.support.import_helper import import_module +from test.support.import_helper import ensure_lazy_imports, import_module from test.support.script_helper import assert_python_ok, assert_python_failure from test.support import force_not_colorized @@ -121,6 +121,10 @@ class ThreadTests(BaseTestCase): maxDiff = 9999 @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("threading", {"functools", "warnings"}) + + @cpython_only def test_name(self): def func(): pass diff --git a/Lib/test/test_traceback.py b/Lib/test/test_traceback.py index 683486e9aca..b9be87f357f 100644 --- a/Lib/test/test_traceback.py +++ b/Lib/test/test_traceback.py @@ -37,6 +37,12 @@ test_code.co_positions = lambda _: iter([(6, 6, 0, 0)]) test_frame = namedtuple('frame', ['f_code', 'f_globals', 'f_locals']) test_tb = namedtuple('tb', ['tb_frame', 'tb_lineno', 'tb_next', 'tb_lasti']) +color_overrides = {"reset": "z", "filename": "fn", "error_highlight": "E"} +colors = { + color_overrides.get(k, k[0].lower()): v + for k, v in _colorize.default_theme.traceback.items() +} + LEVENSHTEIN_DATA_FILE = Path(__file__).parent / 'levenshtein_examples.json' @@ -4721,6 +4727,8 @@ class MiscTest(unittest.TestCase): class TestColorizedTraceback(unittest.TestCase): + maxDiff = None + def test_colorized_traceback(self): def foo(*args): x = {'a':{'b': None}} @@ -4743,9 +4751,9 @@ class TestColorizedTraceback(unittest.TestCase): e, capture_locals=True ) lines = "".join(exc.format(colorize=True)) - red = _colorize.ANSIColors.RED - boldr = _colorize.ANSIColors.BOLD_RED - reset = _colorize.ANSIColors.RESET + red = colors["e"] + boldr = colors["E"] + reset = colors["z"] self.assertIn("y = " + red + "x['a']['b']" + reset + boldr + "['c']" + reset, lines) self.assertIn("return " + red + "(lambda *args: foo(*args))" + reset + boldr + "(1,2,3,4)" + reset, lines) self.assertIn("return (lambda *args: " + red + "foo" + reset + boldr + "(*args)" + reset + ")(1,2,3,4)", lines) @@ -4761,18 +4769,16 @@ class TestColorizedTraceback(unittest.TestCase): e, capture_locals=True ) actual = "".join(exc.format(colorize=True)) - red = _colorize.ANSIColors.RED - magenta = _colorize.ANSIColors.MAGENTA - boldm = _colorize.ANSIColors.BOLD_MAGENTA - boldr = _colorize.ANSIColors.BOLD_RED - reset = _colorize.ANSIColors.RESET - expected = "".join([ - f' File {magenta}"<string>"{reset}, line {magenta}1{reset}\n', - f' a {boldr}${reset} b\n', - f' {boldr}^{reset}\n', - f'{boldm}SyntaxError{reset}: {magenta}invalid syntax{reset}\n'] - ) - self.assertIn(expected, actual) + def expected(t, m, fn, l, f, E, e, z): + return "".join( + [ + f' File {fn}"<string>"{z}, line {l}1{z}\n', + f' a {E}${z} b\n', + f' {E}^{z}\n', + f'{t}SyntaxError{z}: {m}invalid syntax{z}\n' + ] + ) + self.assertIn(expected(**colors), actual) def test_colorized_traceback_is_the_default(self): def foo(): @@ -4788,23 +4794,21 @@ class TestColorizedTraceback(unittest.TestCase): exception_print(e) actual = tbstderr.getvalue().splitlines() - red = _colorize.ANSIColors.RED - boldr = _colorize.ANSIColors.BOLD_RED - magenta = _colorize.ANSIColors.MAGENTA - boldm = _colorize.ANSIColors.BOLD_MAGENTA - reset = _colorize.ANSIColors.RESET lno_foo = foo.__code__.co_firstlineno - expected = ['Traceback (most recent call last):', - f' File {magenta}"{__file__}"{reset}, ' - f'line {magenta}{lno_foo+5}{reset}, in {magenta}test_colorized_traceback_is_the_default{reset}', - f' {red}foo{reset+boldr}(){reset}', - f' {red}~~~{reset+boldr}^^{reset}', - f' File {magenta}"{__file__}"{reset}, ' - f'line {magenta}{lno_foo+1}{reset}, in {magenta}foo{reset}', - f' {red}1{reset+boldr}/{reset+red}0{reset}', - f' {red}~{reset+boldr}^{reset+red}~{reset}', - f'{boldm}ZeroDivisionError{reset}: {magenta}division by zero{reset}'] - self.assertEqual(actual, expected) + def expected(t, m, fn, l, f, E, e, z): + return [ + 'Traceback (most recent call last):', + f' File {fn}"{__file__}"{z}, ' + f'line {l}{lno_foo+5}{z}, in {f}test_colorized_traceback_is_the_default{z}', + f' {e}foo{z}{E}(){z}', + f' {e}~~~{z}{E}^^{z}', + f' File {fn}"{__file__}"{z}, ' + f'line {l}{lno_foo+1}{z}, in {f}foo{z}', + f' {e}1{z}{E}/{z}{e}0{z}', + f' {e}~{z}{E}^{z}{e}~{z}', + f'{t}ZeroDivisionError{z}: {m}division by zero{z}', + ] + self.assertEqual(actual, expected(**colors)) def test_colorized_traceback_from_exception_group(self): def foo(): @@ -4822,33 +4826,31 @@ class TestColorizedTraceback(unittest.TestCase): e, capture_locals=True ) - red = _colorize.ANSIColors.RED - boldr = _colorize.ANSIColors.BOLD_RED - magenta = _colorize.ANSIColors.MAGENTA - boldm = _colorize.ANSIColors.BOLD_MAGENTA - reset = _colorize.ANSIColors.RESET lno_foo = foo.__code__.co_firstlineno actual = "".join(exc.format(colorize=True)).splitlines() - expected = [f" + Exception Group Traceback (most recent call last):", - f' | File {magenta}"{__file__}"{reset}, line {magenta}{lno_foo+9}{reset}, in {magenta}test_colorized_traceback_from_exception_group{reset}', - f' | {red}foo{reset}{boldr}(){reset}', - f' | {red}~~~{reset}{boldr}^^{reset}', - f" | e = ExceptionGroup('test', [ZeroDivisionError('division by zero')])", - f" | foo = {foo}", - f' | self = <{__name__}.TestColorizedTraceback testMethod=test_colorized_traceback_from_exception_group>', - f' | File {magenta}"{__file__}"{reset}, line {magenta}{lno_foo+6}{reset}, in {magenta}foo{reset}', - f' | raise ExceptionGroup("test", exceptions)', - f" | exceptions = [ZeroDivisionError('division by zero')]", - f' | {boldm}ExceptionGroup{reset}: {magenta}test (1 sub-exception){reset}', - f' +-+---------------- 1 ----------------', - f' | Traceback (most recent call last):', - f' | File {magenta}"{__file__}"{reset}, line {magenta}{lno_foo+3}{reset}, in {magenta}foo{reset}', - f' | {red}1 {reset}{boldr}/{reset}{red} 0{reset}', - f' | {red}~~{reset}{boldr}^{reset}{red}~~{reset}', - f" | exceptions = [ZeroDivisionError('division by zero')]", - f' | {boldm}ZeroDivisionError{reset}: {magenta}division by zero{reset}', - f' +------------------------------------'] - self.assertEqual(actual, expected) + def expected(t, m, fn, l, f, E, e, z): + return [ + f" + Exception Group Traceback (most recent call last):", + f' | File {fn}"{__file__}"{z}, line {l}{lno_foo+9}{z}, in {f}test_colorized_traceback_from_exception_group{z}', + f' | {e}foo{z}{E}(){z}', + f' | {e}~~~{z}{E}^^{z}', + f" | e = ExceptionGroup('test', [ZeroDivisionError('division by zero')])", + f" | foo = {foo}", + f' | self = <{__name__}.TestColorizedTraceback testMethod=test_colorized_traceback_from_exception_group>', + f' | File {fn}"{__file__}"{z}, line {l}{lno_foo+6}{z}, in {f}foo{z}', + f' | raise ExceptionGroup("test", exceptions)', + f" | exceptions = [ZeroDivisionError('division by zero')]", + f' | {t}ExceptionGroup{z}: {m}test (1 sub-exception){z}', + f' +-+---------------- 1 ----------------', + f' | Traceback (most recent call last):', + f' | File {fn}"{__file__}"{z}, line {l}{lno_foo+3}{z}, in {f}foo{z}', + f' | {e}1 {z}{E}/{z}{e} 0{z}', + f' | {e}~~{z}{E}^{z}{e}~~{z}', + f" | exceptions = [ZeroDivisionError('division by zero')]", + f' | {t}ZeroDivisionError{z}: {m}division by zero{z}', + f' +------------------------------------', + ] + self.assertEqual(actual, expected(**colors)) if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py index 90de828cc71..c965860fbb1 100644 --- a/Lib/test/test_urllib.py +++ b/Lib/test/test_urllib.py @@ -1551,7 +1551,8 @@ class Pathname_Tests(unittest.TestCase): urllib.request.url2pathname(url, require_scheme=True), expected_path) - error_subtests = [ + def test_url2pathname_require_scheme_errors(self): + subtests = [ '', ':', 'foo', @@ -1561,13 +1562,20 @@ class Pathname_Tests(unittest.TestCase): 'data:file:foo', 'data:file://foo', ] - for url in error_subtests: + for url in subtests: with self.subTest(url=url): self.assertRaises( urllib.error.URLError, urllib.request.url2pathname, url, require_scheme=True) + def test_url2pathname_resolve_host(self): + fn = urllib.request.url2pathname + sep = os.path.sep + self.assertEqual(fn('//127.0.0.1/foo/bar', resolve_host=True), f'{sep}foo{sep}bar') + self.assertEqual(fn(f'//{socket.gethostname()}/foo/bar'), f'{sep}foo{sep}bar') + self.assertEqual(fn(f'//{socket.gethostname()}/foo/bar', resolve_host=True), f'{sep}foo{sep}bar') + @unittest.skipUnless(sys.platform == 'win32', 'test specific to Windows pathnames.') def test_url2pathname_win(self): @@ -1598,6 +1606,7 @@ class Pathname_Tests(unittest.TestCase): self.assertEqual(fn('//server/path/to/file'), '\\\\server\\path\\to\\file') self.assertEqual(fn('////server/path/to/file'), '\\\\server\\path\\to\\file') self.assertEqual(fn('/////server/path/to/file'), '\\\\server\\path\\to\\file') + self.assertEqual(fn('//127.0.0.1/path/to/file'), '\\\\127.0.0.1\\path\\to\\file') # Localhost paths self.assertEqual(fn('//localhost/C:/path/to/file'), 'C:\\path\\to\\file') self.assertEqual(fn('//localhost/C|/path/to/file'), 'C:\\path\\to\\file') @@ -1622,8 +1631,7 @@ class Pathname_Tests(unittest.TestCase): self.assertRaises(urllib.error.URLError, fn, '//:80/foo/bar') self.assertRaises(urllib.error.URLError, fn, '//:/foo/bar') self.assertRaises(urllib.error.URLError, fn, '//c:80/foo/bar') - self.assertEqual(fn('//127.0.0.1/foo/bar'), '/foo/bar') - self.assertEqual(fn(f'//{socket.gethostname()}/foo/bar'), '/foo/bar') + self.assertRaises(urllib.error.URLError, fn, '//127.0.0.1/foo/bar') @unittest.skipUnless(os_helper.FS_NONASCII, 'need os_helper.FS_NONASCII') def test_url2pathname_nonascii(self): diff --git a/Lib/test/test_zipfile/test_core.py b/Lib/test/test_zipfile/test_core.py index 7c8a82d821a..4c9d9f4b562 100644 --- a/Lib/test/test_zipfile/test_core.py +++ b/Lib/test/test_zipfile/test_core.py @@ -24,10 +24,12 @@ from test.support import script_helper, os_helper from test.support import ( findfile, requires_zlib, requires_bz2, requires_lzma, captured_stdout, captured_stderr, requires_subprocess, + cpython_only ) from test.support.os_helper import ( TESTFN, unlink, rmtree, temp_dir, temp_cwd, fd_count, FakePath ) +from test.support.import_helper import ensure_lazy_imports TESTFN2 = TESTFN + "2" @@ -49,6 +51,13 @@ def get_files(test): yield f test.assertFalse(f.closed) + +class LazyImportTest(unittest.TestCase): + @cpython_only + def test_lazy_import(self): + ensure_lazy_imports("zipfile", {"typing"}) + + class AbstractTestsWithSourceFile: @classmethod def setUpClass(cls): |