aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/__main__.py40
-rw-r--r--Lib/asyncio/base_events.py7
-rw-r--r--Lib/asyncio/base_subprocess.py7
-rw-r--r--Lib/asyncio/futures.py17
-rw-r--r--Lib/asyncio/graph.py6
-rw-r--r--Lib/asyncio/selector_events.py2
-rw-r--r--Lib/asyncio/taskgroups.py7
-rw-r--r--Lib/asyncio/tasks.py50
-rw-r--r--Lib/asyncio/tools.py260
9 files changed, 353 insertions, 43 deletions
diff --git a/Lib/asyncio/__main__.py b/Lib/asyncio/__main__.py
index 69f5a30cfe5..21ca5c5f62a 100644
--- a/Lib/asyncio/__main__.py
+++ b/Lib/asyncio/__main__.py
@@ -1,5 +1,7 @@
+import argparse
import ast
import asyncio
+import asyncio.tools
import concurrent.futures
import contextvars
import inspect
@@ -10,7 +12,7 @@ import threading
import types
import warnings
-from _colorize import can_colorize, ANSIColors # type: ignore[import-not-found]
+from _colorize import get_theme
from _pyrepl.console import InteractiveColoredConsole
from . import futures
@@ -101,8 +103,9 @@ class REPLThread(threading.Thread):
exec(startup_code, console.locals)
ps1 = getattr(sys, "ps1", ">>> ")
- if can_colorize() and CAN_USE_PYREPL:
- ps1 = f"{ANSIColors.BOLD_MAGENTA}{ps1}{ANSIColors.RESET}"
+ if CAN_USE_PYREPL:
+ theme = get_theme().syntax
+ ps1 = f"{theme.prompt}{ps1}{theme.reset}"
console.write(f"{ps1}import asyncio\n")
if CAN_USE_PYREPL:
@@ -140,6 +143,37 @@ class REPLThread(threading.Thread):
if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ prog="python3 -m asyncio",
+ description="Interactive asyncio shell and CLI tools",
+ color=True,
+ )
+ subparsers = parser.add_subparsers(help="sub-commands", dest="command")
+ ps = subparsers.add_parser(
+ "ps", help="Display a table of all pending tasks in a process"
+ )
+ ps.add_argument("pid", type=int, help="Process ID to inspect")
+ pstree = subparsers.add_parser(
+ "pstree", help="Display a tree of all pending tasks in a process"
+ )
+ pstree.add_argument("pid", type=int, help="Process ID to inspect")
+ args = parser.parse_args()
+ match args.command:
+ case "ps":
+ asyncio.tools.display_awaited_by_tasks_table(args.pid)
+ sys.exit(0)
+ case "pstree":
+ asyncio.tools.display_awaited_by_tasks_tree(args.pid)
+ sys.exit(0)
+ case None:
+ pass # continue to the interactive shell
+ case _:
+ # shouldn't happen as an invalid command-line wouldn't parse
+ # but let's keep it for the next person adding a command
+ print(f"error: unhandled command {args.command}", file=sys.stderr)
+ parser.print_usage(file=sys.stderr)
+ sys.exit(1)
+
sys.audit("cpython.run_stdin")
if os.getenv('PYTHON_BASIC_REPL'):
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 29b872ce00e..2ff9e4017bb 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -459,7 +459,7 @@ class BaseEventLoop(events.AbstractEventLoop):
return futures.Future(loop=self)
def create_task(self, coro, **kwargs):
- """Schedule a coroutine object.
+ """Schedule or begin executing a coroutine object.
Return a task object.
"""
@@ -1161,7 +1161,7 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ExceptionGroup("create_connection failed", exceptions)
if len(exceptions) == 1:
raise exceptions[0]
- else:
+ elif exceptions:
# If they all have the same str(), raise one.
model = str(exceptions[0])
if all(str(exc) == model for exc in exceptions):
@@ -1170,6 +1170,9 @@ class BaseEventLoop(events.AbstractEventLoop):
# the various error messages.
raise OSError('Multiple exceptions: {}'.format(
', '.join(str(exc) for exc in exceptions)))
+ else:
+ # No exceptions were collected, raise a timeout error
+ raise TimeoutError('create_connection failed')
finally:
exceptions = None
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index 9c2ba679ce2..d40af422e61 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -104,7 +104,12 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
for proto in self._pipes.values():
if proto is None:
continue
- proto.pipe.close()
+ # See gh-114177
+ # skip closing the pipe if loop is already closed
+ # this can happen e.g. when loop is closed immediately after
+ # process is killed
+ if self._loop and not self._loop.is_closed():
+ proto.pipe.close()
if (self._proc is not None and
# has the child process finished?
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index d1df6707302..6bd00a64478 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -351,22 +351,19 @@ def _set_concurrent_future_state(concurrent, source):
def _copy_future_state(source, dest):
"""Internal helper to copy state from another Future.
- The other Future may be a concurrent.futures.Future.
+ The other Future must be a concurrent.futures.Future.
"""
- assert source.done()
if dest.cancelled():
return
assert not dest.done()
- if source.cancelled():
+ done, cancelled, result, exception = source._get_snapshot()
+ assert done
+ if cancelled:
dest.cancel()
+ elif exception is not None:
+ dest.set_exception(_convert_future_exc(exception))
else:
- exception = source.exception()
- if exception is not None:
- dest.set_exception(_convert_future_exc(exception))
- else:
- result = source.result()
- dest.set_result(result)
-
+ dest.set_result(result)
def _chain_future(source, destination):
"""Chain two futures so that when one completes, so does the other.
diff --git a/Lib/asyncio/graph.py b/Lib/asyncio/graph.py
index d8df7c9919a..b5bfeb1630a 100644
--- a/Lib/asyncio/graph.py
+++ b/Lib/asyncio/graph.py
@@ -1,6 +1,7 @@
"""Introspection utils for tasks call graphs."""
import dataclasses
+import io
import sys
import types
@@ -16,9 +17,6 @@ __all__ = (
'FutureCallGraph',
)
-if False: # for type checkers
- from typing import TextIO
-
# Sadly, we can't re-use the traceback module's datastructures as those
# are tailored for error reporting, whereas we need to represent an
# async call graph.
@@ -270,7 +268,7 @@ def print_call_graph(
future: futures.Future | None = None,
/,
*,
- file: TextIO | None = None,
+ file: io.Writer[str] | None = None,
depth: int = 1,
limit: int | None = None,
) -> None:
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 22147451fa7..6ad84044adf 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -173,7 +173,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
# listening socket has triggered an EVENT_READ. There may be multiple
# connections waiting for an .accept() so it is called in a loop.
# See https://bugs.python.org/issue27906 for more details.
- for _ in range(backlog):
+ for _ in range(backlog + 1):
try:
conn, addr = sock.accept()
if self._debug:
diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py
index 1633478d1c8..00e8f6d5d1a 100644
--- a/Lib/asyncio/taskgroups.py
+++ b/Lib/asyncio/taskgroups.py
@@ -179,7 +179,7 @@ class TaskGroup:
exc = None
- def create_task(self, coro, *, name=None, context=None):
+ def create_task(self, coro, **kwargs):
"""Create a new task in this group and return it.
Similar to `asyncio.create_task`.
@@ -193,10 +193,7 @@ class TaskGroup:
if self._aborting:
coro.close()
raise RuntimeError(f"TaskGroup {self!r} is shutting down")
- if context is None:
- task = self._loop.create_task(coro, name=name)
- else:
- task = self._loop.create_task(coro, name=name, context=context)
+ task = self._loop.create_task(coro, **kwargs)
futures.future_add_to_awaited_by(task, self._parent_task)
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 825e91f5594..fbd5c39a7c5 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -386,19 +386,13 @@ else:
Task = _CTask = _asyncio.Task
-def create_task(coro, *, name=None, context=None):
+def create_task(coro, **kwargs):
"""Schedule the execution of a coroutine object in a spawn task.
Return a Task object.
"""
loop = events.get_running_loop()
- if context is None:
- # Use legacy API if context is not needed
- task = loop.create_task(coro, name=name)
- else:
- task = loop.create_task(coro, name=name, context=context)
-
- return task
+ return loop.create_task(coro, **kwargs)
# wait() and as_completed() similar to those in PEP 3148.
@@ -914,6 +908,25 @@ def gather(*coros_or_futures, return_exceptions=False):
return outer
+def _log_on_exception(fut):
+ if fut.cancelled():
+ return
+
+ exc = fut.exception()
+ if exc is None:
+ return
+
+ context = {
+ 'message':
+ f'{exc.__class__.__name__} exception in shielded future',
+ 'exception': exc,
+ 'future': fut,
+ }
+ if fut._source_traceback:
+ context['source_traceback'] = fut._source_traceback
+ fut._loop.call_exception_handler(context)
+
+
def shield(arg):
"""Wait for a future, shielding it from cancellation.
@@ -959,14 +972,11 @@ def shield(arg):
else:
cur_task = None
- def _inner_done_callback(inner, cur_task=cur_task):
- if cur_task is not None:
- futures.future_discard_from_awaited_by(inner, cur_task)
+ def _clear_awaited_by_callback(inner):
+ futures.future_discard_from_awaited_by(inner, cur_task)
+ def _inner_done_callback(inner):
if outer.cancelled():
- if not inner.cancelled():
- # Mark inner's result as retrieved.
- inner.exception()
return
if inner.cancelled():
@@ -978,10 +988,16 @@ def shield(arg):
else:
outer.set_result(inner.result())
-
def _outer_done_callback(outer):
if not inner.done():
inner.remove_done_callback(_inner_done_callback)
+ # Keep only one callback to log on cancel
+ inner.remove_done_callback(_log_on_exception)
+ inner.add_done_callback(_log_on_exception)
+
+ if cur_task is not None:
+ inner.add_done_callback(_clear_awaited_by_callback)
+
inner.add_done_callback(_inner_done_callback)
outer.add_done_callback(_outer_done_callback)
@@ -1030,9 +1046,9 @@ def create_eager_task_factory(custom_task_constructor):
used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
"""
- def factory(loop, coro, *, name=None, context=None):
+ def factory(loop, coro, *, eager_start=True, **kwargs):
return custom_task_constructor(
- coro, loop=loop, name=name, context=context, eager_start=True)
+ coro, loop=loop, eager_start=eager_start, **kwargs)
return factory
diff --git a/Lib/asyncio/tools.py b/Lib/asyncio/tools.py
new file mode 100644
index 00000000000..2683f34cc71
--- /dev/null
+++ b/Lib/asyncio/tools.py
@@ -0,0 +1,260 @@
+"""Tools to analyze tasks running in asyncio programs."""
+
+from collections import defaultdict, namedtuple
+from itertools import count
+from enum import Enum
+import sys
+from _remote_debugging import RemoteUnwinder, FrameInfo
+
+class NodeType(Enum):
+ COROUTINE = 1
+ TASK = 2
+
+
+class CycleFoundException(Exception):
+ """Raised when there is a cycle when drawing the call tree."""
+ def __init__(
+ self,
+ cycles: list[list[int]],
+ id2name: dict[int, str],
+ ) -> None:
+ super().__init__(cycles, id2name)
+ self.cycles = cycles
+ self.id2name = id2name
+
+
+
+# ─── indexing helpers ───────────────────────────────────────────
+def _format_stack_entry(elem: str|FrameInfo) -> str:
+ if not isinstance(elem, str):
+ if elem.lineno == 0 and elem.filename == "":
+ return f"{elem.funcname}"
+ else:
+ return f"{elem.funcname} {elem.filename}:{elem.lineno}"
+ return elem
+
+
+def _index(result):
+ id2name, awaits, task_stacks = {}, [], {}
+ for awaited_info in result:
+ for task_info in awaited_info.awaited_by:
+ task_id = task_info.task_id
+ task_name = task_info.task_name
+ id2name[task_id] = task_name
+
+ # Store the internal coroutine stack for this task
+ if task_info.coroutine_stack:
+ for coro_info in task_info.coroutine_stack:
+ call_stack = coro_info.call_stack
+ internal_stack = [_format_stack_entry(frame) for frame in call_stack]
+ task_stacks[task_id] = internal_stack
+
+ # Add the awaited_by relationships (external dependencies)
+ if task_info.awaited_by:
+ for coro_info in task_info.awaited_by:
+ call_stack = coro_info.call_stack
+ parent_task_id = coro_info.task_name
+ stack = [_format_stack_entry(frame) for frame in call_stack]
+ awaits.append((parent_task_id, stack, task_id))
+ return id2name, awaits, task_stacks
+
+
+def _build_tree(id2name, awaits, task_stacks):
+ id2label = {(NodeType.TASK, tid): name for tid, name in id2name.items()}
+ children = defaultdict(list)
+ cor_nodes = defaultdict(dict) # Maps parent -> {frame_name: node_key}
+ next_cor_id = count(1)
+
+ def get_or_create_cor_node(parent, frame):
+ """Get existing coroutine node or create new one under parent"""
+ if frame in cor_nodes[parent]:
+ return cor_nodes[parent][frame]
+
+ node_key = (NodeType.COROUTINE, f"c{next(next_cor_id)}")
+ id2label[node_key] = frame
+ children[parent].append(node_key)
+ cor_nodes[parent][frame] = node_key
+ return node_key
+
+ # Build task dependency tree with coroutine frames
+ for parent_id, stack, child_id in awaits:
+ cur = (NodeType.TASK, parent_id)
+ for frame in reversed(stack):
+ cur = get_or_create_cor_node(cur, frame)
+
+ child_key = (NodeType.TASK, child_id)
+ if child_key not in children[cur]:
+ children[cur].append(child_key)
+
+ # Add coroutine stacks for leaf tasks
+ awaiting_tasks = {parent_id for parent_id, _, _ in awaits}
+ for task_id in id2name:
+ if task_id not in awaiting_tasks and task_id in task_stacks:
+ cur = (NodeType.TASK, task_id)
+ for frame in reversed(task_stacks[task_id]):
+ cur = get_or_create_cor_node(cur, frame)
+
+ return id2label, children
+
+
+def _roots(id2label, children):
+ all_children = {c for kids in children.values() for c in kids}
+ return [n for n in id2label if n not in all_children]
+
+# ─── detect cycles in the task-to-task graph ───────────────────────
+def _task_graph(awaits):
+ """Return {parent_task_id: {child_task_id, …}, …}."""
+ g = defaultdict(set)
+ for parent_id, _stack, child_id in awaits:
+ g[parent_id].add(child_id)
+ return g
+
+
+def _find_cycles(graph):
+ """
+ Depth-first search for back-edges.
+
+ Returns a list of cycles (each cycle is a list of task-ids) or an
+ empty list if the graph is acyclic.
+ """
+ WHITE, GREY, BLACK = 0, 1, 2
+ color = defaultdict(lambda: WHITE)
+ path, cycles = [], []
+
+ def dfs(v):
+ color[v] = GREY
+ path.append(v)
+ for w in graph.get(v, ()):
+ if color[w] == WHITE:
+ dfs(w)
+ elif color[w] == GREY: # back-edge → cycle!
+ i = path.index(w)
+ cycles.append(path[i:] + [w]) # make a copy
+ color[v] = BLACK
+ path.pop()
+
+ for v in list(graph):
+ if color[v] == WHITE:
+ dfs(v)
+ return cycles
+
+
+# ─── PRINT TREE FUNCTION ───────────────────────────────────────
+def get_all_awaited_by(pid):
+ unwinder = RemoteUnwinder(pid)
+ return unwinder.get_all_awaited_by()
+
+
+def build_async_tree(result, task_emoji="(T)", cor_emoji=""):
+ """
+ Build a list of strings for pretty-print an async call tree.
+
+ The call tree is produced by `get_all_async_stacks()`, prefixing tasks
+ with `task_emoji` and coroutine frames with `cor_emoji`.
+ """
+ id2name, awaits, task_stacks = _index(result)
+ g = _task_graph(awaits)
+ cycles = _find_cycles(g)
+ if cycles:
+ raise CycleFoundException(cycles, id2name)
+ labels, children = _build_tree(id2name, awaits, task_stacks)
+
+ def pretty(node):
+ flag = task_emoji if node[0] == NodeType.TASK else cor_emoji
+ return f"{flag} {labels[node]}"
+
+ def render(node, prefix="", last=True, buf=None):
+ if buf is None:
+ buf = []
+ buf.append(f"{prefix}{'└── ' if last else '├── '}{pretty(node)}")
+ new_pref = prefix + (" " if last else "│ ")
+ kids = children.get(node, [])
+ for i, kid in enumerate(kids):
+ render(kid, new_pref, i == len(kids) - 1, buf)
+ return buf
+
+ return [render(root) for root in _roots(labels, children)]
+
+
+def build_task_table(result):
+ id2name, _, _ = _index(result)
+ table = []
+
+ for awaited_info in result:
+ thread_id = awaited_info.thread_id
+ for task_info in awaited_info.awaited_by:
+ # Get task info
+ task_id = task_info.task_id
+ task_name = task_info.task_name
+
+ # Build coroutine stack string
+ frames = [frame for coro in task_info.coroutine_stack
+ for frame in coro.call_stack]
+ coro_stack = " -> ".join(_format_stack_entry(x).split(" ")[0]
+ for x in frames)
+
+ # Handle tasks with no awaiters
+ if not task_info.awaited_by:
+ table.append([thread_id, hex(task_id), task_name, coro_stack,
+ "", "", "0x0"])
+ continue
+
+ # Handle tasks with awaiters
+ for coro_info in task_info.awaited_by:
+ parent_id = coro_info.task_name
+ awaiter_frames = [_format_stack_entry(x).split(" ")[0]
+ for x in coro_info.call_stack]
+ awaiter_chain = " -> ".join(awaiter_frames)
+ awaiter_name = id2name.get(parent_id, "Unknown")
+ parent_id_str = (hex(parent_id) if isinstance(parent_id, int)
+ else str(parent_id))
+
+ table.append([thread_id, hex(task_id), task_name, coro_stack,
+ awaiter_chain, awaiter_name, parent_id_str])
+
+ return table
+
+def _print_cycle_exception(exception: CycleFoundException):
+ print("ERROR: await-graph contains cycles - cannot print a tree!", file=sys.stderr)
+ print("", file=sys.stderr)
+ for c in exception.cycles:
+ inames = " → ".join(exception.id2name.get(tid, hex(tid)) for tid in c)
+ print(f"cycle: {inames}", file=sys.stderr)
+
+
+def _get_awaited_by_tasks(pid: int) -> list:
+ try:
+ return get_all_awaited_by(pid)
+ except RuntimeError as e:
+ while e.__context__ is not None:
+ e = e.__context__
+ print(f"Error retrieving tasks: {e}")
+ sys.exit(1)
+
+
+def display_awaited_by_tasks_table(pid: int) -> None:
+ """Build and print a table of all pending tasks under `pid`."""
+
+ tasks = _get_awaited_by_tasks(pid)
+ table = build_task_table(tasks)
+ # Print the table in a simple tabular format
+ print(
+ f"{'tid':<10} {'task id':<20} {'task name':<20} {'coroutine stack':<50} {'awaiter chain':<50} {'awaiter name':<15} {'awaiter id':<15}"
+ )
+ print("-" * 180)
+ for row in table:
+ print(f"{row[0]:<10} {row[1]:<20} {row[2]:<20} {row[3]:<50} {row[4]:<50} {row[5]:<15} {row[6]:<15}")
+
+
+def display_awaited_by_tasks_tree(pid: int) -> None:
+ """Build and print a tree of all pending tasks under `pid`."""
+
+ tasks = _get_awaited_by_tasks(pid)
+ try:
+ result = build_async_tree(tasks)
+ except CycleFoundException as e:
+ _print_cycle_exception(e)
+ sys.exit(1)
+
+ for tree in result:
+ print("\n".join(tree))