diff options
author | Yury Selivanov <yury@edgedb.com> | 2025-01-22 08:25:29 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-01-22 17:25:29 +0100 |
commit | 188598851d5cf475fa57b4ec21c0e88ce9316ff0 (patch) | |
tree | cd8face9dc12d1d001503aa2e502d06ac391295d /Lib/asyncio/tasks.py | |
parent | 60a3a0dd6fe140fdc87f6e769ee5bb17d92efe4e (diff) | |
download | cpython-188598851d5cf475fa57b4ec21c0e88ce9316ff0.tar.gz cpython-188598851d5cf475fa57b4ec21c0e88ce9316ff0.zip |
GH-91048: Add utils for capturing async call stack for asyncio programs and enable profiling (#124640)
Signed-off-by: Pablo Galindo <pablogsal@gmail.com>
Co-authored-by: Pablo Galindo <pablogsal@gmail.com>
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
Co-authored-by: Ćukasz Langa <lukasz@langa.pl>
Co-authored-by: Savannah Ostrowski <savannahostrowski@gmail.com>
Co-authored-by: Jacob Coffee <jacob@z7x.org>
Co-authored-by: Irit Katriel <1055913+iritkatriel@users.noreply.github.com>
Diffstat (limited to 'Lib/asyncio/tasks.py')
-rw-r--r-- | Lib/asyncio/tasks.py | 30 |
1 files changed, 26 insertions, 4 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 2112dd4b99d..a25854cc4bd 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -322,6 +322,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._loop.call_soon( self.__step, new_exc, context=self._context) else: + futures.future_add_to_awaited_by(result, self) result._asyncio_future_blocking = False result.add_done_callback( self.__wakeup, context=self._context) @@ -356,6 +357,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future): + futures.future_discard_from_awaited_by(future, self) try: future.result() except BaseException as exc: @@ -502,6 +504,7 @@ async def _wait(fs, timeout, return_when, loop): if timeout is not None: timeout_handle = loop.call_later(timeout, _release_waiter, waiter) counter = len(fs) + cur_task = current_task() def _on_completion(f): nonlocal counter @@ -514,9 +517,11 @@ async def _wait(fs, timeout, return_when, loop): timeout_handle.cancel() if not waiter.done(): waiter.set_result(None) + futures.future_discard_from_awaited_by(f, cur_task) for f in fs: f.add_done_callback(_on_completion) + futures.future_add_to_awaited_by(f, cur_task) try: await waiter @@ -802,10 +807,19 @@ def gather(*coros_or_futures, return_exceptions=False): outer.set_result([]) return outer - def _done_callback(fut): + loop = events._get_running_loop() + if loop is not None: + cur_task = current_task(loop) + else: + cur_task = None + + def _done_callback(fut, cur_task=cur_task): nonlocal nfinished nfinished += 1 + if cur_task is not None: + futures.future_discard_from_awaited_by(fut, cur_task) + if outer is None or outer.done(): if not fut.cancelled(): # Mark exception retrieved. @@ -862,7 +876,6 @@ def gather(*coros_or_futures, return_exceptions=False): nfuts = 0 nfinished = 0 done_futs = [] - loop = None outer = None # bpo-46672 for arg in coros_or_futures: if arg not in arg_to_fut: @@ -875,12 +888,13 @@ def gather(*coros_or_futures, return_exceptions=False): # can't control it, disable the "destroy pending task" # warning. fut._log_destroy_pending = False - nfuts += 1 arg_to_fut[arg] = fut if fut.done(): done_futs.append(fut) else: + if cur_task is not None: + futures.future_add_to_awaited_by(fut, cur_task) fut.add_done_callback(_done_callback) else: @@ -940,7 +954,15 @@ def shield(arg): loop = futures._get_loop(inner) outer = loop.create_future() - def _inner_done_callback(inner): + if loop is not None and (cur_task := current_task(loop)) is not None: + futures.future_add_to_awaited_by(inner, cur_task) + else: + cur_task = None + + def _inner_done_callback(inner, cur_task=cur_task): + if cur_task is not None: + futures.future_discard_from_awaited_by(inner, cur_task) + if outer.cancelled(): if not inner.cancelled(): # Mark inner's result as retrieved. |