aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_external_inspection.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_external_inspection.py')
-rw-r--r--Lib/test/test_external_inspection.py464
1 files changed, 370 insertions, 94 deletions
diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py
index 0f31c225e68..354a82a800f 100644
--- a/Lib/test/test_external_inspection.py
+++ b/Lib/test/test_external_inspection.py
@@ -5,9 +5,15 @@ import importlib
import sys
import socket
import threading
+import time
from asyncio import staggered, taskgroups, base_events, tasks
from unittest.mock import ANY
-from test.support import os_helper, SHORT_TIMEOUT, busy_retry, requires_gil_enabled
+from test.support import (
+ os_helper,
+ SHORT_TIMEOUT,
+ busy_retry,
+ requires_gil_enabled,
+)
from test.support.script_helper import make_script
from test.support.socket_helper import find_unused_port
@@ -235,55 +241,162 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
- # sets are unordered, so we want to sort "awaited_by"s
- stack_trace[2].sort(key=lambda x: x[1])
+ # First check all the tasks are present
+ tasks_names = [
+ task.task_name for task in stack_trace[0].awaited_by
+ ]
+ for task_name in ["c2_root", "sub_main_1", "sub_main_2"]:
+ self.assertIn(task_name, tasks_names)
+
+ # Now ensure that the awaited_by_relationships are correct
+ id_to_task = {
+ task.task_id: task for task in stack_trace[0].awaited_by
+ }
+ task_name_to_awaited_by = {
+ task.task_name: set(
+ id_to_task[awaited.task_name].task_name
+ for awaited in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ task_name_to_awaited_by,
+ {
+ "c2_root": {"Task-1", "sub_main_1", "sub_main_2"},
+ "Task-1": set(),
+ "sub_main_1": {"Task-1"},
+ "sub_main_2": {"Task-1"},
+ },
+ )
- expected_stack_trace = [
- [
- FrameInfo([script_name, 10, "c5"]),
- FrameInfo([script_name, 14, "c4"]),
- FrameInfo([script_name, 17, "c3"]),
- FrameInfo([script_name, 20, "c2"]),
- ],
- "c2_root",
- [
- CoroInfo(
- [
- [
- FrameInfo(
+ # Now ensure that the coroutine stacks are correct
+ coroutine_stacks = {
+ task.task_name: sorted(
+ tuple(tuple(frame) for frame in coro.call_stack)
+ for coro in task.coroutine_stack
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ coroutine_stacks,
+ {
+ "Task-1": [
+ (
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup._aexit",
+ ]
+ ),
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup.__aexit__",
+ ]
+ ),
+ tuple([script_name, 26, "main"]),
+ )
+ ],
+ "c2_root": [
+ (
+ tuple([script_name, 10, "c5"]),
+ tuple([script_name, 14, "c4"]),
+ tuple([script_name, 17, "c3"]),
+ tuple([script_name, 20, "c2"]),
+ )
+ ],
+ "sub_main_1": [(tuple([script_name, 23, "c1"]),)],
+ "sub_main_2": [(tuple([script_name, 23, "c1"]),)],
+ },
+ )
+
+ # Now ensure the coroutine stacks for the awaited_by relationships are correct.
+ awaited_by_coroutine_stacks = {
+ task.task_name: sorted(
+ (
+ id_to_task[coro.task_name].task_name,
+ tuple(tuple(frame) for frame in coro.call_stack),
+ )
+ for coro in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ awaited_by_coroutine_stacks,
+ {
+ "Task-1": [],
+ "c2_root": [
+ (
+ "Task-1",
+ (
+ tuple(
[
taskgroups.__file__,
ANY,
"TaskGroup._aexit",
]
),
- FrameInfo(
+ tuple(
[
taskgroups.__file__,
ANY,
"TaskGroup.__aexit__",
]
),
- FrameInfo([script_name, 26, "main"]),
- ],
+ tuple([script_name, 26, "main"]),
+ ),
+ ),
+ ("sub_main_1", (tuple([script_name, 23, "c1"]),)),
+ ("sub_main_2", (tuple([script_name, 23, "c1"]),)),
+ ],
+ "sub_main_1": [
+ (
"Task-1",
- ]
- ),
- CoroInfo(
- [
- [FrameInfo([script_name, 23, "c1"])],
- "sub_main_1",
- ]
- ),
- CoroInfo(
- [
- [FrameInfo([script_name, 23, "c1"])],
- "sub_main_2",
- ]
- ),
- ],
- ]
- self.assertEqual(stack_trace, expected_stack_trace)
+ (
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup._aexit",
+ ]
+ ),
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup.__aexit__",
+ ]
+ ),
+ tuple([script_name, 26, "main"]),
+ ),
+ )
+ ],
+ "sub_main_2": [
+ (
+ "Task-1",
+ (
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup._aexit",
+ ]
+ ),
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup.__aexit__",
+ ]
+ ),
+ tuple([script_name, 26, "main"]),
+ ),
+ )
+ ],
+ },
+ )
@skip_if_not_supported
@unittest.skipIf(
@@ -349,19 +462,29 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
- # sets are unordered, so we want to sort "awaited_by"s
- stack_trace[2].sort(key=lambda x: x[1])
-
- expected_stack_trace = [
+ # For this simple asyncgen test, we only expect one task with the full coroutine stack
+ self.assertEqual(len(stack_trace[0].awaited_by), 1)
+ task = stack_trace[0].awaited_by[0]
+ self.assertEqual(task.task_name, "Task-1")
+
+ # Check the coroutine stack - based on actual output, only shows main
+ coroutine_stack = sorted(
+ tuple(tuple(frame) for frame in coro.call_stack)
+ for coro in task.coroutine_stack
+ )
+ self.assertEqual(
+ coroutine_stack,
[
- FrameInfo([script_name, 10, "gen_nested_call"]),
- FrameInfo([script_name, 16, "gen"]),
- FrameInfo([script_name, 19, "main"]),
+ (
+ tuple([script_name, 10, "gen_nested_call"]),
+ tuple([script_name, 16, "gen"]),
+ tuple([script_name, 19, "main"]),
+ )
],
- "Task-1",
- [],
- ]
- self.assertEqual(stack_trace, expected_stack_trace)
+ )
+
+ # No awaited_by relationships expected for this simple case
+ self.assertEqual(task.awaited_by, [])
@skip_if_not_supported
@unittest.skipIf(
@@ -428,18 +551,73 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
- # sets are unordered, so we want to sort "awaited_by"s
- stack_trace[2].sort(key=lambda x: x[1])
-
- expected_stack_trace = [
- [
- FrameInfo([script_name, 11, "deep"]),
- FrameInfo([script_name, 15, "c1"]),
- ],
- "Task-2",
- [CoroInfo([[FrameInfo([script_name, 21, "main"])], "Task-1"])],
+ # First check all the tasks are present
+ tasks_names = [
+ task.task_name for task in stack_trace[0].awaited_by
]
- self.assertEqual(stack_trace, expected_stack_trace)
+ for task_name in ["Task-1", "Task-2"]:
+ self.assertIn(task_name, tasks_names)
+
+ # Now ensure that the awaited_by_relationships are correct
+ id_to_task = {
+ task.task_id: task for task in stack_trace[0].awaited_by
+ }
+ task_name_to_awaited_by = {
+ task.task_name: set(
+ id_to_task[awaited.task_name].task_name
+ for awaited in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ task_name_to_awaited_by,
+ {
+ "Task-1": set(),
+ "Task-2": {"Task-1"},
+ },
+ )
+
+ # Now ensure that the coroutine stacks are correct
+ coroutine_stacks = {
+ task.task_name: sorted(
+ tuple(tuple(frame) for frame in coro.call_stack)
+ for coro in task.coroutine_stack
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ coroutine_stacks,
+ {
+ "Task-1": [(tuple([script_name, 21, "main"]),)],
+ "Task-2": [
+ (
+ tuple([script_name, 11, "deep"]),
+ tuple([script_name, 15, "c1"]),
+ )
+ ],
+ },
+ )
+
+ # Now ensure the coroutine stacks for the awaited_by relationships are correct.
+ awaited_by_coroutine_stacks = {
+ task.task_name: sorted(
+ (
+ id_to_task[coro.task_name].task_name,
+ tuple(tuple(frame) for frame in coro.call_stack),
+ )
+ for coro in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ awaited_by_coroutine_stacks,
+ {
+ "Task-1": [],
+ "Task-2": [
+ ("Task-1", (tuple([script_name, 21, "main"]),))
+ ],
+ },
+ )
@skip_if_not_supported
@unittest.skipIf(
@@ -509,36 +687,93 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
- # sets are unordered, so we want to sort "awaited_by"s
- stack_trace[2].sort(key=lambda x: x[1])
- expected_stack_trace = [
- [
- FrameInfo([script_name, 11, "deep"]),
- FrameInfo([script_name, 15, "c1"]),
- FrameInfo(
- [
- staggered.__file__,
- ANY,
- "staggered_race.<locals>.run_one_coro",
- ]
- ),
- ],
- "Task-2",
- [
- CoroInfo(
- [
- [
- FrameInfo(
+ # First check all the tasks are present
+ tasks_names = [
+ task.task_name for task in stack_trace[0].awaited_by
+ ]
+ for task_name in ["Task-1", "Task-2"]:
+ self.assertIn(task_name, tasks_names)
+
+ # Now ensure that the awaited_by_relationships are correct
+ id_to_task = {
+ task.task_id: task for task in stack_trace[0].awaited_by
+ }
+ task_name_to_awaited_by = {
+ task.task_name: set(
+ id_to_task[awaited.task_name].task_name
+ for awaited in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ task_name_to_awaited_by,
+ {
+ "Task-1": set(),
+ "Task-2": {"Task-1"},
+ },
+ )
+
+ # Now ensure that the coroutine stacks are correct
+ coroutine_stacks = {
+ task.task_name: sorted(
+ tuple(tuple(frame) for frame in coro.call_stack)
+ for coro in task.coroutine_stack
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ coroutine_stacks,
+ {
+ "Task-1": [
+ (
+ tuple([staggered.__file__, ANY, "staggered_race"]),
+ tuple([script_name, 21, "main"]),
+ )
+ ],
+ "Task-2": [
+ (
+ tuple([script_name, 11, "deep"]),
+ tuple([script_name, 15, "c1"]),
+ tuple(
+ [
+ staggered.__file__,
+ ANY,
+ "staggered_race.<locals>.run_one_coro",
+ ]
+ ),
+ )
+ ],
+ },
+ )
+
+ # Now ensure the coroutine stacks for the awaited_by relationships are correct.
+ awaited_by_coroutine_stacks = {
+ task.task_name: sorted(
+ (
+ id_to_task[coro.task_name].task_name,
+ tuple(tuple(frame) for frame in coro.call_stack),
+ )
+ for coro in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ awaited_by_coroutine_stacks,
+ {
+ "Task-1": [],
+ "Task-2": [
+ (
+ "Task-1",
+ (
+ tuple(
[staggered.__file__, ANY, "staggered_race"]
),
- FrameInfo([script_name, 21, "main"]),
- ],
- "Task-1",
- ]
- )
- ],
- ]
- self.assertEqual(stack_trace, expected_stack_trace)
+ tuple([script_name, 21, "main"]),
+ ),
+ )
+ ],
+ },
+ )
@skip_if_not_supported
@unittest.skipIf(
@@ -930,9 +1165,6 @@ class TestGetStackTrace(unittest.TestCase):
# Signal threads to start waiting
ready_event.set()
- # Give threads time to start sleeping
- time.sleep(0.1)
-
# Now do busy work to hold the GIL
main_work()
"""
@@ -967,7 +1199,28 @@ class TestGetStackTrace(unittest.TestCase):
# Get stack trace with all threads
unwinder_all = RemoteUnwinder(p.pid, all_threads=True)
- all_traces = unwinder_all.get_stack_trace()
+ for _ in range(10):
+ # Wait for the main thread to start its busy work
+ all_traces = unwinder_all.get_stack_trace()
+ found = False
+ for thread_id, stack in all_traces:
+ if not stack:
+ continue
+ current_frame = stack[0]
+ if (
+ current_frame.funcname == "main_work"
+ and current_frame.lineno > 15
+ ):
+ found = True
+
+ if found:
+ break
+ # Give a bit of time to take the next sample
+ time.sleep(0.1)
+ else:
+ self.fail(
+ "Main thread did not start its busy work on time"
+ )
# Get stack trace with only GIL holder
unwinder_gil = RemoteUnwinder(p.pid, only_active_thread=True)
@@ -985,16 +1238,39 @@ class TestGetStackTrace(unittest.TestCase):
p.wait(timeout=SHORT_TIMEOUT)
# Verify we got multiple threads in all_traces
- self.assertGreater(len(all_traces), 1, "Should have multiple threads")
+ self.assertGreater(
+ len(all_traces), 1, "Should have multiple threads"
+ )
# Verify we got exactly one thread in gil_traces
- self.assertEqual(len(gil_traces), 1, "Should have exactly one GIL holder")
+ self.assertEqual(
+ len(gil_traces), 1, "Should have exactly one GIL holder"
+ )
# The GIL holder should be in the all_traces list
gil_thread_id = gil_traces[0][0]
all_thread_ids = [trace[0] for trace in all_traces]
- self.assertIn(gil_thread_id, all_thread_ids,
- "GIL holder should be among all threads")
+ self.assertIn(
+ gil_thread_id,
+ all_thread_ids,
+ "GIL holder should be among all threads",
+ )
+
+
+class TestUnsupportedPlatformHandling(unittest.TestCase):
+ @unittest.skipIf(
+ sys.platform in ("linux", "darwin", "win32"),
+ "Test only runs on unsupported platforms (not Linux, macOS, or Windows)",
+ )
+ @unittest.skipIf(sys.platform == "android", "Android raises Linux-specific exception")
+ def test_unsupported_platform_error(self):
+ with self.assertRaises(RuntimeError) as cm:
+ RemoteUnwinder(os.getpid())
+
+ self.assertIn(
+ "Reading the PyRuntime section is not supported on this platform",
+ str(cm.exception)
+ )
if __name__ == "__main__":