aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_asyncio/test_graph.py
diff options
context:
space:
mode:
authorYury Selivanov <yury@edgedb.com>2025-01-22 08:25:29 -0800
committerGitHub <noreply@github.com>2025-01-22 17:25:29 +0100
commit188598851d5cf475fa57b4ec21c0e88ce9316ff0 (patch)
treecd8face9dc12d1d001503aa2e502d06ac391295d /Lib/test/test_asyncio/test_graph.py
parent60a3a0dd6fe140fdc87f6e769ee5bb17d92efe4e (diff)
downloadcpython-188598851d5cf475fa57b4ec21c0e88ce9316ff0.tar.gz
cpython-188598851d5cf475fa57b4ec21c0e88ce9316ff0.zip
GH-91048: Add utils for capturing async call stack for asyncio programs and enable profiling (#124640)
Signed-off-by: Pablo Galindo <pablogsal@gmail.com> Co-authored-by: Pablo Galindo <pablogsal@gmail.com> Co-authored-by: Kumar Aditya <kumaraditya@python.org> Co-authored-by: Ɓukasz Langa <lukasz@langa.pl> Co-authored-by: Savannah Ostrowski <savannahostrowski@gmail.com> Co-authored-by: Jacob Coffee <jacob@z7x.org> Co-authored-by: Irit Katriel <1055913+iritkatriel@users.noreply.github.com>
Diffstat (limited to 'Lib/test/test_asyncio/test_graph.py')
-rw-r--r--Lib/test/test_asyncio/test_graph.py436
1 files changed, 436 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_graph.py b/Lib/test/test_asyncio/test_graph.py
new file mode 100644
index 00000000000..fd2160d4ca3
--- /dev/null
+++ b/Lib/test/test_asyncio/test_graph.py
@@ -0,0 +1,436 @@
+import asyncio
+import io
+import unittest
+
+
+# To prevent a warning "test altered the execution environment"
+def tearDownModule():
+ asyncio._set_event_loop_policy(None)
+
+
+def capture_test_stack(*, fut=None, depth=1):
+
+ def walk(s):
+ ret = [
+ (f"T<{n}>" if '-' not in (n := s.future.get_name()) else 'T<anon>')
+ if isinstance(s.future, asyncio.Task) else 'F'
+ ]
+
+ ret.append(
+ [
+ (
+ f"s {entry.frame.f_code.co_name}"
+ if entry.frame.f_generator is None else
+ (
+ f"a {entry.frame.f_generator.cr_code.co_name}"
+ if hasattr(entry.frame.f_generator, 'cr_code') else
+ f"ag {entry.frame.f_generator.ag_code.co_name}"
+ )
+ ) for entry in s.call_stack
+ ]
+ )
+
+ ret.append(
+ sorted([
+ walk(ab) for ab in s.awaited_by
+ ], key=lambda entry: entry[0])
+ )
+
+ return ret
+
+ buf = io.StringIO()
+ asyncio.print_call_graph(fut, file=buf, depth=depth+1)
+
+ stack = asyncio.capture_call_graph(fut, depth=depth)
+ return walk(stack), buf.getvalue()
+
+
+class CallStackTestBase:
+
+ async def test_stack_tgroup(self):
+
+ stack_for_c5 = None
+
+ def c5():
+ nonlocal stack_for_c5
+ stack_for_c5 = capture_test_stack(depth=2)
+
+ async def c4():
+ await asyncio.sleep(0)
+ c5()
+
+ async def c3():
+ await c4()
+
+ async def c2():
+ await c3()
+
+ async def c1(task):
+ await task
+
+ async def main():
+ async with asyncio.TaskGroup() as tg:
+ task = tg.create_task(c2(), name="c2_root")
+ tg.create_task(c1(task), name="sub_main_1")
+ tg.create_task(c1(task), name="sub_main_2")
+
+ await main()
+
+ self.assertEqual(stack_for_c5[0], [
+ # task name
+ 'T<c2_root>',
+ # call stack
+ ['s c5', 'a c4', 'a c3', 'a c2'],
+ # awaited by
+ [
+ ['T<anon>',
+ ['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], []
+ ],
+ ['T<sub_main_1>',
+ ['a c1'],
+ [
+ ['T<anon>',
+ ['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], []
+ ]
+ ]
+ ],
+ ['T<sub_main_2>',
+ ['a c1'],
+ [
+ ['T<anon>',
+ ['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], []
+ ]
+ ]
+ ]
+ ]
+ ])
+
+ self.assertIn(
+ ' async CallStackTestBase.test_stack_tgroup()',
+ stack_for_c5[1])
+
+
+ async def test_stack_async_gen(self):
+
+ stack_for_gen_nested_call = None
+
+ async def gen_nested_call():
+ nonlocal stack_for_gen_nested_call
+ stack_for_gen_nested_call = capture_test_stack()
+
+ async def gen():
+ for num in range(2):
+ yield num
+ if num == 1:
+ await gen_nested_call()
+
+ async def main():
+ async for el in gen():
+ pass
+
+ await main()
+
+ self.assertEqual(stack_for_gen_nested_call[0], [
+ 'T<anon>',
+ [
+ 's capture_test_stack',
+ 'a gen_nested_call',
+ 'ag gen',
+ 'a main',
+ 'a test_stack_async_gen'
+ ],
+ []
+ ])
+
+ self.assertIn(
+ 'async generator CallStackTestBase.test_stack_async_gen.<locals>.gen()',
+ stack_for_gen_nested_call[1])
+
+ async def test_stack_gather(self):
+
+ stack_for_deep = None
+
+ async def deep():
+ await asyncio.sleep(0)
+ nonlocal stack_for_deep
+ stack_for_deep = capture_test_stack()
+
+ async def c1():
+ await asyncio.sleep(0)
+ await deep()
+
+ async def c2():
+ await asyncio.sleep(0)
+
+ async def main():
+ await asyncio.gather(c1(), c2())
+
+ await main()
+
+ self.assertEqual(stack_for_deep[0], [
+ 'T<anon>',
+ ['s capture_test_stack', 'a deep', 'a c1'],
+ [
+ ['T<anon>', ['a main', 'a test_stack_gather'], []]
+ ]
+ ])
+
+ async def test_stack_shield(self):
+
+ stack_for_shield = None
+
+ async def deep():
+ await asyncio.sleep(0)
+ nonlocal stack_for_shield
+ stack_for_shield = capture_test_stack()
+
+ async def c1():
+ await asyncio.sleep(0)
+ await deep()
+
+ async def main():
+ await asyncio.shield(c1())
+
+ await main()
+
+ self.assertEqual(stack_for_shield[0], [
+ 'T<anon>',
+ ['s capture_test_stack', 'a deep', 'a c1'],
+ [
+ ['T<anon>', ['a main', 'a test_stack_shield'], []]
+ ]
+ ])
+
+ async def test_stack_timeout(self):
+
+ stack_for_inner = None
+
+ async def inner():
+ await asyncio.sleep(0)
+ nonlocal stack_for_inner
+ stack_for_inner = capture_test_stack()
+
+ async def c1():
+ async with asyncio.timeout(1):
+ await asyncio.sleep(0)
+ await inner()
+
+ async def main():
+ await asyncio.shield(c1())
+
+ await main()
+
+ self.assertEqual(stack_for_inner[0], [
+ 'T<anon>',
+ ['s capture_test_stack', 'a inner', 'a c1'],
+ [
+ ['T<anon>', ['a main', 'a test_stack_timeout'], []]
+ ]
+ ])
+
+ async def test_stack_wait(self):
+
+ stack_for_inner = None
+
+ async def inner():
+ await asyncio.sleep(0)
+ nonlocal stack_for_inner
+ stack_for_inner = capture_test_stack()
+
+ async def c1():
+ async with asyncio.timeout(1):
+ await asyncio.sleep(0)
+ await inner()
+
+ async def c2():
+ for i in range(3):
+ await asyncio.sleep(0)
+
+ async def main(t1, t2):
+ while True:
+ _, pending = await asyncio.wait([t1, t2])
+ if not pending:
+ break
+
+ t1 = asyncio.create_task(c1())
+ t2 = asyncio.create_task(c2())
+ try:
+ await main(t1, t2)
+ finally:
+ await t1
+ await t2
+
+ self.assertEqual(stack_for_inner[0], [
+ 'T<anon>',
+ ['s capture_test_stack', 'a inner', 'a c1'],
+ [
+ ['T<anon>',
+ ['a _wait', 'a wait', 'a main', 'a test_stack_wait'],
+ []
+ ]
+ ]
+ ])
+
+ async def test_stack_task(self):
+
+ stack_for_inner = None
+
+ async def inner():
+ await asyncio.sleep(0)
+ nonlocal stack_for_inner
+ stack_for_inner = capture_test_stack()
+
+ async def c1():
+ await inner()
+
+ async def c2():
+ await asyncio.create_task(c1(), name='there there')
+
+ async def main():
+ await c2()
+
+ await main()
+
+ self.assertEqual(stack_for_inner[0], [
+ 'T<there there>',
+ ['s capture_test_stack', 'a inner', 'a c1'],
+ [['T<anon>', ['a c2', 'a main', 'a test_stack_task'], []]]
+ ])
+
+ async def test_stack_future(self):
+
+ stack_for_fut = None
+
+ async def a2(fut):
+ await fut
+
+ async def a1(fut):
+ await a2(fut)
+
+ async def b1(fut):
+ await fut
+
+ async def main():
+ nonlocal stack_for_fut
+
+ fut = asyncio.Future()
+ async with asyncio.TaskGroup() as g:
+ g.create_task(a1(fut), name="task A")
+ g.create_task(b1(fut), name='task B')
+
+ for _ in range(5):
+ # Do a few iterations to ensure that both a1 and b1
+ # await on the future
+ await asyncio.sleep(0)
+
+ stack_for_fut = capture_test_stack(fut=fut)
+ fut.set_result(None)
+
+ await main()
+
+ self.assertEqual(stack_for_fut[0],
+ ['F',
+ [],
+ [
+ ['T<task A>',
+ ['a a2', 'a a1'],
+ [['T<anon>', ['a test_stack_future'], []]]
+ ],
+ ['T<task B>',
+ ['a b1'],
+ [['T<anon>', ['a test_stack_future'], []]]
+ ],
+ ]]
+ )
+
+ self.assertTrue(stack_for_fut[1].startswith('* Future(id='))
+
+
+@unittest.skipIf(
+ not hasattr(asyncio.futures, "_c_future_add_to_awaited_by"),
+ "C-accelerated asyncio call graph backend missing",
+)
+class TestCallStackC(CallStackTestBase, unittest.IsolatedAsyncioTestCase):
+ def setUp(self):
+ futures = asyncio.futures
+ tasks = asyncio.tasks
+
+ self._Future = asyncio.Future
+ asyncio.Future = futures.Future = futures._CFuture
+
+ self._Task = asyncio.Task
+ asyncio.Task = tasks.Task = tasks._CTask
+
+ self._future_add_to_awaited_by = asyncio.future_add_to_awaited_by
+ futures.future_add_to_awaited_by = futures._c_future_add_to_awaited_by
+ asyncio.future_add_to_awaited_by = futures.future_add_to_awaited_by
+
+ self._future_discard_from_awaited_by = asyncio.future_discard_from_awaited_by
+ futures.future_discard_from_awaited_by = futures._c_future_discard_from_awaited_by
+ asyncio.future_discard_from_awaited_by = futures.future_discard_from_awaited_by
+
+
+ def tearDown(self):
+ futures = asyncio.futures
+ tasks = asyncio.tasks
+
+ futures.future_discard_from_awaited_by = self._future_discard_from_awaited_by
+ asyncio.future_discard_from_awaited_by = self._future_discard_from_awaited_by
+ del self._future_discard_from_awaited_by
+
+ futures.future_add_to_awaited_by = self._future_add_to_awaited_by
+ asyncio.future_add_to_awaited_by = self._future_add_to_awaited_by
+ del self._future_add_to_awaited_by
+
+ asyncio.Task = self._Task
+ tasks.Task = self._Task
+ del self._Task
+
+ asyncio.Future = self._Future
+ futures.Future = self._Future
+ del self._Future
+
+
+@unittest.skipIf(
+ not hasattr(asyncio.futures, "_py_future_add_to_awaited_by"),
+ "Pure Python asyncio call graph backend missing",
+)
+class TestCallStackPy(CallStackTestBase, unittest.IsolatedAsyncioTestCase):
+ def setUp(self):
+ futures = asyncio.futures
+ tasks = asyncio.tasks
+
+ self._Future = asyncio.Future
+ asyncio.Future = futures.Future = futures._PyFuture
+
+ self._Task = asyncio.Task
+ asyncio.Task = tasks.Task = tasks._PyTask
+
+ self._future_add_to_awaited_by = asyncio.future_add_to_awaited_by
+ futures.future_add_to_awaited_by = futures._py_future_add_to_awaited_by
+ asyncio.future_add_to_awaited_by = futures.future_add_to_awaited_by
+
+ self._future_discard_from_awaited_by = asyncio.future_discard_from_awaited_by
+ futures.future_discard_from_awaited_by = futures._py_future_discard_from_awaited_by
+ asyncio.future_discard_from_awaited_by = futures.future_discard_from_awaited_by
+
+
+ def tearDown(self):
+ futures = asyncio.futures
+ tasks = asyncio.tasks
+
+ futures.future_discard_from_awaited_by = self._future_discard_from_awaited_by
+ asyncio.future_discard_from_awaited_by = self._future_discard_from_awaited_by
+ del self._future_discard_from_awaited_by
+
+ futures.future_add_to_awaited_by = self._future_add_to_awaited_by
+ asyncio.future_add_to_awaited_by = self._future_add_to_awaited_by
+ del self._future_add_to_awaited_by
+
+ asyncio.Task = self._Task
+ tasks.Task = self._Task
+ del self._Task
+
+ asyncio.Future = self._Future
+ futures.Future = self._Future
+ del self._Future