aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_external_inspection.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_external_inspection.py')
-rw-r--r--Lib/test/test_external_inspection.py345
1 files changed, 257 insertions, 88 deletions
diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py
index aa05db972f0..f787190b1ae 100644
--- a/Lib/test/test_external_inspection.py
+++ b/Lib/test/test_external_inspection.py
@@ -4,6 +4,8 @@ import textwrap
import importlib
import sys
import socket
+from asyncio import staggered, taskgroups
+from unittest.mock import ANY
from test.support import os_helper, SHORT_TIMEOUT, busy_retry
from test.support.script_helper import make_script
from test.support.socket_helper import find_unused_port
@@ -13,32 +15,38 @@ import subprocess
PROCESS_VM_READV_SUPPORTED = False
try:
- from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
- from _testexternalinspection import get_stack_trace
- from _testexternalinspection import get_async_stack_trace
- from _testexternalinspection import get_all_awaited_by
+ from _remotedebugging import PROCESS_VM_READV_SUPPORTED
+ from _remotedebugging import get_stack_trace
+ from _remotedebugging import get_async_stack_trace
+ from _remotedebugging import get_all_awaited_by
except ImportError:
- raise unittest.SkipTest(
- "Test only runs when _testexternalinspection is available")
+ raise unittest.SkipTest("Test only runs when _remotedebuggingmodule is available")
+
def _make_test_script(script_dir, script_basename, source):
to_return = make_script(script_dir, script_basename, source)
importlib.invalidate_caches()
return to_return
-skip_if_not_supported = unittest.skipIf((sys.platform != "darwin"
- and sys.platform != "linux"
- and sys.platform != "win32"),
- "Test only runs on Linux, Windows and MacOS")
+
+skip_if_not_supported = unittest.skipIf(
+ (sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32"),
+ "Test only runs on Linux, Windows and MacOS",
+)
+
+
class TestGetStackTrace(unittest.TestCase):
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import time, sys, socket
# Connect to the test process
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -56,7 +64,8 @@ class TestGetStackTrace(unittest.TestCase):
time.sleep(1000)
bar()
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -65,11 +74,11 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
@@ -87,22 +96,24 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
-
expected_stack_trace = [
- 'foo',
- 'baz',
- 'bar',
- '<module>'
+ ("foo", script_name, 15),
+ ("baz", script_name, 11),
+ ("bar", script_name, 9),
+ ("<module>", script_name, 17),
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_async_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio
import time
import sys
@@ -142,7 +153,8 @@ class TestGetStackTrace(unittest.TestCase):
return loop
asyncio.run(main(), loop_factory={{TASK_FACTORY}})
- """)
+ """
+ )
stack_trace = None
for task_factory_variant in "asyncio.new_event_loop", "new_eager_loop":
with (
@@ -153,25 +165,24 @@ class TestGetStackTrace(unittest.TestCase):
os.mkdir(script_dir)
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
script_name = _make_test_script(
- script_dir, 'script',
- script.format(TASK_FACTORY=task_factory_variant))
+ script_dir,
+ "script",
+ script.format(TASK_FACTORY=task_factory_variant),
+ )
client_socket = None
try:
- p = subprocess.Popen(
- [sys.executable, script_name]
- )
+ p = subprocess.Popen([sys.executable, script_name])
client_socket, _ = server_socket.accept()
server_socket.close()
response = client_socket.recv(1024)
self.assertEqual(response, b"ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace")
+ self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket is not None:
client_socket.close()
@@ -184,23 +195,91 @@ class TestGetStackTrace(unittest.TestCase):
root_task = "Task-1"
expected_stack_trace = [
- ["c5", "c4", "c3", "c2"],
+ [
+ ("c5", script_name, 11),
+ ("c4", script_name, 15),
+ ("c3", script_name, 18),
+ ("c2", script_name, 21),
+ ],
"c2_root",
[
- [["main"], root_task, []],
- [["c1"], "sub_main_1", [[["main"], root_task, []]]],
- [["c1"], "sub_main_2", [[["main"], root_task, []]]],
+ [
+ [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("main", script_name, 27),
+ ],
+ "Task-1",
+ [],
+ ],
+ [
+ [("c1", script_name, 24)],
+ "sub_main_1",
+ [
+ [
+ [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("main", script_name, 27),
+ ],
+ "Task-1",
+ [],
+ ]
+ ],
+ ],
+ [
+ [("c1", script_name, 24)],
+ "sub_main_2",
+ [
+ [
+ [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("main", script_name, 27),
+ ],
+ "Task-1",
+ [],
+ ]
+ ],
+ ],
],
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_asyncgen_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio
import time
import sys
@@ -224,7 +303,8 @@ class TestGetStackTrace(unittest.TestCase):
pass
asyncio.run(main())
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -232,10 +312,10 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
@@ -257,17 +337,26 @@ class TestGetStackTrace(unittest.TestCase):
stack_trace[2].sort(key=lambda x: x[1])
expected_stack_trace = [
- ['gen_nested_call', 'gen', 'main'], 'Task-1', []
+ [
+ ("gen_nested_call", script_name, 11),
+ ("gen", script_name, 17),
+ ("main", script_name, 20),
+ ],
+ "Task-1",
+ [],
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_async_gather_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio
import time
import sys
@@ -292,7 +381,8 @@ class TestGetStackTrace(unittest.TestCase):
await asyncio.gather(c1(), c2())
asyncio.run(main())
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -300,10 +390,10 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
@@ -313,8 +403,7 @@ class TestGetStackTrace(unittest.TestCase):
self.assertEqual(response, b"ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace")
+ self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket is not None:
client_socket.close()
@@ -325,18 +414,23 @@ class TestGetStackTrace(unittest.TestCase):
# sets are unordered, so we want to sort "awaited_by"s
stack_trace[2].sort(key=lambda x: x[1])
- expected_stack_trace = [
- ['deep', 'c1'], 'Task-2', [[['main'], 'Task-1', []]]
+ expected_stack_trace = [
+ [("deep", script_name, ANY), ("c1", script_name, 16)],
+ "Task-2",
+ [[[("main", script_name, 22)], "Task-1", []]],
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_async_staggered_race_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio.staggered
import time
import sys
@@ -364,7 +458,8 @@ class TestGetStackTrace(unittest.TestCase):
)
asyncio.run(main())
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -372,10 +467,10 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
@@ -385,8 +480,7 @@ class TestGetStackTrace(unittest.TestCase):
self.assertEqual(response, b"ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace")
+ self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket is not None:
client_socket.close()
@@ -396,18 +490,35 @@ class TestGetStackTrace(unittest.TestCase):
# sets are unordered, so we want to sort "awaited_by"s
stack_trace[2].sort(key=lambda x: x[1])
-
- expected_stack_trace = [
- ['deep', 'c1', 'run_one_coro'], 'Task-2', [[['main'], 'Task-1', []]]
+ expected_stack_trace = [
+ [
+ ("deep", script_name, ANY),
+ ("c1", script_name, 16),
+ ("staggered_race.<locals>.run_one_coro", staggered.__file__, ANY),
+ ],
+ "Task-2",
+ [
+ [
+ [
+ ("staggered_race", staggered.__file__, ANY),
+ ("main", script_name, 22),
+ ],
+ "Task-1",
+ [],
+ ]
+ ],
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_async_global_awaited_by(self):
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio
import os
import random
@@ -443,6 +554,8 @@ class TestGetStackTrace(unittest.TestCase):
assert message == data.decode()
writer.close()
await writer.wait_closed()
+ # Signal we are ready to sleep
+ sock.sendall(b"ready")
await asyncio.sleep(SHORT_TIMEOUT)
async def echo_client_spam(server):
@@ -452,8 +565,10 @@ class TestGetStackTrace(unittest.TestCase):
random.shuffle(msg)
tg.create_task(echo_client("".join(msg)))
await asyncio.sleep(0)
- # at least a 1000 tasks created
- sock.sendall(b"ready")
+ # at least a 1000 tasks created. Each task will signal
+ # when is ready to avoid the race caused by the fact that
+ # tasks are waited on tg.__exit__ and we cannot signal when
+ # that happens otherwise
# at this point all client tasks completed without assertion errors
# let's wrap up the test
server.close()
@@ -468,7 +583,8 @@ class TestGetStackTrace(unittest.TestCase):
tg.create_task(echo_client_spam(server), name="echo client spam")
asyncio.run(main())
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -476,17 +592,19 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
client_socket, _ = server_socket.accept()
server_socket.close()
- response = client_socket.recv(1024)
- self.assertEqual(response, b"ready")
+ for _ in range(1000):
+ expected_response = b"ready"
+ response = client_socket.recv(len(expected_response))
+ self.assertEqual(response, expected_response)
for _ in busy_retry(SHORT_TIMEOUT):
try:
all_awaited_by = get_all_awaited_by(p.pid)
@@ -497,7 +615,9 @@ class TestGetStackTrace(unittest.TestCase):
msg = str(re)
if msg.startswith("Task list appears corrupted"):
continue
- elif msg.startswith("Invalid linked list structure reading remote memory"):
+ elif msg.startswith(
+ "Invalid linked list structure reading remote memory"
+ ):
continue
elif msg.startswith("Unknown error reading memory"):
continue
@@ -516,22 +636,62 @@ class TestGetStackTrace(unittest.TestCase):
# expected: at least 1000 pending tasks
self.assertGreaterEqual(len(entries), 1000)
# the first three tasks stem from the code structure
- self.assertIn(('Task-1', []), entries)
- self.assertIn(('server task', [[['main'], 'Task-1', []]]), entries)
- self.assertIn(('echo client spam', [[['main'], 'Task-1', []]]), entries)
+ self.assertIn((ANY, "Task-1", []), entries)
+ main_stack = [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("main", script_name, 60),
+ ]
+ self.assertIn(
+ (ANY, "server task", [[main_stack, ANY]]),
+ entries,
+ )
+ self.assertIn(
+ (ANY, "echo client spam", [[main_stack, ANY]]),
+ entries,
+ )
- expected_stack = [[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]]
- tasks_with_stack = [task for task in entries if task[1] == expected_stack]
+ expected_stack = [
+ [
+ [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("echo_client_spam", script_name, 41),
+ ],
+ ANY,
+ ]
+ ]
+ tasks_with_stack = [
+ task for task in entries if task[2] == expected_stack
+ ]
self.assertGreaterEqual(len(tasks_with_stack), 1000)
# the final task will have some random number, but it should for
# sure be one of the echo client spam horde (In windows this is not true
# for some reason)
if sys.platform != "win32":
- self.assertEqual([[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]], entries[-1][1])
+ self.assertEqual(
+ expected_stack,
+ entries[-1][2],
+ )
except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace")
+ self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket is not None:
client_socket.close()
@@ -540,12 +700,21 @@ class TestGetStackTrace(unittest.TestCase):
p.wait(timeout=SHORT_TIMEOUT)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_self_trace(self):
stack_trace = get_stack_trace(os.getpid())
- print(stack_trace)
- self.assertEqual(stack_trace[0], "test_self_trace")
+ self.assertEqual(
+ stack_trace[0],
+ (
+ "TestGetStackTrace.test_self_trace",
+ __file__,
+ self.test_self_trace.__code__.co_firstlineno + 6,
+ ),
+ )
+
if __name__ == "__main__":
unittest.main()