diff options
Diffstat (limited to 'Lib/test/test_sys.py')
-rw-r--r-- | Lib/test/test_sys.py | 192 |
1 files changed, 127 insertions, 65 deletions
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 56413d00823..486bf10a0b5 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -24,7 +24,7 @@ from test.support import import_helper from test.support import force_not_colorized from test.support import SHORT_TIMEOUT try: - from test.support import interpreters + from concurrent import interpreters except ImportError: interpreters = None import textwrap @@ -57,7 +57,7 @@ class DisplayHookTest(unittest.TestCase): dh(None) self.assertEqual(out.getvalue(), "") - self.assertTrue(not hasattr(builtins, "_")) + self.assertNotHasAttr(builtins, "_") # sys.displayhook() requires arguments self.assertRaises(TypeError, dh) @@ -172,7 +172,7 @@ class ExceptHookTest(unittest.TestCase): with support.captured_stderr() as err: sys.__excepthook__(*sys.exc_info()) - self.assertTrue(err.getvalue().endswith("ValueError: 42\n")) + self.assertEndsWith(err.getvalue(), "ValueError: 42\n") self.assertRaises(TypeError, sys.__excepthook__) @@ -192,7 +192,7 @@ class ExceptHookTest(unittest.TestCase): err = err.getvalue() self.assertIn(""" File "b'bytes_filename'", line 123\n""", err) self.assertIn(""" text\n""", err) - self.assertTrue(err.endswith("SyntaxError: msg\n")) + self.assertEndsWith(err, "SyntaxError: msg\n") def test_excepthook(self): with test.support.captured_output("stderr") as stderr: @@ -269,8 +269,7 @@ class SysModuleTest(unittest.TestCase): rc, out, err = assert_python_failure('-c', code, **env_vars) self.assertEqual(rc, 1) self.assertEqual(out, b'') - self.assertTrue(err.startswith(expected), - "%s doesn't start with %s" % (ascii(err), ascii(expected))) + self.assertStartsWith(err, expected) # test that stderr buffer is flushed before the exit message is written # into stderr @@ -437,7 +436,7 @@ class SysModuleTest(unittest.TestCase): @unittest.skipUnless(hasattr(sys, "setdlopenflags"), 'test needs sys.setdlopenflags()') def test_dlopenflags(self): - self.assertTrue(hasattr(sys, "getdlopenflags")) + self.assertHasAttr(sys, "getdlopenflags") self.assertRaises(TypeError, sys.getdlopenflags, 42) oldflags = sys.getdlopenflags() self.assertRaises(TypeError, sys.setdlopenflags) @@ -623,8 +622,7 @@ class SysModuleTest(unittest.TestCase): # And the next record must be for g456(). filename, lineno, funcname, sourceline = stack[i+1] self.assertEqual(funcname, "g456") - self.assertTrue((sourceline.startswith("if leave_g.wait(") or - sourceline.startswith("g_raised.set()"))) + self.assertStartsWith(sourceline, ("if leave_g.wait(", "g_raised.set()")) finally: # Reap the spawned thread. leave_g.set() @@ -731,7 +729,7 @@ class SysModuleTest(unittest.TestCase): info = sys.thread_info self.assertEqual(len(info), 3) self.assertIn(info.name, ('nt', 'pthread', 'pthread-stubs', 'solaris', None)) - self.assertIn(info.lock, ('semaphore', 'mutex+cond', None)) + self.assertIn(info.lock, ('pymutex', None)) if sys.platform.startswith(("linux", "android", "freebsd")): self.assertEqual(info.name, "pthread") elif sys.platform == "win32": @@ -860,7 +858,7 @@ class SysModuleTest(unittest.TestCase): "hash_randomization", "isolated", "dev_mode", "utf8_mode", "warn_default_encoding", "safe_path", "int_max_str_digits") for attr in attrs: - self.assertTrue(hasattr(sys.flags, attr), attr) + self.assertHasAttr(sys.flags, attr) attr_type = bool if attr in ("dev_mode", "safe_path") else int self.assertEqual(type(getattr(sys.flags, attr)), attr_type, attr) self.assertTrue(repr(sys.flags)) @@ -871,12 +869,7 @@ class SysModuleTest(unittest.TestCase): def assert_raise_on_new_sys_type(self, sys_attr): # Users are intentionally prevented from creating new instances of # sys.flags, sys.version_info, and sys.getwindowsversion. - arg = sys_attr - attr_type = type(sys_attr) - with self.assertRaises(TypeError): - attr_type(arg) - with self.assertRaises(TypeError): - attr_type.__new__(attr_type, arg) + support.check_disallow_instantiation(self, type(sys_attr), sys_attr) def test_sys_flags_no_instantiation(self): self.assert_raise_on_new_sys_type(sys.flags) @@ -1072,10 +1065,11 @@ class SysModuleTest(unittest.TestCase): levels = {'alpha': 0xA, 'beta': 0xB, 'candidate': 0xC, 'final': 0xF} - self.assertTrue(hasattr(sys.implementation, 'name')) - self.assertTrue(hasattr(sys.implementation, 'version')) - self.assertTrue(hasattr(sys.implementation, 'hexversion')) - self.assertTrue(hasattr(sys.implementation, 'cache_tag')) + self.assertHasAttr(sys.implementation, 'name') + self.assertHasAttr(sys.implementation, 'version') + self.assertHasAttr(sys.implementation, 'hexversion') + self.assertHasAttr(sys.implementation, 'cache_tag') + self.assertHasAttr(sys.implementation, 'supports_isolated_interpreters') version = sys.implementation.version self.assertEqual(version[:2], (version.major, version.minor)) @@ -1089,6 +1083,15 @@ class SysModuleTest(unittest.TestCase): self.assertEqual(sys.implementation.name, sys.implementation.name.lower()) + # https://peps.python.org/pep-0734 + sii = sys.implementation.supports_isolated_interpreters + self.assertIsInstance(sii, bool) + if test.support.check_impl_detail(cpython=True): + if test.support.is_emscripten or test.support.is_wasi: + self.assertFalse(sii) + else: + self.assertTrue(sii) + @test.support.cpython_only def test_debugmallocstats(self): # Test sys._debugmallocstats() @@ -1137,23 +1140,12 @@ class SysModuleTest(unittest.TestCase): b = sys.getallocatedblocks() self.assertLessEqual(b, a) try: - # While we could imagine a Python session where the number of - # multiple buffer objects would exceed the sharing of references, - # it is unlikely to happen in a normal test run. - # - # In free-threaded builds each code object owns an array of - # pointers to copies of the bytecode. When the number of - # code objects is a large fraction of the total number of - # references, this can cause the total number of allocated - # blocks to exceed the total number of references. - # - # For some reason, iOS seems to trigger the "unlikely to happen" - # case reliably under CI conditions. It's not clear why; but as - # this test is checking the behavior of getallocatedblock() - # under garbage collection, we can skip this pre-condition check - # for now. See GH-130384. - if not support.Py_GIL_DISABLED and not support.is_apple_mobile: - self.assertLess(a, sys.gettotalrefcount()) + # The reported blocks will include immortalized strings, but the + # total ref count will not. This will sanity check that among all + # other objects (those eligible for garbage collection) there + # are more references being tracked than allocated blocks. + interned_immortal = sys.getunicodeinternedsize(_only_immortal=True) + self.assertLess(a - interned_immortal, sys.gettotalrefcount()) except AttributeError: # gettotalrefcount() not available pass @@ -1301,6 +1293,7 @@ class SysModuleTest(unittest.TestCase): for name in sys.stdlib_module_names: self.assertIsInstance(name, str) + @unittest.skipUnless(hasattr(sys, '_stdlib_dir'), 'need sys._stdlib_dir') def test_stdlib_dir(self): os = import_helper.import_fresh_module('os') marker = getattr(os, '__file__', None) @@ -1419,7 +1412,7 @@ class UnraisableHookTest(unittest.TestCase): else: self.assertIn("ValueError", report) self.assertIn("del is broken", report) - self.assertTrue(report.endswith("\n")) + self.assertEndsWith(report, "\n") def test_original_unraisablehook_exception_qualname(self): # See bpo-41031, bpo-45083. @@ -1955,33 +1948,19 @@ class SizeofTest(unittest.TestCase): self.assertEqual(out, b"") self.assertEqual(err, b"") - -def _supports_remote_attaching(): - PROCESS_VM_READV_SUPPORTED = False - - try: - from _testexternalinspection import PROCESS_VM_READV_SUPPORTED - except ImportError: - pass - - return PROCESS_VM_READV_SUPPORTED - -@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled") -@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32", - "Test only runs on Linux, Windows and MacOS") -@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(), - "Test only runs on Linux with process_vm_readv support") +@test.support.support_remote_exec_only @test.support.cpython_only class TestRemoteExec(unittest.TestCase): def tearDown(self): test.support.reap_children() - def _run_remote_exec_test(self, script_code, python_args=None, env=None, prologue=''): + def _run_remote_exec_test(self, script_code, python_args=None, env=None, + prologue='', + script_path=os_helper.TESTFN + '_remote.py'): # Create the script that will be remotely executed - script = os_helper.TESTFN + '_remote.py' - self.addCleanup(os_helper.unlink, script) + self.addCleanup(os_helper.unlink, script_path) - with open(script, 'w') as f: + with open(script_path, 'w') as f: f.write(script_code) # Create and run the target process @@ -2050,7 +2029,7 @@ sock.close() self.assertEqual(response, b"ready") # Try remote exec on the target process - sys.remote_exec(proc.pid, script) + sys.remote_exec(proc.pid, script_path) # Signal script to continue client_socket.sendall(b"continue") @@ -2073,14 +2052,32 @@ sock.close() def test_remote_exec(self): """Test basic remote exec functionality""" - script = ''' -print("Remote script executed successfully!") -''' + script = 'print("Remote script executed successfully!")' returncode, stdout, stderr = self._run_remote_exec_test(script) # self.assertEqual(returncode, 0) self.assertIn(b"Remote script executed successfully!", stdout) self.assertEqual(stderr, b"") + def test_remote_exec_bytes(self): + script = 'print("Remote script executed successfully!")' + script_path = os.fsencode(os_helper.TESTFN) + b'_bytes_remote.py' + returncode, stdout, stderr = self._run_remote_exec_test(script, + script_path=script_path) + self.assertIn(b"Remote script executed successfully!", stdout) + self.assertEqual(stderr, b"") + + @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, 'requires undecodable path') + @unittest.skipIf(sys.platform == 'darwin', + 'undecodable paths are not supported on macOS') + def test_remote_exec_undecodable(self): + script = 'print("Remote script executed successfully!")' + script_path = os_helper.TESTFN_UNDECODABLE + b'_undecodable_remote.py' + for script_path in [script_path, os.fsdecode(script_path)]: + returncode, stdout, stderr = self._run_remote_exec_test(script, + script_path=script_path) + self.assertIn(b"Remote script executed successfully!", stdout) + self.assertEqual(stderr, b"") + def test_remote_exec_with_self_process(self): """Test remote exec with the target process being the same as the test process""" @@ -2101,7 +2098,7 @@ print("Remote script executed successfully!") prologue = '''\ import sys def audit_hook(event, arg): - print(f"Audit event: {event}, arg: {arg}") + print(f"Audit event: {event}, arg: {arg}".encode("ascii", errors="replace")) sys.addaudithook(audit_hook) ''' script = ''' @@ -2110,7 +2107,7 @@ print("Remote script executed successfully!") returncode, stdout, stderr = self._run_remote_exec_test(script, prologue=prologue) self.assertEqual(returncode, 0) self.assertIn(b"Remote script executed successfully!", stdout) - self.assertIn(b"Audit event: remote_debugger_script, arg: ", stdout) + self.assertIn(b"Audit event: cpython.remote_debugger_script, arg: ", stdout) self.assertEqual(stderr, b"") def test_remote_exec_with_exception(self): @@ -2157,6 +2154,13 @@ raise Exception("Remote script exception") with self.assertRaises(OSError): sys.remote_exec(99999, "print('should not run')") + def test_remote_exec_invalid_script(self): + """Test remote exec with invalid script type""" + with self.assertRaises(TypeError): + sys.remote_exec(0, None) + with self.assertRaises(TypeError): + sys.remote_exec(0, 123) + def test_remote_exec_syntax_error(self): """Test remote exec with syntax error in script""" script = ''' @@ -2196,6 +2200,64 @@ this is invalid python code self.assertIn(b"Remote debugging is not enabled", err) self.assertEqual(out, b"") +class TestSysJIT(unittest.TestCase): + + def test_jit_is_available(self): + available = sys._jit.is_available() + script = f"import sys; assert sys._jit.is_available() is {available}" + assert_python_ok("-c", script, PYTHON_JIT="0") + assert_python_ok("-c", script, PYTHON_JIT="1") + + def test_jit_is_enabled(self): + available = sys._jit.is_available() + script = "import sys; assert sys._jit.is_enabled() is {enabled}" + assert_python_ok("-c", script.format(enabled=False), PYTHON_JIT="0") + assert_python_ok("-c", script.format(enabled=available), PYTHON_JIT="1") + + def test_jit_is_active(self): + available = sys._jit.is_available() + script = textwrap.dedent( + """ + import _testcapi + import _testinternalcapi + import sys + + def frame_0_interpreter() -> None: + assert sys._jit.is_active() is False + + def frame_1_interpreter() -> None: + assert sys._jit.is_active() is False + frame_0_interpreter() + assert sys._jit.is_active() is False + + def frame_2_jit(expected: bool) -> None: + # Inlined into the last loop of frame_3_jit: + assert sys._jit.is_active() is expected + # Insert C frame: + _testcapi.pyobject_vectorcall(frame_1_interpreter, None, None) + assert sys._jit.is_active() is expected + + def frame_3_jit() -> None: + # JITs just before the last loop: + for i in range(_testinternalcapi.TIER2_THRESHOLD + 1): + # Careful, doing this in the reverse order breaks tracing: + expected = {enabled} and i == _testinternalcapi.TIER2_THRESHOLD + assert sys._jit.is_active() is expected + frame_2_jit(expected) + assert sys._jit.is_active() is expected + + def frame_4_interpreter() -> None: + assert sys._jit.is_active() is False + frame_3_jit() + assert sys._jit.is_active() is False + + assert sys._jit.is_active() is False + frame_4_interpreter() + assert sys._jit.is_active() is False + """ + ) + assert_python_ok("-c", script.format(enabled=False), PYTHON_JIT="0") + assert_python_ok("-c", script.format(enabled=available), PYTHON_JIT="1") if __name__ == "__main__": |