aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio')
-rw-r--r--Lib/test/test_asyncio/test_eager_task_factory.py37
-rw-r--r--Lib/test/test_asyncio/test_futures.py58
-rw-r--r--Lib/test/test_asyncio/test_locks.py2
-rw-r--r--Lib/test/test_asyncio/test_selector_events.py16
-rw-r--r--Lib/test/test_asyncio/test_ssl.py10
-rw-r--r--Lib/test/test_asyncio/test_tasks.py73
-rw-r--r--Lib/test/test_asyncio/test_tools.py1706
7 files changed, 1889 insertions, 13 deletions
diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py
index a2fb1022ae4..9f3b6f9acef 100644
--- a/Lib/test/test_asyncio/test_eager_task_factory.py
+++ b/Lib/test/test_asyncio/test_eager_task_factory.py
@@ -263,6 +263,24 @@ class EagerTaskFactoryLoopTests:
self.run_coro(run())
+ def test_eager_start_false(self):
+ name = None
+
+ async def asyncfn():
+ nonlocal name
+ name = asyncio.current_task().get_name()
+
+ async def main():
+ t = asyncio.get_running_loop().create_task(
+ asyncfn(), eager_start=False, name="example"
+ )
+ self.assertFalse(t.done())
+ self.assertIsNone(name)
+ await t
+ self.assertEqual(name, "example")
+
+ self.run_coro(main())
+
class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase):
Task = tasks._PyTask
@@ -505,5 +523,24 @@ class EagerCTaskTests(BaseEagerTaskFactoryTests, test_utils.TestCase):
asyncio.current_task = asyncio.tasks.current_task = self._current_task
return super().tearDown()
+
+class DefaultTaskFactoryEagerStart(test_utils.TestCase):
+ def test_eager_start_true_with_default_factory(self):
+ name = None
+
+ async def asyncfn():
+ nonlocal name
+ name = asyncio.current_task().get_name()
+
+ async def main():
+ t = asyncio.get_running_loop().create_task(
+ asyncfn(), eager_start=True, name="example"
+ )
+ self.assertTrue(t.done())
+ self.assertEqual(name, "example")
+ await t
+
+ asyncio.run(main(), loop_factory=asyncio.EventLoop)
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
index 8b51522278a..39bef465bdb 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -413,7 +413,7 @@ class BaseFutureTests:
def test_copy_state(self):
from asyncio.futures import _copy_future_state
- f = self._new_future(loop=self.loop)
+ f = concurrent.futures.Future()
f.set_result(10)
newf = self._new_future(loop=self.loop)
@@ -421,7 +421,7 @@ class BaseFutureTests:
self.assertTrue(newf.done())
self.assertEqual(newf.result(), 10)
- f_exception = self._new_future(loop=self.loop)
+ f_exception = concurrent.futures.Future()
f_exception.set_exception(RuntimeError())
newf_exception = self._new_future(loop=self.loop)
@@ -429,7 +429,7 @@ class BaseFutureTests:
self.assertTrue(newf_exception.done())
self.assertRaises(RuntimeError, newf_exception.result)
- f_cancelled = self._new_future(loop=self.loop)
+ f_cancelled = concurrent.futures.Future()
f_cancelled.cancel()
newf_cancelled = self._new_future(loop=self.loop)
@@ -441,7 +441,7 @@ class BaseFutureTests:
except BaseException as e:
f_exc = e
- f_conexc = self._new_future(loop=self.loop)
+ f_conexc = concurrent.futures.Future()
f_conexc.set_exception(f_exc)
newf_conexc = self._new_future(loop=self.loop)
@@ -454,6 +454,56 @@ class BaseFutureTests:
newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__))
self.assertEqual(newf_tb.count('raise concurrent.futures.InvalidStateError'), 1)
+ def test_copy_state_from_concurrent_futures(self):
+ """Test _copy_future_state from concurrent.futures.Future.
+
+ This tests the optimized path using _get_snapshot when available.
+ """
+ from asyncio.futures import _copy_future_state
+
+ # Test with a result
+ f_concurrent = concurrent.futures.Future()
+ f_concurrent.set_result(42)
+ f_asyncio = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent, f_asyncio)
+ self.assertTrue(f_asyncio.done())
+ self.assertEqual(f_asyncio.result(), 42)
+
+ # Test with an exception
+ f_concurrent_exc = concurrent.futures.Future()
+ f_concurrent_exc.set_exception(ValueError("test exception"))
+ f_asyncio_exc = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent_exc, f_asyncio_exc)
+ self.assertTrue(f_asyncio_exc.done())
+ with self.assertRaises(ValueError) as cm:
+ f_asyncio_exc.result()
+ self.assertEqual(str(cm.exception), "test exception")
+
+ # Test with cancelled state
+ f_concurrent_cancelled = concurrent.futures.Future()
+ f_concurrent_cancelled.cancel()
+ f_asyncio_cancelled = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled)
+ self.assertTrue(f_asyncio_cancelled.cancelled())
+
+ # Test that destination already cancelled prevents copy
+ f_concurrent_result = concurrent.futures.Future()
+ f_concurrent_result.set_result(10)
+ f_asyncio_precancelled = self._new_future(loop=self.loop)
+ f_asyncio_precancelled.cancel()
+ _copy_future_state(f_concurrent_result, f_asyncio_precancelled)
+ self.assertTrue(f_asyncio_precancelled.cancelled())
+
+ # Test exception type conversion
+ f_concurrent_invalid = concurrent.futures.Future()
+ f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid"))
+ f_asyncio_invalid = self._new_future(loop=self.loop)
+ _copy_future_state(f_concurrent_invalid, f_asyncio_invalid)
+ self.assertTrue(f_asyncio_invalid.done())
+ with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm:
+ f_asyncio_invalid.result()
+ self.assertEqual(str(cm.exception), "invalid")
+
def test_iter(self):
fut = self._new_future(loop=self.loop)
diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py
index 3bb3e5c4ca0..047f03cbb14 100644
--- a/Lib/test/test_asyncio/test_locks.py
+++ b/Lib/test/test_asyncio/test_locks.py
@@ -14,7 +14,7 @@ STR_RGX_REPR = (
r'(, value:\d)?'
r'(, waiters:\d+)?'
r'(, waiters:\d+\/\d+)?' # barrier
- r')\]>\Z'
+ r')\]>\z'
)
RGX_REPR = re.compile(STR_RGX_REPR)
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
index de81936b745..aab6a779170 100644
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -347,6 +347,18 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
selectors.EVENT_WRITE)])
self.loop._remove_writer.assert_called_with(1)
+ def test_accept_connection_zero_one(self):
+ for backlog in [0, 1]:
+ sock = mock.Mock()
+ sock.accept.return_value = (mock.Mock(), mock.Mock())
+ with self.subTest(backlog):
+ mock_obj = mock.patch.object
+ with mock_obj(self.loop, '_accept_connection2') as accept2_mock:
+ self.loop._accept_connection(
+ mock.Mock(), sock, backlog=backlog)
+ self.loop.run_until_complete(asyncio.sleep(0))
+ self.assertEqual(sock.accept.call_count, backlog + 1)
+
def test_accept_connection_multiple(self):
sock = mock.Mock()
sock.accept.return_value = (mock.Mock(), mock.Mock())
@@ -362,7 +374,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
self.loop._accept_connection(
mock.Mock(), sock, backlog=backlog)
self.loop.run_until_complete(asyncio.sleep(0))
- self.assertEqual(sock.accept.call_count, backlog)
+ self.assertEqual(sock.accept.call_count, backlog + 1)
def test_accept_connection_skip_connectionabortederror(self):
sock = mock.Mock()
@@ -388,7 +400,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
# as in test_accept_connection_multiple avoid task pending
# warnings by using asyncio.sleep(0)
self.loop.run_until_complete(asyncio.sleep(0))
- self.assertEqual(sock.accept.call_count, backlog)
+ self.assertEqual(sock.accept.call_count, backlog + 1)
class SelectorTransportTests(test_utils.TestCase):
diff --git a/Lib/test/test_asyncio/test_ssl.py b/Lib/test/test_asyncio/test_ssl.py
index 986ecc2c5a9..3a7185cd897 100644
--- a/Lib/test/test_asyncio/test_ssl.py
+++ b/Lib/test/test_asyncio/test_ssl.py
@@ -195,9 +195,10 @@ class TestSSL(test_utils.TestCase):
except (BrokenPipeError, ConnectionError):
pass
- def test_create_server_ssl_1(self):
+ @support.bigmemtest(size=25, memuse=90*2**20, dry_run=False)
+ def test_create_server_ssl_1(self, size):
CNT = 0 # number of clients that were successful
- TOTAL_CNT = 25 # total number of clients that test will create
+ TOTAL_CNT = size # total number of clients that test will create
TIMEOUT = support.LONG_TIMEOUT # timeout for this test
A_DATA = b'A' * 1024 * BUF_MULTIPLIER
@@ -1038,9 +1039,10 @@ class TestSSL(test_utils.TestCase):
self.loop.run_until_complete(run_main())
- def test_create_server_ssl_over_ssl(self):
+ @support.bigmemtest(size=25, memuse=90*2**20, dry_run=False)
+ def test_create_server_ssl_over_ssl(self, size):
CNT = 0 # number of clients that were successful
- TOTAL_CNT = 25 # total number of clients that test will create
+ TOTAL_CNT = size # total number of clients that test will create
TIMEOUT = support.LONG_TIMEOUT # timeout for this test
A_DATA = b'A' * 1024 * BUF_MULTIPLIER
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 8d7f1733454..f6f976f213a 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -89,8 +89,8 @@ class BaseTaskTests:
Future = None
all_tasks = None
- def new_task(self, loop, coro, name='TestTask', context=None):
- return self.__class__.Task(coro, loop=loop, name=name, context=context)
+ def new_task(self, loop, coro, name='TestTask', context=None, eager_start=None):
+ return self.__class__.Task(coro, loop=loop, name=name, context=context, eager_start=eager_start)
def new_future(self, loop):
return self.__class__.Future(loop=loop)
@@ -2116,6 +2116,46 @@ class BaseTaskTests:
self.assertTrue(outer.cancelled())
self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
+ def test_shield_cancel_outer_result(self):
+ mock_handler = mock.Mock()
+ self.loop.set_exception_handler(mock_handler)
+ inner = self.new_future(self.loop)
+ outer = asyncio.shield(inner)
+ test_utils.run_briefly(self.loop)
+ outer.cancel()
+ test_utils.run_briefly(self.loop)
+ inner.set_result(1)
+ test_utils.run_briefly(self.loop)
+ mock_handler.assert_not_called()
+
+ def test_shield_cancel_outer_exception(self):
+ mock_handler = mock.Mock()
+ self.loop.set_exception_handler(mock_handler)
+ inner = self.new_future(self.loop)
+ outer = asyncio.shield(inner)
+ test_utils.run_briefly(self.loop)
+ outer.cancel()
+ test_utils.run_briefly(self.loop)
+ inner.set_exception(Exception('foo'))
+ test_utils.run_briefly(self.loop)
+ mock_handler.assert_called_once()
+
+ def test_shield_duplicate_log_once(self):
+ mock_handler = mock.Mock()
+ self.loop.set_exception_handler(mock_handler)
+ inner = self.new_future(self.loop)
+ outer = asyncio.shield(inner)
+ test_utils.run_briefly(self.loop)
+ outer.cancel()
+ test_utils.run_briefly(self.loop)
+ outer = asyncio.shield(inner)
+ test_utils.run_briefly(self.loop)
+ outer.cancel()
+ test_utils.run_briefly(self.loop)
+ inner.set_exception(Exception('foo'))
+ test_utils.run_briefly(self.loop)
+ mock_handler.assert_called_once()
+
def test_shield_shortcut(self):
fut = self.new_future(self.loop)
fut.set_result(42)
@@ -2686,6 +2726,35 @@ class BaseTaskTests:
self.assertEqual([None, 1, 2], ret)
+ def test_eager_start_true(self):
+ name = None
+
+ async def asyncfn():
+ nonlocal name
+ name = self.current_task().get_name()
+
+ async def main():
+ t = self.new_task(coro=asyncfn(), loop=asyncio.get_running_loop(), eager_start=True, name="example")
+ self.assertTrue(t.done())
+ self.assertEqual(name, "example")
+ await t
+
+ def test_eager_start_false(self):
+ name = None
+
+ async def asyncfn():
+ nonlocal name
+ name = self.current_task().get_name()
+
+ async def main():
+ t = self.new_task(coro=asyncfn(), loop=asyncio.get_running_loop(), eager_start=False, name="example")
+ self.assertFalse(t.done())
+ self.assertIsNone(name)
+ await t
+ self.assertEqual(name, "example")
+
+ asyncio.run(main(), loop_factory=asyncio.EventLoop)
+
def test_get_coro(self):
loop = asyncio.new_event_loop()
coro = coroutine_function()
diff --git a/Lib/test/test_asyncio/test_tools.py b/Lib/test/test_asyncio/test_tools.py
new file mode 100644
index 00000000000..34e94830204
--- /dev/null
+++ b/Lib/test/test_asyncio/test_tools.py
@@ -0,0 +1,1706 @@
+import unittest
+
+from asyncio import tools
+
+from collections import namedtuple
+
+FrameInfo = namedtuple('FrameInfo', ['funcname', 'filename', 'lineno'])
+CoroInfo = namedtuple('CoroInfo', ['call_stack', 'task_name'])
+TaskInfo = namedtuple('TaskInfo', ['task_id', 'task_name', 'coroutine_stack', 'awaited_by'])
+AwaitedInfo = namedtuple('AwaitedInfo', ['thread_id', 'awaited_by'])
+
+
+# mock output of get_all_awaited_by function.
+TEST_INPUTS_TREE = [
+ [
+ # test case containing a task called timer being awaited in two
+ # different subtasks part of a TaskGroup (root1 and root2) which call
+ # awaiter functions.
+ (
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="timer",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("awaiter3", "/path/to/app.py", 130),
+ FrameInfo("awaiter2", "/path/to/app.py", 120),
+ FrameInfo("awaiter", "/path/to/app.py", 110)
+ ],
+ task_name=4
+ ),
+ CoroInfo(
+ call_stack=[
+ FrameInfo("awaiterB3", "/path/to/app.py", 190),
+ FrameInfo("awaiterB2", "/path/to/app.py", 180),
+ FrameInfo("awaiterB", "/path/to/app.py", 170)
+ ],
+ task_name=5
+ ),
+ CoroInfo(
+ call_stack=[
+ FrameInfo("awaiterB3", "/path/to/app.py", 190),
+ FrameInfo("awaiterB2", "/path/to/app.py", 180),
+ FrameInfo("awaiterB", "/path/to/app.py", 170)
+ ],
+ task_name=6
+ ),
+ CoroInfo(
+ call_stack=[
+ FrameInfo("awaiter3", "/path/to/app.py", 130),
+ FrameInfo("awaiter2", "/path/to/app.py", 120),
+ FrameInfo("awaiter", "/path/to/app.py", 110)
+ ],
+ task_name=7
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=8,
+ task_name="root1",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("main", "", 0)
+ ],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=9,
+ task_name="root2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("main", "", 0)
+ ],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="child1_1",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("blocho_caller", "", 0),
+ FrameInfo("bloch", "", 0)
+ ],
+ task_name=8
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=6,
+ task_name="child2_1",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("blocho_caller", "", 0),
+ FrameInfo("bloch", "", 0)
+ ],
+ task_name=8
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=7,
+ task_name="child1_2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("blocho_caller", "", 0),
+ FrameInfo("bloch", "", 0)
+ ],
+ task_name=9
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=5,
+ task_name="child2_2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("blocho_caller", "", 0),
+ FrameInfo("bloch", "", 0)
+ ],
+ task_name=9
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ),
+ (
+ [
+ [
+ "└── (T) Task-1",
+ " └── main",
+ " └── __aexit__",
+ " └── _aexit",
+ " ├── (T) root1",
+ " │ └── bloch",
+ " │ └── blocho_caller",
+ " │ └── __aexit__",
+ " │ └── _aexit",
+ " │ ├── (T) child1_1",
+ " │ │ └── awaiter /path/to/app.py:110",
+ " │ │ └── awaiter2 /path/to/app.py:120",
+ " │ │ └── awaiter3 /path/to/app.py:130",
+ " │ │ └── (T) timer",
+ " │ └── (T) child2_1",
+ " │ └── awaiterB /path/to/app.py:170",
+ " │ └── awaiterB2 /path/to/app.py:180",
+ " │ └── awaiterB3 /path/to/app.py:190",
+ " │ └── (T) timer",
+ " └── (T) root2",
+ " └── bloch",
+ " └── blocho_caller",
+ " └── __aexit__",
+ " └── _aexit",
+ " ├── (T) child1_2",
+ " │ └── awaiter /path/to/app.py:110",
+ " │ └── awaiter2 /path/to/app.py:120",
+ " │ └── awaiter3 /path/to/app.py:130",
+ " │ └── (T) timer",
+ " └── (T) child2_2",
+ " └── awaiterB /path/to/app.py:170",
+ " └── awaiterB2 /path/to/app.py:180",
+ " └── awaiterB3 /path/to/app.py:190",
+ " └── (T) timer",
+ ]
+ ]
+ ),
+ ],
+ [
+ # test case containing two roots
+ (
+ AwaitedInfo(
+ thread_id=9,
+ awaited_by=[
+ TaskInfo(
+ task_id=5,
+ task_name="Task-5",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=6,
+ task_name="Task-6",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main2", "", 0)],
+ task_name=5
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=7,
+ task_name="Task-7",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main2", "", 0)],
+ task_name=5
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=8,
+ task_name="Task-8",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main2", "", 0)],
+ task_name=5
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(
+ thread_id=10,
+ awaited_by=[
+ TaskInfo(
+ task_id=1,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=2,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=1
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="Task-3",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=1
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="Task-4",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=1
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=11, awaited_by=[]),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ),
+ (
+ [
+ [
+ "└── (T) Task-5",
+ " └── main2",
+ " ├── (T) Task-6",
+ " ├── (T) Task-7",
+ " └── (T) Task-8",
+ ],
+ [
+ "└── (T) Task-1",
+ " └── main",
+ " ├── (T) Task-2",
+ " ├── (T) Task-3",
+ " └── (T) Task-4",
+ ],
+ ]
+ ),
+ ],
+ [
+ # test case containing two roots, one of them without subtasks
+ (
+ [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-5",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ ),
+ AwaitedInfo(
+ thread_id=3,
+ awaited_by=[
+ TaskInfo(
+ task_id=4,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=5,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=4
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=6,
+ task_name="Task-3",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=4
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=7,
+ task_name="Task-4",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=4
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=8, awaited_by=[]),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ]
+ ),
+ (
+ [
+ ["└── (T) Task-5"],
+ [
+ "└── (T) Task-1",
+ " └── main",
+ " ├── (T) Task-2",
+ " ├── (T) Task-3",
+ " └── (T) Task-4",
+ ],
+ ]
+ ),
+ ],
+]
+
+TEST_INPUTS_CYCLES_TREE = [
+ [
+ # this test case contains a cycle: two tasks awaiting each other.
+ (
+ [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="a",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("awaiter2", "", 0)],
+ task_name=4
+ ),
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="b",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("awaiter", "", 0)],
+ task_name=3
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ]
+ ),
+ ([[4, 3, 4]]),
+ ],
+ [
+ # this test case contains two cycles
+ (
+ [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="A",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0),
+ FrameInfo("task_b", "", 0)
+ ],
+ task_name=4
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="B",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0),
+ FrameInfo("task_c", "", 0)
+ ],
+ task_name=5
+ ),
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0),
+ FrameInfo("task_a", "", 0)
+ ],
+ task_name=3
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=5,
+ task_name="C",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0)
+ ],
+ task_name=6
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=6,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0),
+ FrameInfo("task_b", "", 0)
+ ],
+ task_name=4
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ]
+ ),
+ ([[4, 3, 4], [4, 6, 5, 4]]),
+ ],
+]
+
+TEST_INPUTS_TABLE = [
+ [
+ # test case containing a task called timer being awaited in two
+ # different subtasks part of a TaskGroup (root1 and root2) which call
+ # awaiter functions.
+ (
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="timer",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("awaiter3", "", 0),
+ FrameInfo("awaiter2", "", 0),
+ FrameInfo("awaiter", "", 0)
+ ],
+ task_name=4
+ ),
+ CoroInfo(
+ call_stack=[
+ FrameInfo("awaiter1_3", "", 0),
+ FrameInfo("awaiter1_2", "", 0),
+ FrameInfo("awaiter1", "", 0)
+ ],
+ task_name=5
+ ),
+ CoroInfo(
+ call_stack=[
+ FrameInfo("awaiter1_3", "", 0),
+ FrameInfo("awaiter1_2", "", 0),
+ FrameInfo("awaiter1", "", 0)
+ ],
+ task_name=6
+ ),
+ CoroInfo(
+ call_stack=[
+ FrameInfo("awaiter3", "", 0),
+ FrameInfo("awaiter2", "", 0),
+ FrameInfo("awaiter", "", 0)
+ ],
+ task_name=7
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=8,
+ task_name="root1",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("main", "", 0)
+ ],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=9,
+ task_name="root2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("main", "", 0)
+ ],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="child1_1",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("blocho_caller", "", 0),
+ FrameInfo("bloch", "", 0)
+ ],
+ task_name=8
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=6,
+ task_name="child2_1",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("blocho_caller", "", 0),
+ FrameInfo("bloch", "", 0)
+ ],
+ task_name=8
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=7,
+ task_name="child1_2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("blocho_caller", "", 0),
+ FrameInfo("bloch", "", 0)
+ ],
+ task_name=9
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=5,
+ task_name="child2_2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("_aexit", "", 0),
+ FrameInfo("__aexit__", "", 0),
+ FrameInfo("blocho_caller", "", 0),
+ FrameInfo("bloch", "", 0)
+ ],
+ task_name=9
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ),
+ (
+ [
+ [1, "0x2", "Task-1", "", "", "", "0x0"],
+ [
+ 1,
+ "0x3",
+ "timer",
+ "",
+ "awaiter3 -> awaiter2 -> awaiter",
+ "child1_1",
+ "0x4",
+ ],
+ [
+ 1,
+ "0x3",
+ "timer",
+ "",
+ "awaiter1_3 -> awaiter1_2 -> awaiter1",
+ "child2_2",
+ "0x5",
+ ],
+ [
+ 1,
+ "0x3",
+ "timer",
+ "",
+ "awaiter1_3 -> awaiter1_2 -> awaiter1",
+ "child2_1",
+ "0x6",
+ ],
+ [
+ 1,
+ "0x3",
+ "timer",
+ "",
+ "awaiter3 -> awaiter2 -> awaiter",
+ "child1_2",
+ "0x7",
+ ],
+ [
+ 1,
+ "0x8",
+ "root1",
+ "",
+ "_aexit -> __aexit__ -> main",
+ "Task-1",
+ "0x2",
+ ],
+ [
+ 1,
+ "0x9",
+ "root2",
+ "",
+ "_aexit -> __aexit__ -> main",
+ "Task-1",
+ "0x2",
+ ],
+ [
+ 1,
+ "0x4",
+ "child1_1",
+ "",
+ "_aexit -> __aexit__ -> blocho_caller -> bloch",
+ "root1",
+ "0x8",
+ ],
+ [
+ 1,
+ "0x6",
+ "child2_1",
+ "",
+ "_aexit -> __aexit__ -> blocho_caller -> bloch",
+ "root1",
+ "0x8",
+ ],
+ [
+ 1,
+ "0x7",
+ "child1_2",
+ "",
+ "_aexit -> __aexit__ -> blocho_caller -> bloch",
+ "root2",
+ "0x9",
+ ],
+ [
+ 1,
+ "0x5",
+ "child2_2",
+ "",
+ "_aexit -> __aexit__ -> blocho_caller -> bloch",
+ "root2",
+ "0x9",
+ ],
+ ]
+ ),
+ ],
+ [
+ # test case containing two roots
+ (
+ AwaitedInfo(
+ thread_id=9,
+ awaited_by=[
+ TaskInfo(
+ task_id=5,
+ task_name="Task-5",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=6,
+ task_name="Task-6",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main2", "", 0)],
+ task_name=5
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=7,
+ task_name="Task-7",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main2", "", 0)],
+ task_name=5
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=8,
+ task_name="Task-8",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main2", "", 0)],
+ task_name=5
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(
+ thread_id=10,
+ awaited_by=[
+ TaskInfo(
+ task_id=1,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=2,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=1
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="Task-3",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=1
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="Task-4",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=1
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=11, awaited_by=[]),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ),
+ (
+ [
+ [9, "0x5", "Task-5", "", "", "", "0x0"],
+ [9, "0x6", "Task-6", "", "main2", "Task-5", "0x5"],
+ [9, "0x7", "Task-7", "", "main2", "Task-5", "0x5"],
+ [9, "0x8", "Task-8", "", "main2", "Task-5", "0x5"],
+ [10, "0x1", "Task-1", "", "", "", "0x0"],
+ [10, "0x2", "Task-2", "", "main", "Task-1", "0x1"],
+ [10, "0x3", "Task-3", "", "main", "Task-1", "0x1"],
+ [10, "0x4", "Task-4", "", "main", "Task-1", "0x1"],
+ ]
+ ),
+ ],
+ [
+ # test case containing two roots, one of them without subtasks
+ (
+ [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-5",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ ),
+ AwaitedInfo(
+ thread_id=3,
+ awaited_by=[
+ TaskInfo(
+ task_id=4,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=5,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=4
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=6,
+ task_name="Task-3",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=4
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=7,
+ task_name="Task-4",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=4
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=8, awaited_by=[]),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ]
+ ),
+ (
+ [
+ [1, "0x2", "Task-5", "", "", "", "0x0"],
+ [3, "0x4", "Task-1", "", "", "", "0x0"],
+ [3, "0x5", "Task-2", "", "main", "Task-1", "0x4"],
+ [3, "0x6", "Task-3", "", "main", "Task-1", "0x4"],
+ [3, "0x7", "Task-4", "", "main", "Task-1", "0x4"],
+ ]
+ ),
+ ],
+ # CASES WITH CYCLES
+ [
+ # this test case contains a cycle: two tasks awaiting each other.
+ (
+ [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="a",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("awaiter2", "", 0)],
+ task_name=4
+ ),
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="b",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("awaiter", "", 0)],
+ task_name=3
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ]
+ ),
+ (
+ [
+ [1, "0x2", "Task-1", "", "", "", "0x0"],
+ [1, "0x3", "a", "", "awaiter2", "b", "0x4"],
+ [1, "0x3", "a", "", "main", "Task-1", "0x2"],
+ [1, "0x4", "b", "", "awaiter", "a", "0x3"],
+ ]
+ ),
+ ],
+ [
+ # this test case contains two cycles
+ (
+ [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="A",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0),
+ FrameInfo("task_b", "", 0)
+ ],
+ task_name=4
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="B",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0),
+ FrameInfo("task_c", "", 0)
+ ],
+ task_name=5
+ ),
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0),
+ FrameInfo("task_a", "", 0)
+ ],
+ task_name=3
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=5,
+ task_name="C",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0)
+ ],
+ task_name=6
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=6,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("nested", "", 0),
+ FrameInfo("nested", "", 0),
+ FrameInfo("task_b", "", 0)
+ ],
+ task_name=4
+ )
+ ]
+ )
+ ]
+ ),
+ AwaitedInfo(thread_id=0, awaited_by=[])
+ ]
+ ),
+ (
+ [
+ [1, "0x2", "Task-1", "", "", "", "0x0"],
+ [
+ 1,
+ "0x3",
+ "A",
+ "",
+ "nested -> nested -> task_b",
+ "B",
+ "0x4",
+ ],
+ [
+ 1,
+ "0x4",
+ "B",
+ "",
+ "nested -> nested -> task_c",
+ "C",
+ "0x5",
+ ],
+ [
+ 1,
+ "0x4",
+ "B",
+ "",
+ "nested -> nested -> task_a",
+ "A",
+ "0x3",
+ ],
+ [
+ 1,
+ "0x5",
+ "C",
+ "",
+ "nested -> nested",
+ "Task-2",
+ "0x6",
+ ],
+ [
+ 1,
+ "0x6",
+ "Task-2",
+ "",
+ "nested -> nested -> task_b",
+ "B",
+ "0x4",
+ ],
+ ]
+ ),
+ ],
+]
+
+
+class TestAsyncioToolsTree(unittest.TestCase):
+ def test_asyncio_utils(self):
+ for input_, tree in TEST_INPUTS_TREE:
+ with self.subTest(input_):
+ result = tools.build_async_tree(input_)
+ self.assertEqual(result, tree)
+
+ def test_asyncio_utils_cycles(self):
+ for input_, cycles in TEST_INPUTS_CYCLES_TREE:
+ with self.subTest(input_):
+ try:
+ tools.build_async_tree(input_)
+ except tools.CycleFoundException as e:
+ self.assertEqual(e.cycles, cycles)
+
+
+class TestAsyncioToolsTable(unittest.TestCase):
+ def test_asyncio_utils(self):
+ for input_, table in TEST_INPUTS_TABLE:
+ with self.subTest(input_):
+ result = tools.build_task_table(input_)
+ self.assertEqual(result, table)
+
+
+class TestAsyncioToolsBasic(unittest.TestCase):
+ def test_empty_input_tree(self):
+ """Test build_async_tree with empty input."""
+ result = []
+ expected_output = []
+ self.assertEqual(tools.build_async_tree(result), expected_output)
+
+ def test_empty_input_table(self):
+ """Test build_task_table with empty input."""
+ result = []
+ expected_output = []
+ self.assertEqual(tools.build_task_table(result), expected_output)
+
+ def test_only_independent_tasks_tree(self):
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=10,
+ task_name="taskA",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=11,
+ task_name="taskB",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ )
+ ]
+ expected = [["└── (T) taskA"], ["└── (T) taskB"]]
+ result = tools.build_async_tree(input_)
+ self.assertEqual(sorted(result), sorted(expected))
+
+ def test_only_independent_tasks_table(self):
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=10,
+ task_name="taskA",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=11,
+ task_name="taskB",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ )
+ ]
+ self.assertEqual(
+ tools.build_task_table(input_),
+ [[1, '0xa', 'taskA', '', '', '', '0x0'], [1, '0xb', 'taskB', '', '', '', '0x0']]
+ )
+
+ def test_single_task_tree(self):
+ """Test build_async_tree with a single task and no awaits."""
+ result = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ )
+ ]
+ expected_output = [
+ [
+ "└── (T) Task-1",
+ ]
+ ]
+ self.assertEqual(tools.build_async_tree(result), expected_output)
+
+ def test_single_task_table(self):
+ """Test build_task_table with a single task and no awaits."""
+ result = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ )
+ ]
+ expected_output = [[1, '0x2', 'Task-1', '', '', '', '0x0']]
+ self.assertEqual(tools.build_task_table(result), expected_output)
+
+ def test_cycle_detection(self):
+ """Test build_async_tree raises CycleFoundException for cyclic input."""
+ result = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=3
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=2
+ )
+ ]
+ )
+ ]
+ )
+ ]
+ with self.assertRaises(tools.CycleFoundException) as context:
+ tools.build_async_tree(result)
+ self.assertEqual(context.exception.cycles, [[3, 2, 3]])
+
+ def test_complex_tree(self):
+ """Test build_async_tree with a more complex tree structure."""
+ result = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="Task-3",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=3
+ )
+ ]
+ )
+ ]
+ )
+ ]
+ expected_output = [
+ [
+ "└── (T) Task-1",
+ " └── main",
+ " └── (T) Task-2",
+ " └── main",
+ " └── (T) Task-3",
+ ]
+ ]
+ self.assertEqual(tools.build_async_tree(result), expected_output)
+
+ def test_complex_table(self):
+ """Test build_task_table with a more complex tree structure."""
+ result = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=2,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=4,
+ task_name="Task-3",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("main", "", 0)],
+ task_name=3
+ )
+ ]
+ )
+ ]
+ )
+ ]
+ expected_output = [
+ [1, '0x2', 'Task-1', '', '', '', '0x0'],
+ [1, '0x3', 'Task-2', '', 'main', 'Task-1', '0x2'],
+ [1, '0x4', 'Task-3', '', 'main', 'Task-2', '0x3']
+ ]
+ self.assertEqual(tools.build_task_table(result), expected_output)
+
+ def test_deep_coroutine_chain(self):
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=10,
+ task_name="leaf",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("c1", "", 0),
+ FrameInfo("c2", "", 0),
+ FrameInfo("c3", "", 0),
+ FrameInfo("c4", "", 0),
+ FrameInfo("c5", "", 0)
+ ],
+ task_name=11
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=11,
+ task_name="root",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ )
+ ]
+ expected = [
+ [
+ "└── (T) root",
+ " └── c5",
+ " └── c4",
+ " └── c3",
+ " └── c2",
+ " └── c1",
+ " └── (T) leaf",
+ ]
+ ]
+ result = tools.build_async_tree(input_)
+ self.assertEqual(result, expected)
+
+ def test_multiple_cycles_same_node(self):
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=1,
+ task_name="Task-A",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("call1", "", 0)],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=2,
+ task_name="Task-B",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("call2", "", 0)],
+ task_name=3
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="Task-C",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("call3", "", 0)],
+ task_name=1
+ ),
+ CoroInfo(
+ call_stack=[FrameInfo("call4", "", 0)],
+ task_name=2
+ )
+ ]
+ )
+ ]
+ )
+ ]
+ with self.assertRaises(tools.CycleFoundException) as ctx:
+ tools.build_async_tree(input_)
+ cycles = ctx.exception.cycles
+ self.assertTrue(any(set(c) == {1, 2, 3} for c in cycles))
+
+ def test_table_output_format(self):
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=1,
+ task_name="Task-A",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("foo", "", 0)],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=2,
+ task_name="Task-B",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ )
+ ]
+ table = tools.build_task_table(input_)
+ for row in table:
+ self.assertEqual(len(row), 7)
+ self.assertIsInstance(row[0], int) # thread ID
+ self.assertTrue(
+ isinstance(row[1], str) and row[1].startswith("0x")
+ ) # hex task ID
+ self.assertIsInstance(row[2], str) # task name
+ self.assertIsInstance(row[3], str) # coroutine stack
+ self.assertIsInstance(row[4], str) # coroutine chain
+ self.assertIsInstance(row[5], str) # awaiter name
+ self.assertTrue(
+ isinstance(row[6], str) and row[6].startswith("0x")
+ ) # hex awaiter ID
+
+
+class TestAsyncioToolsEdgeCases(unittest.TestCase):
+
+ def test_task_awaits_self(self):
+ """A task directly awaits itself - should raise a cycle."""
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=1,
+ task_name="Self-Awaiter",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("loopback", "", 0)],
+ task_name=1
+ )
+ ]
+ )
+ ]
+ )
+ ]
+ with self.assertRaises(tools.CycleFoundException) as ctx:
+ tools.build_async_tree(input_)
+ self.assertIn([1, 1], ctx.exception.cycles)
+
+ def test_task_with_missing_awaiter_id(self):
+ """Awaiter ID not in task list - should not crash, just show 'Unknown'."""
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=1,
+ task_name="Task-A",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("coro", "", 0)],
+ task_name=999
+ )
+ ]
+ )
+ ]
+ )
+ ]
+ table = tools.build_task_table(input_)
+ self.assertEqual(len(table), 1)
+ self.assertEqual(table[0][5], "Unknown")
+
+ def test_duplicate_coroutine_frames(self):
+ """Same coroutine frame repeated under a parent - should deduplicate."""
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=1,
+ task_name="Task-1",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("frameA", "", 0)],
+ task_name=2
+ ),
+ CoroInfo(
+ call_stack=[FrameInfo("frameA", "", 0)],
+ task_name=3
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=2,
+ task_name="Task-2",
+ coroutine_stack=[],
+ awaited_by=[]
+ ),
+ TaskInfo(
+ task_id=3,
+ task_name="Task-3",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ )
+ ]
+ tree = tools.build_async_tree(input_)
+ # Both children should be under the same coroutine node
+ flat = "\n".join(tree[0])
+ self.assertIn("frameA", flat)
+ self.assertIn("Task-2", flat)
+ self.assertIn("Task-1", flat)
+
+ flat = "\n".join(tree[1])
+ self.assertIn("frameA", flat)
+ self.assertIn("Task-3", flat)
+ self.assertIn("Task-1", flat)
+
+ def test_task_with_no_name(self):
+ """Task with no name in id2name - should still render with fallback."""
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=1,
+ task_name="root",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[FrameInfo("f1", "", 0)],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=2,
+ task_name=None,
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ )
+ ]
+ # If name is None, fallback to string should not crash
+ tree = tools.build_async_tree(input_)
+ self.assertIn("(T) None", "\n".join(tree[0]))
+
+ def test_tree_rendering_with_custom_emojis(self):
+ """Pass custom emojis to the tree renderer."""
+ input_ = [
+ AwaitedInfo(
+ thread_id=1,
+ awaited_by=[
+ TaskInfo(
+ task_id=1,
+ task_name="MainTask",
+ coroutine_stack=[],
+ awaited_by=[
+ CoroInfo(
+ call_stack=[
+ FrameInfo("f1", "", 0),
+ FrameInfo("f2", "", 0)
+ ],
+ task_name=2
+ )
+ ]
+ ),
+ TaskInfo(
+ task_id=2,
+ task_name="SubTask",
+ coroutine_stack=[],
+ awaited_by=[]
+ )
+ ]
+ )
+ ]
+ tree = tools.build_async_tree(input_, task_emoji="🧵", cor_emoji="🔁")
+ flat = "\n".join(tree[0])
+ self.assertIn("🧵 MainTask", flat)
+ self.assertIn("🔁 f1", flat)
+ self.assertIn("🔁 f2", flat)
+ self.assertIn("🧵 SubTask", flat)