diff options
Diffstat (limited to 'Lib/test/test_external_inspection.py')
-rw-r--r-- | Lib/test/test_external_inspection.py | 345 |
1 files changed, 257 insertions, 88 deletions
diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py index aa05db972f0..f787190b1ae 100644 --- a/Lib/test/test_external_inspection.py +++ b/Lib/test/test_external_inspection.py @@ -4,6 +4,8 @@ import textwrap import importlib import sys import socket +from asyncio import staggered, taskgroups +from unittest.mock import ANY from test.support import os_helper, SHORT_TIMEOUT, busy_retry from test.support.script_helper import make_script from test.support.socket_helper import find_unused_port @@ -13,32 +15,38 @@ import subprocess PROCESS_VM_READV_SUPPORTED = False try: - from _testexternalinspection import PROCESS_VM_READV_SUPPORTED - from _testexternalinspection import get_stack_trace - from _testexternalinspection import get_async_stack_trace - from _testexternalinspection import get_all_awaited_by + from _remotedebugging import PROCESS_VM_READV_SUPPORTED + from _remotedebugging import get_stack_trace + from _remotedebugging import get_async_stack_trace + from _remotedebugging import get_all_awaited_by except ImportError: - raise unittest.SkipTest( - "Test only runs when _testexternalinspection is available") + raise unittest.SkipTest("Test only runs when _remotedebuggingmodule is available") + def _make_test_script(script_dir, script_basename, source): to_return = make_script(script_dir, script_basename, source) importlib.invalidate_caches() return to_return -skip_if_not_supported = unittest.skipIf((sys.platform != "darwin" - and sys.platform != "linux" - and sys.platform != "win32"), - "Test only runs on Linux, Windows and MacOS") + +skip_if_not_supported = unittest.skipIf( + (sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32"), + "Test only runs on Linux, Windows and MacOS", +) + + class TestGetStackTrace(unittest.TestCase): @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import time, sys, socket # Connect to the test process sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -56,7 +64,8 @@ class TestGetStackTrace(unittest.TestCase): time.sleep(1000) bar() - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -65,11 +74,11 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) @@ -87,22 +96,24 @@ class TestGetStackTrace(unittest.TestCase): p.terminate() p.wait(timeout=SHORT_TIMEOUT) - expected_stack_trace = [ - 'foo', - 'baz', - 'bar', - '<module>' + ("foo", script_name, 15), + ("baz", script_name, 11), + ("bar", script_name, 9), + ("<module>", script_name, 17), ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_async_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio import time import sys @@ -142,7 +153,8 @@ class TestGetStackTrace(unittest.TestCase): return loop asyncio.run(main(), loop_factory={{TASK_FACTORY}}) - """) + """ + ) stack_trace = None for task_factory_variant in "asyncio.new_event_loop", "new_eager_loop": with ( @@ -153,25 +165,24 @@ class TestGetStackTrace(unittest.TestCase): os.mkdir(script_dir) server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) script_name = _make_test_script( - script_dir, 'script', - script.format(TASK_FACTORY=task_factory_variant)) + script_dir, + "script", + script.format(TASK_FACTORY=task_factory_variant), + ) client_socket = None try: - p = subprocess.Popen( - [sys.executable, script_name] - ) + p = subprocess.Popen([sys.executable, script_name]) client_socket, _ = server_socket.accept() server_socket.close() response = client_socket.recv(1024) self.assertEqual(response, b"ready") stack_trace = get_async_stack_trace(p.pid) except PermissionError: - self.skipTest( - "Insufficient permissions to read the stack trace") + self.skipTest("Insufficient permissions to read the stack trace") finally: if client_socket is not None: client_socket.close() @@ -184,23 +195,91 @@ class TestGetStackTrace(unittest.TestCase): root_task = "Task-1" expected_stack_trace = [ - ["c5", "c4", "c3", "c2"], + [ + ("c5", script_name, 11), + ("c4", script_name, 15), + ("c3", script_name, 18), + ("c2", script_name, 21), + ], "c2_root", [ - [["main"], root_task, []], - [["c1"], "sub_main_1", [[["main"], root_task, []]]], - [["c1"], "sub_main_2", [[["main"], root_task, []]]], + [ + [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("main", script_name, 27), + ], + "Task-1", + [], + ], + [ + [("c1", script_name, 24)], + "sub_main_1", + [ + [ + [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("main", script_name, 27), + ], + "Task-1", + [], + ] + ], + ], + [ + [("c1", script_name, 24)], + "sub_main_2", + [ + [ + [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("main", script_name, 27), + ], + "Task-1", + [], + ] + ], + ], ], ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_asyncgen_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio import time import sys @@ -224,7 +303,8 @@ class TestGetStackTrace(unittest.TestCase): pass asyncio.run(main()) - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -232,10 +312,10 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) @@ -257,17 +337,26 @@ class TestGetStackTrace(unittest.TestCase): stack_trace[2].sort(key=lambda x: x[1]) expected_stack_trace = [ - ['gen_nested_call', 'gen', 'main'], 'Task-1', [] + [ + ("gen_nested_call", script_name, 11), + ("gen", script_name, 17), + ("main", script_name, 20), + ], + "Task-1", + [], ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_async_gather_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio import time import sys @@ -292,7 +381,8 @@ class TestGetStackTrace(unittest.TestCase): await asyncio.gather(c1(), c2()) asyncio.run(main()) - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -300,10 +390,10 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) @@ -313,8 +403,7 @@ class TestGetStackTrace(unittest.TestCase): self.assertEqual(response, b"ready") stack_trace = get_async_stack_trace(p.pid) except PermissionError: - self.skipTest( - "Insufficient permissions to read the stack trace") + self.skipTest("Insufficient permissions to read the stack trace") finally: if client_socket is not None: client_socket.close() @@ -325,18 +414,23 @@ class TestGetStackTrace(unittest.TestCase): # sets are unordered, so we want to sort "awaited_by"s stack_trace[2].sort(key=lambda x: x[1]) - expected_stack_trace = [ - ['deep', 'c1'], 'Task-2', [[['main'], 'Task-1', []]] + expected_stack_trace = [ + [("deep", script_name, ANY), ("c1", script_name, 16)], + "Task-2", + [[[("main", script_name, 22)], "Task-1", []]], ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_async_staggered_race_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio.staggered import time import sys @@ -364,7 +458,8 @@ class TestGetStackTrace(unittest.TestCase): ) asyncio.run(main()) - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -372,10 +467,10 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) @@ -385,8 +480,7 @@ class TestGetStackTrace(unittest.TestCase): self.assertEqual(response, b"ready") stack_trace = get_async_stack_trace(p.pid) except PermissionError: - self.skipTest( - "Insufficient permissions to read the stack trace") + self.skipTest("Insufficient permissions to read the stack trace") finally: if client_socket is not None: client_socket.close() @@ -396,18 +490,35 @@ class TestGetStackTrace(unittest.TestCase): # sets are unordered, so we want to sort "awaited_by"s stack_trace[2].sort(key=lambda x: x[1]) - - expected_stack_trace = [ - ['deep', 'c1', 'run_one_coro'], 'Task-2', [[['main'], 'Task-1', []]] + expected_stack_trace = [ + [ + ("deep", script_name, ANY), + ("c1", script_name, 16), + ("staggered_race.<locals>.run_one_coro", staggered.__file__, ANY), + ], + "Task-2", + [ + [ + [ + ("staggered_race", staggered.__file__, ANY), + ("main", script_name, 22), + ], + "Task-1", + [], + ] + ], ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_async_global_awaited_by(self): port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio import os import random @@ -443,6 +554,8 @@ class TestGetStackTrace(unittest.TestCase): assert message == data.decode() writer.close() await writer.wait_closed() + # Signal we are ready to sleep + sock.sendall(b"ready") await asyncio.sleep(SHORT_TIMEOUT) async def echo_client_spam(server): @@ -452,8 +565,10 @@ class TestGetStackTrace(unittest.TestCase): random.shuffle(msg) tg.create_task(echo_client("".join(msg))) await asyncio.sleep(0) - # at least a 1000 tasks created - sock.sendall(b"ready") + # at least a 1000 tasks created. Each task will signal + # when is ready to avoid the race caused by the fact that + # tasks are waited on tg.__exit__ and we cannot signal when + # that happens otherwise # at this point all client tasks completed without assertion errors # let's wrap up the test server.close() @@ -468,7 +583,8 @@ class TestGetStackTrace(unittest.TestCase): tg.create_task(echo_client_spam(server), name="echo client spam") asyncio.run(main()) - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -476,17 +592,19 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) client_socket, _ = server_socket.accept() server_socket.close() - response = client_socket.recv(1024) - self.assertEqual(response, b"ready") + for _ in range(1000): + expected_response = b"ready" + response = client_socket.recv(len(expected_response)) + self.assertEqual(response, expected_response) for _ in busy_retry(SHORT_TIMEOUT): try: all_awaited_by = get_all_awaited_by(p.pid) @@ -497,7 +615,9 @@ class TestGetStackTrace(unittest.TestCase): msg = str(re) if msg.startswith("Task list appears corrupted"): continue - elif msg.startswith("Invalid linked list structure reading remote memory"): + elif msg.startswith( + "Invalid linked list structure reading remote memory" + ): continue elif msg.startswith("Unknown error reading memory"): continue @@ -516,22 +636,62 @@ class TestGetStackTrace(unittest.TestCase): # expected: at least 1000 pending tasks self.assertGreaterEqual(len(entries), 1000) # the first three tasks stem from the code structure - self.assertIn(('Task-1', []), entries) - self.assertIn(('server task', [[['main'], 'Task-1', []]]), entries) - self.assertIn(('echo client spam', [[['main'], 'Task-1', []]]), entries) + self.assertIn((ANY, "Task-1", []), entries) + main_stack = [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("main", script_name, 60), + ] + self.assertIn( + (ANY, "server task", [[main_stack, ANY]]), + entries, + ) + self.assertIn( + (ANY, "echo client spam", [[main_stack, ANY]]), + entries, + ) - expected_stack = [[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]] - tasks_with_stack = [task for task in entries if task[1] == expected_stack] + expected_stack = [ + [ + [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("echo_client_spam", script_name, 41), + ], + ANY, + ] + ] + tasks_with_stack = [ + task for task in entries if task[2] == expected_stack + ] self.assertGreaterEqual(len(tasks_with_stack), 1000) # the final task will have some random number, but it should for # sure be one of the echo client spam horde (In windows this is not true # for some reason) if sys.platform != "win32": - self.assertEqual([[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]], entries[-1][1]) + self.assertEqual( + expected_stack, + entries[-1][2], + ) except PermissionError: - self.skipTest( - "Insufficient permissions to read the stack trace") + self.skipTest("Insufficient permissions to read the stack trace") finally: if client_socket is not None: client_socket.close() @@ -540,12 +700,21 @@ class TestGetStackTrace(unittest.TestCase): p.wait(timeout=SHORT_TIMEOUT) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_self_trace(self): stack_trace = get_stack_trace(os.getpid()) - print(stack_trace) - self.assertEqual(stack_trace[0], "test_self_trace") + self.assertEqual( + stack_trace[0], + ( + "TestGetStackTrace.test_self_trace", + __file__, + self.test_self_trace.__code__.co_firstlineno + 6, + ), + ) + if __name__ == "__main__": unittest.main() |