diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_external_inspection.py | 122 | ||||
-rw-r--r-- | Lib/test/test_iter.py | 2 | ||||
-rw-r--r-- | Lib/test/test_listcomps.py | 2 | ||||
-rw-r--r-- | Lib/test/test_platform.py | 16 |
4 files changed, 139 insertions, 3 deletions
diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py index 90214e814f2..0f31c225e68 100644 --- a/Lib/test/test_external_inspection.py +++ b/Lib/test/test_external_inspection.py @@ -7,7 +7,7 @@ import socket import threading from asyncio import staggered, taskgroups, base_events, tasks from unittest.mock import ANY -from test.support import os_helper, SHORT_TIMEOUT, busy_retry +from test.support import os_helper, SHORT_TIMEOUT, busy_retry, requires_gil_enabled from test.support.script_helper import make_script from test.support.socket_helper import find_unused_port @@ -876,6 +876,126 @@ class TestGetStackTrace(unittest.TestCase): ], ) + @skip_if_not_supported + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) + @requires_gil_enabled("Free threaded builds don't have an 'active thread'") + def test_only_active_thread(self): + # Test that only_active_thread parameter works correctly + port = find_unused_port() + script = textwrap.dedent( + f"""\ + import time, sys, socket, threading + + # Connect to the test process + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('localhost', {port})) + + def worker_thread(name, barrier, ready_event): + barrier.wait() # Synchronize thread start + ready_event.wait() # Wait for main thread signal + # Sleep to keep thread alive + time.sleep(10_000) + + def main_work(): + # Do busy work to hold the GIL + sock.sendall(b"working\\n") + count = 0 + while count < 100000000: + count += 1 + if count % 10000000 == 0: + pass # Keep main thread busy + sock.sendall(b"done\\n") + + # Create synchronization primitives + num_threads = 3 + barrier = threading.Barrier(num_threads + 1) # +1 for main thread + ready_event = threading.Event() + + # Start worker threads + threads = [] + for i in range(num_threads): + t = threading.Thread(target=worker_thread, args=(f"Worker-{{i}}", barrier, ready_event)) + t.start() + threads.append(t) + + # Wait for all threads to be ready + barrier.wait() + + # Signal ready to parent process + sock.sendall(b"ready\\n") + + # Signal threads to start waiting + ready_event.set() + + # Give threads time to start sleeping + time.sleep(0.1) + + # Now do busy work to hold the GIL + main_work() + """ + ) + + with os_helper.temp_dir() as work_dir: + script_dir = os.path.join(work_dir, "script_pkg") + os.mkdir(script_dir) + + # Create a socket server to communicate with the target process + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(("localhost", port)) + server_socket.settimeout(SHORT_TIMEOUT) + server_socket.listen(1) + + script_name = _make_test_script(script_dir, "script", script) + client_socket = None + try: + p = subprocess.Popen([sys.executable, script_name]) + client_socket, _ = server_socket.accept() + server_socket.close() + + # Wait for ready signal + response = b"" + while b"ready" not in response: + response += client_socket.recv(1024) + + # Wait for the main thread to start its busy work + while b"working" not in response: + response += client_socket.recv(1024) + + # Get stack trace with all threads + unwinder_all = RemoteUnwinder(p.pid, all_threads=True) + all_traces = unwinder_all.get_stack_trace() + + # Get stack trace with only GIL holder + unwinder_gil = RemoteUnwinder(p.pid, only_active_thread=True) + gil_traces = unwinder_gil.get_stack_trace() + + except PermissionError: + self.skipTest( + "Insufficient permissions to read the stack trace" + ) + finally: + if client_socket is not None: + client_socket.close() + p.kill() + p.terminate() + p.wait(timeout=SHORT_TIMEOUT) + + # Verify we got multiple threads in all_traces + self.assertGreater(len(all_traces), 1, "Should have multiple threads") + + # Verify we got exactly one thread in gil_traces + self.assertEqual(len(gil_traces), 1, "Should have exactly one GIL holder") + + # The GIL holder should be in the all_traces list + gil_thread_id = gil_traces[0][0] + all_thread_ids = [trace[0] for trace in all_traces] + self.assertIn(gil_thread_id, all_thread_ids, + "GIL holder should be among all threads") + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_iter.py b/Lib/test/test_iter.py index 1b9f3cf7624..18e4b676c53 100644 --- a/Lib/test/test_iter.py +++ b/Lib/test/test_iter.py @@ -1147,7 +1147,7 @@ class TestCase(unittest.TestCase): def test_exception_locations(self): # The location of an exception raised from __init__ or - # __next__ should should be the iterator expression + # __next__ should be the iterator expression def init_raises(): try: diff --git a/Lib/test/test_listcomps.py b/Lib/test/test_listcomps.py index cffdeeacc5d..70148dc30fc 100644 --- a/Lib/test/test_listcomps.py +++ b/Lib/test/test_listcomps.py @@ -716,7 +716,7 @@ class ListComprehensionTest(unittest.TestCase): def test_exception_locations(self): # The location of an exception raised from __init__ or - # __next__ should should be the iterator expression + # __next__ should be the iterator expression def init_raises(): try: diff --git a/Lib/test/test_platform.py b/Lib/test/test_platform.py index 3688cc4267b..479649053ab 100644 --- a/Lib/test/test_platform.py +++ b/Lib/test/test_platform.py @@ -133,6 +133,22 @@ class PlatformTest(unittest.TestCase): for terse in (False, True): res = platform.platform(aliased, terse) + def test__platform(self): + for src, res in [ + ('foo bar', 'foo_bar'), + ( + '1/2\\3:4;5"6(7)8(7)6"5;4:3\\2/1', + '1-2-3-4-5-6-7-8-7-6-5-4-3-2-1' + ), + ('--', ''), + ('-f', '-f'), + ('-foo----', '-foo'), + ('--foo---', '-foo'), + ('---foo--', '-foo'), + ]: + with self.subTest(src=src): + self.assertEqual(platform._platform(src), res) + def test_system(self): res = platform.system() |