aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_sys.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_sys.py')
-rw-r--r--Lib/test/test_sys.py192
1 files changed, 127 insertions, 65 deletions
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index 56413d00823..486bf10a0b5 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -24,7 +24,7 @@ from test.support import import_helper
from test.support import force_not_colorized
from test.support import SHORT_TIMEOUT
try:
- from test.support import interpreters
+ from concurrent import interpreters
except ImportError:
interpreters = None
import textwrap
@@ -57,7 +57,7 @@ class DisplayHookTest(unittest.TestCase):
dh(None)
self.assertEqual(out.getvalue(), "")
- self.assertTrue(not hasattr(builtins, "_"))
+ self.assertNotHasAttr(builtins, "_")
# sys.displayhook() requires arguments
self.assertRaises(TypeError, dh)
@@ -172,7 +172,7 @@ class ExceptHookTest(unittest.TestCase):
with support.captured_stderr() as err:
sys.__excepthook__(*sys.exc_info())
- self.assertTrue(err.getvalue().endswith("ValueError: 42\n"))
+ self.assertEndsWith(err.getvalue(), "ValueError: 42\n")
self.assertRaises(TypeError, sys.__excepthook__)
@@ -192,7 +192,7 @@ class ExceptHookTest(unittest.TestCase):
err = err.getvalue()
self.assertIn(""" File "b'bytes_filename'", line 123\n""", err)
self.assertIn(""" text\n""", err)
- self.assertTrue(err.endswith("SyntaxError: msg\n"))
+ self.assertEndsWith(err, "SyntaxError: msg\n")
def test_excepthook(self):
with test.support.captured_output("stderr") as stderr:
@@ -269,8 +269,7 @@ class SysModuleTest(unittest.TestCase):
rc, out, err = assert_python_failure('-c', code, **env_vars)
self.assertEqual(rc, 1)
self.assertEqual(out, b'')
- self.assertTrue(err.startswith(expected),
- "%s doesn't start with %s" % (ascii(err), ascii(expected)))
+ self.assertStartsWith(err, expected)
# test that stderr buffer is flushed before the exit message is written
# into stderr
@@ -437,7 +436,7 @@ class SysModuleTest(unittest.TestCase):
@unittest.skipUnless(hasattr(sys, "setdlopenflags"),
'test needs sys.setdlopenflags()')
def test_dlopenflags(self):
- self.assertTrue(hasattr(sys, "getdlopenflags"))
+ self.assertHasAttr(sys, "getdlopenflags")
self.assertRaises(TypeError, sys.getdlopenflags, 42)
oldflags = sys.getdlopenflags()
self.assertRaises(TypeError, sys.setdlopenflags)
@@ -623,8 +622,7 @@ class SysModuleTest(unittest.TestCase):
# And the next record must be for g456().
filename, lineno, funcname, sourceline = stack[i+1]
self.assertEqual(funcname, "g456")
- self.assertTrue((sourceline.startswith("if leave_g.wait(") or
- sourceline.startswith("g_raised.set()")))
+ self.assertStartsWith(sourceline, ("if leave_g.wait(", "g_raised.set()"))
finally:
# Reap the spawned thread.
leave_g.set()
@@ -731,7 +729,7 @@ class SysModuleTest(unittest.TestCase):
info = sys.thread_info
self.assertEqual(len(info), 3)
self.assertIn(info.name, ('nt', 'pthread', 'pthread-stubs', 'solaris', None))
- self.assertIn(info.lock, ('semaphore', 'mutex+cond', None))
+ self.assertIn(info.lock, ('pymutex', None))
if sys.platform.startswith(("linux", "android", "freebsd")):
self.assertEqual(info.name, "pthread")
elif sys.platform == "win32":
@@ -860,7 +858,7 @@ class SysModuleTest(unittest.TestCase):
"hash_randomization", "isolated", "dev_mode", "utf8_mode",
"warn_default_encoding", "safe_path", "int_max_str_digits")
for attr in attrs:
- self.assertTrue(hasattr(sys.flags, attr), attr)
+ self.assertHasAttr(sys.flags, attr)
attr_type = bool if attr in ("dev_mode", "safe_path") else int
self.assertEqual(type(getattr(sys.flags, attr)), attr_type, attr)
self.assertTrue(repr(sys.flags))
@@ -871,12 +869,7 @@ class SysModuleTest(unittest.TestCase):
def assert_raise_on_new_sys_type(self, sys_attr):
# Users are intentionally prevented from creating new instances of
# sys.flags, sys.version_info, and sys.getwindowsversion.
- arg = sys_attr
- attr_type = type(sys_attr)
- with self.assertRaises(TypeError):
- attr_type(arg)
- with self.assertRaises(TypeError):
- attr_type.__new__(attr_type, arg)
+ support.check_disallow_instantiation(self, type(sys_attr), sys_attr)
def test_sys_flags_no_instantiation(self):
self.assert_raise_on_new_sys_type(sys.flags)
@@ -1072,10 +1065,11 @@ class SysModuleTest(unittest.TestCase):
levels = {'alpha': 0xA, 'beta': 0xB, 'candidate': 0xC, 'final': 0xF}
- self.assertTrue(hasattr(sys.implementation, 'name'))
- self.assertTrue(hasattr(sys.implementation, 'version'))
- self.assertTrue(hasattr(sys.implementation, 'hexversion'))
- self.assertTrue(hasattr(sys.implementation, 'cache_tag'))
+ self.assertHasAttr(sys.implementation, 'name')
+ self.assertHasAttr(sys.implementation, 'version')
+ self.assertHasAttr(sys.implementation, 'hexversion')
+ self.assertHasAttr(sys.implementation, 'cache_tag')
+ self.assertHasAttr(sys.implementation, 'supports_isolated_interpreters')
version = sys.implementation.version
self.assertEqual(version[:2], (version.major, version.minor))
@@ -1089,6 +1083,15 @@ class SysModuleTest(unittest.TestCase):
self.assertEqual(sys.implementation.name,
sys.implementation.name.lower())
+ # https://peps.python.org/pep-0734
+ sii = sys.implementation.supports_isolated_interpreters
+ self.assertIsInstance(sii, bool)
+ if test.support.check_impl_detail(cpython=True):
+ if test.support.is_emscripten or test.support.is_wasi:
+ self.assertFalse(sii)
+ else:
+ self.assertTrue(sii)
+
@test.support.cpython_only
def test_debugmallocstats(self):
# Test sys._debugmallocstats()
@@ -1137,23 +1140,12 @@ class SysModuleTest(unittest.TestCase):
b = sys.getallocatedblocks()
self.assertLessEqual(b, a)
try:
- # While we could imagine a Python session where the number of
- # multiple buffer objects would exceed the sharing of references,
- # it is unlikely to happen in a normal test run.
- #
- # In free-threaded builds each code object owns an array of
- # pointers to copies of the bytecode. When the number of
- # code objects is a large fraction of the total number of
- # references, this can cause the total number of allocated
- # blocks to exceed the total number of references.
- #
- # For some reason, iOS seems to trigger the "unlikely to happen"
- # case reliably under CI conditions. It's not clear why; but as
- # this test is checking the behavior of getallocatedblock()
- # under garbage collection, we can skip this pre-condition check
- # for now. See GH-130384.
- if not support.Py_GIL_DISABLED and not support.is_apple_mobile:
- self.assertLess(a, sys.gettotalrefcount())
+ # The reported blocks will include immortalized strings, but the
+ # total ref count will not. This will sanity check that among all
+ # other objects (those eligible for garbage collection) there
+ # are more references being tracked than allocated blocks.
+ interned_immortal = sys.getunicodeinternedsize(_only_immortal=True)
+ self.assertLess(a - interned_immortal, sys.gettotalrefcount())
except AttributeError:
# gettotalrefcount() not available
pass
@@ -1301,6 +1293,7 @@ class SysModuleTest(unittest.TestCase):
for name in sys.stdlib_module_names:
self.assertIsInstance(name, str)
+ @unittest.skipUnless(hasattr(sys, '_stdlib_dir'), 'need sys._stdlib_dir')
def test_stdlib_dir(self):
os = import_helper.import_fresh_module('os')
marker = getattr(os, '__file__', None)
@@ -1419,7 +1412,7 @@ class UnraisableHookTest(unittest.TestCase):
else:
self.assertIn("ValueError", report)
self.assertIn("del is broken", report)
- self.assertTrue(report.endswith("\n"))
+ self.assertEndsWith(report, "\n")
def test_original_unraisablehook_exception_qualname(self):
# See bpo-41031, bpo-45083.
@@ -1955,33 +1948,19 @@ class SizeofTest(unittest.TestCase):
self.assertEqual(out, b"")
self.assertEqual(err, b"")
-
-def _supports_remote_attaching():
- PROCESS_VM_READV_SUPPORTED = False
-
- try:
- from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
- except ImportError:
- pass
-
- return PROCESS_VM_READV_SUPPORTED
-
-@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled")
-@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32",
- "Test only runs on Linux, Windows and MacOS")
-@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(),
- "Test only runs on Linux with process_vm_readv support")
+@test.support.support_remote_exec_only
@test.support.cpython_only
class TestRemoteExec(unittest.TestCase):
def tearDown(self):
test.support.reap_children()
- def _run_remote_exec_test(self, script_code, python_args=None, env=None, prologue=''):
+ def _run_remote_exec_test(self, script_code, python_args=None, env=None,
+ prologue='',
+ script_path=os_helper.TESTFN + '_remote.py'):
# Create the script that will be remotely executed
- script = os_helper.TESTFN + '_remote.py'
- self.addCleanup(os_helper.unlink, script)
+ self.addCleanup(os_helper.unlink, script_path)
- with open(script, 'w') as f:
+ with open(script_path, 'w') as f:
f.write(script_code)
# Create and run the target process
@@ -2050,7 +2029,7 @@ sock.close()
self.assertEqual(response, b"ready")
# Try remote exec on the target process
- sys.remote_exec(proc.pid, script)
+ sys.remote_exec(proc.pid, script_path)
# Signal script to continue
client_socket.sendall(b"continue")
@@ -2073,14 +2052,32 @@ sock.close()
def test_remote_exec(self):
"""Test basic remote exec functionality"""
- script = '''
-print("Remote script executed successfully!")
-'''
+ script = 'print("Remote script executed successfully!")'
returncode, stdout, stderr = self._run_remote_exec_test(script)
# self.assertEqual(returncode, 0)
self.assertIn(b"Remote script executed successfully!", stdout)
self.assertEqual(stderr, b"")
+ def test_remote_exec_bytes(self):
+ script = 'print("Remote script executed successfully!")'
+ script_path = os.fsencode(os_helper.TESTFN) + b'_bytes_remote.py'
+ returncode, stdout, stderr = self._run_remote_exec_test(script,
+ script_path=script_path)
+ self.assertIn(b"Remote script executed successfully!", stdout)
+ self.assertEqual(stderr, b"")
+
+ @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, 'requires undecodable path')
+ @unittest.skipIf(sys.platform == 'darwin',
+ 'undecodable paths are not supported on macOS')
+ def test_remote_exec_undecodable(self):
+ script = 'print("Remote script executed successfully!")'
+ script_path = os_helper.TESTFN_UNDECODABLE + b'_undecodable_remote.py'
+ for script_path in [script_path, os.fsdecode(script_path)]:
+ returncode, stdout, stderr = self._run_remote_exec_test(script,
+ script_path=script_path)
+ self.assertIn(b"Remote script executed successfully!", stdout)
+ self.assertEqual(stderr, b"")
+
def test_remote_exec_with_self_process(self):
"""Test remote exec with the target process being the same as the test process"""
@@ -2101,7 +2098,7 @@ print("Remote script executed successfully!")
prologue = '''\
import sys
def audit_hook(event, arg):
- print(f"Audit event: {event}, arg: {arg}")
+ print(f"Audit event: {event}, arg: {arg}".encode("ascii", errors="replace"))
sys.addaudithook(audit_hook)
'''
script = '''
@@ -2110,7 +2107,7 @@ print("Remote script executed successfully!")
returncode, stdout, stderr = self._run_remote_exec_test(script, prologue=prologue)
self.assertEqual(returncode, 0)
self.assertIn(b"Remote script executed successfully!", stdout)
- self.assertIn(b"Audit event: remote_debugger_script, arg: ", stdout)
+ self.assertIn(b"Audit event: cpython.remote_debugger_script, arg: ", stdout)
self.assertEqual(stderr, b"")
def test_remote_exec_with_exception(self):
@@ -2157,6 +2154,13 @@ raise Exception("Remote script exception")
with self.assertRaises(OSError):
sys.remote_exec(99999, "print('should not run')")
+ def test_remote_exec_invalid_script(self):
+ """Test remote exec with invalid script type"""
+ with self.assertRaises(TypeError):
+ sys.remote_exec(0, None)
+ with self.assertRaises(TypeError):
+ sys.remote_exec(0, 123)
+
def test_remote_exec_syntax_error(self):
"""Test remote exec with syntax error in script"""
script = '''
@@ -2196,6 +2200,64 @@ this is invalid python code
self.assertIn(b"Remote debugging is not enabled", err)
self.assertEqual(out, b"")
+class TestSysJIT(unittest.TestCase):
+
+ def test_jit_is_available(self):
+ available = sys._jit.is_available()
+ script = f"import sys; assert sys._jit.is_available() is {available}"
+ assert_python_ok("-c", script, PYTHON_JIT="0")
+ assert_python_ok("-c", script, PYTHON_JIT="1")
+
+ def test_jit_is_enabled(self):
+ available = sys._jit.is_available()
+ script = "import sys; assert sys._jit.is_enabled() is {enabled}"
+ assert_python_ok("-c", script.format(enabled=False), PYTHON_JIT="0")
+ assert_python_ok("-c", script.format(enabled=available), PYTHON_JIT="1")
+
+ def test_jit_is_active(self):
+ available = sys._jit.is_available()
+ script = textwrap.dedent(
+ """
+ import _testcapi
+ import _testinternalcapi
+ import sys
+
+ def frame_0_interpreter() -> None:
+ assert sys._jit.is_active() is False
+
+ def frame_1_interpreter() -> None:
+ assert sys._jit.is_active() is False
+ frame_0_interpreter()
+ assert sys._jit.is_active() is False
+
+ def frame_2_jit(expected: bool) -> None:
+ # Inlined into the last loop of frame_3_jit:
+ assert sys._jit.is_active() is expected
+ # Insert C frame:
+ _testcapi.pyobject_vectorcall(frame_1_interpreter, None, None)
+ assert sys._jit.is_active() is expected
+
+ def frame_3_jit() -> None:
+ # JITs just before the last loop:
+ for i in range(_testinternalcapi.TIER2_THRESHOLD + 1):
+ # Careful, doing this in the reverse order breaks tracing:
+ expected = {enabled} and i == _testinternalcapi.TIER2_THRESHOLD
+ assert sys._jit.is_active() is expected
+ frame_2_jit(expected)
+ assert sys._jit.is_active() is expected
+
+ def frame_4_interpreter() -> None:
+ assert sys._jit.is_active() is False
+ frame_3_jit()
+ assert sys._jit.is_active() is False
+
+ assert sys._jit.is_active() is False
+ frame_4_interpreter()
+ assert sys._jit.is_active() is False
+ """
+ )
+ assert_python_ok("-c", script.format(enabled=False), PYTHON_JIT="0")
+ assert_python_ok("-c", script.format(enabled=available), PYTHON_JIT="1")
if __name__ == "__main__":