Diffstat (limited to 'Lib/asyncio')
-rw-r--r--   Lib/asyncio/__main__.py        |  40
-rw-r--r--   Lib/asyncio/base_events.py     |   7
-rw-r--r--   Lib/asyncio/base_subprocess.py |   7
-rw-r--r--   Lib/asyncio/futures.py         |  17
-rw-r--r--   Lib/asyncio/graph.py           |   6
-rw-r--r--   Lib/asyncio/selector_events.py |   2
-rw-r--r--   Lib/asyncio/taskgroups.py      |   7
-rw-r--r--   Lib/asyncio/tasks.py           |  50
-rw-r--r--   Lib/asyncio/tools.py           | 260
9 files changed, 353 insertions, 43 deletions
diff --git a/Lib/asyncio/__main__.py b/Lib/asyncio/__main__.py
index 69f5a30cfe5..21ca5c5f62a 100644
--- a/Lib/asyncio/__main__.py
+++ b/Lib/asyncio/__main__.py
@@ -1,5 +1,7 @@
+import argparse
 import ast
 import asyncio
+import asyncio.tools
 import concurrent.futures
 import contextvars
 import inspect
@@ -10,7 +12,7 @@ import threading
 import types
 import warnings
 
-from _colorize import can_colorize, ANSIColors  # type: ignore[import-not-found]
+from _colorize import get_theme
 from _pyrepl.console import InteractiveColoredConsole
 
 from . import futures
@@ -101,8 +103,9 @@ class REPLThread(threading.Thread):
                 exec(startup_code, console.locals)
 
             ps1 = getattr(sys, "ps1", ">>> ")
-            if can_colorize() and CAN_USE_PYREPL:
-                ps1 = f"{ANSIColors.BOLD_MAGENTA}{ps1}{ANSIColors.RESET}"
+            if CAN_USE_PYREPL:
+                theme = get_theme().syntax
+                ps1 = f"{theme.prompt}{ps1}{theme.reset}"
             console.write(f"{ps1}import asyncio\n")
 
             if CAN_USE_PYREPL:
@@ -140,6 +143,37 @@ class REPLThread(threading.Thread):
 
 
 if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        prog="python3 -m asyncio",
+        description="Interactive asyncio shell and CLI tools",
+        color=True,
+    )
+    subparsers = parser.add_subparsers(help="sub-commands", dest="command")
+    ps = subparsers.add_parser(
+        "ps", help="Display a table of all pending tasks in a process"
+    )
+    ps.add_argument("pid", type=int, help="Process ID to inspect")
+    pstree = subparsers.add_parser(
+        "pstree", help="Display a tree of all pending tasks in a process"
+    )
+    pstree.add_argument("pid", type=int, help="Process ID to inspect")
+    args = parser.parse_args()
+    match args.command:
+        case "ps":
+            asyncio.tools.display_awaited_by_tasks_table(args.pid)
+            sys.exit(0)
+        case "pstree":
+            asyncio.tools.display_awaited_by_tasks_tree(args.pid)
+            sys.exit(0)
+        case None:
+            pass  # continue to the interactive shell
+        case _:
+            # shouldn't happen as an invalid command-line wouldn't parse
+            # but let's keep it for the next person adding a command
+            print(f"error: unhandled command {args.command}", file=sys.stderr)
+            parser.print_usage(file=sys.stderr)
+            sys.exit(1)
+
     sys.audit("cpython.run_stdin")
 
     if os.getenv('PYTHON_BASIC_REPL'):
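The new sub-commands above are thin wrappers over the asyncio.tools module added at the end of this change. As a rough illustration of what `python3 -m asyncio ps <pid>` and `pstree <pid>` dispatch to, here is the same thing driven directly from Python (the PID value is hypothetical):

    import asyncio.tools

    pid = 12345  # hypothetical PID of a running asyncio program

    # equivalent of "python3 -m asyncio ps 12345"
    asyncio.tools.display_awaited_by_tasks_table(pid)
    # equivalent of "python3 -m asyncio pstree 12345"
    asyncio.tools.display_awaited_by_tasks_tree(pid)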
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 29b872ce00e..2ff9e4017bb 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -459,7 +459,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         return futures.Future(loop=self)
 
     def create_task(self, coro, **kwargs):
-        """Schedule a coroutine object.
+        """Schedule or begin executing a coroutine object.
 
         Return a task object.
         """
@@ -1161,7 +1161,7 @@ class BaseEventLoop(events.AbstractEventLoop):
                     raise ExceptionGroup("create_connection failed", exceptions)
                 if len(exceptions) == 1:
                     raise exceptions[0]
-                else:
+                elif exceptions:
                     # If they all have the same str(), raise one.
                     model = str(exceptions[0])
                     if all(str(exc) == model for exc in exceptions):
@@ -1170,6 +1170,9 @@
                     # the various error messages.
                     raise OSError('Multiple exceptions: {}'.format(
                         ', '.join(str(exc) for exc in exceptions)))
+                else:
+                    # No exceptions were collected, raise a timeout error
+                    raise TimeoutError('create_connection failed')
         finally:
             exceptions = None
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index 9c2ba679ce2..d40af422e61 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -104,7 +104,12 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
         for proto in self._pipes.values():
             if proto is None:
                 continue
-            proto.pipe.close()
+            # See gh-114177
+            # skip closing the pipe if loop is already closed
+            # this can happen e.g. when loop is closed immediately after
+            # process is killed
+            if self._loop and not self._loop.is_closed():
+                proto.pipe.close()
 
         if (self._proc is not None and
                 # has the child process finished?
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index d1df6707302..6bd00a64478 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -351,22 +351,19 @@ def _set_concurrent_future_state(concurrent, source):
 def _copy_future_state(source, dest):
     """Internal helper to copy state from another Future.
 
-    The other Future may be a concurrent.futures.Future.
+    The other Future must be a concurrent.futures.Future.
     """
-    assert source.done()
     if dest.cancelled():
         return
     assert not dest.done()
-    if source.cancelled():
+    done, cancelled, result, exception = source._get_snapshot()
+    assert done
+    if cancelled:
         dest.cancel()
+    elif exception is not None:
+        dest.set_exception(_convert_future_exc(exception))
     else:
-        exception = source.exception()
-        if exception is not None:
-            dest.set_exception(_convert_future_exc(exception))
-        else:
-            result = source.result()
-            dest.set_result(result)
-
+        dest.set_result(result)
 
 def _chain_future(source, destination):
     """Chain two futures so that when one completes, so does the other.
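The _copy_future_state() rewrite above replaces separate cancelled()/exception()/result() calls with a single atomic read via the source future's _get_snapshot(). One standard-library path that exercises this helper is chaining a concurrent.futures.Future into an asyncio future, for example through loop.run_in_executor(); a minimal sketch under that assumption:

    import asyncio
    import concurrent.futures


    def blocking_work() -> int:
        return 42


    async def main() -> None:
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as pool:
            # The executor's concurrent.futures.Future is wrapped into an
            # asyncio future; copying its final state across goes through
            # _copy_future_state(), which now reads one snapshot of
            # done/cancelled/result/exception.
            result = await loop.run_in_executor(pool, blocking_work)
        print(result)


    asyncio.run(main())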
diff --git a/Lib/asyncio/graph.py b/Lib/asyncio/graph.py
index d8df7c9919a..b5bfeb1630a 100644
--- a/Lib/asyncio/graph.py
+++ b/Lib/asyncio/graph.py
@@ -1,6 +1,7 @@
 """Introspection utils for tasks call graphs."""
 
 import dataclasses
+import io
 import sys
 import types
 
@@ -16,9 +17,6 @@ __all__ = (
     'FutureCallGraph',
 )
 
-if False:  # for type checkers
-    from typing import TextIO
-
 # Sadly, we can't re-use the traceback module's datastructures as those
 # are tailored for error reporting, whereas we need to represent an
 # async call graph.
@@ -270,7 +268,7 @@ def print_call_graph(
     future: futures.Future | None = None,
     /,
     *,
-    file: TextIO | None = None,
+    file: io.Writer[str] | None = None,
     depth: int = 1,
     limit: int | None = None,
 ) -> None:
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 22147451fa7..6ad84044adf 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -173,7 +173,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         # listening socket has triggered an EVENT_READ. There may be multiple
         # connections waiting for an .accept() so it is called in a loop.
         # See https://bugs.python.org/issue27906 for more details.
-        for _ in range(backlog):
+        for _ in range(backlog + 1):
             try:
                 conn, addr = sock.accept()
                 if self._debug:
diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py
index 1633478d1c8..00e8f6d5d1a 100644
--- a/Lib/asyncio/taskgroups.py
+++ b/Lib/asyncio/taskgroups.py
@@ -179,7 +179,7 @@ class TaskGroup:
 
         exc = None
 
-    def create_task(self, coro, *, name=None, context=None):
+    def create_task(self, coro, **kwargs):
         """Create a new task in this group and return it.
 
         Similar to `asyncio.create_task`.
@@ -193,10 +193,7 @@ class TaskGroup:
         if self._aborting:
             coro.close()
             raise RuntimeError(f"TaskGroup {self!r} is shutting down")
-        if context is None:
-            task = self._loop.create_task(coro, name=name)
-        else:
-            task = self._loop.create_task(coro, name=name, context=context)
+        task = self._loop.create_task(coro, **kwargs)
 
         futures.future_add_to_awaited_by(task, self._parent_task)
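With the TaskGroup change above, create_task() simply forwards its keyword arguments to loop.create_task(), so name, context, and any extra keywords understood by a custom task factory pass straight through. A small usage sketch (the task names are illustrative):

    import asyncio
    import contextvars


    async def worker(label: str) -> None:
        await asyncio.sleep(0.1)
        print("done:", label)


    async def main() -> None:
        ctx = contextvars.copy_context()
        async with asyncio.TaskGroup() as tg:
            # Both keyword arguments are forwarded unchanged to loop.create_task()
            # instead of being special-cased inside TaskGroup.
            tg.create_task(worker("a"), name="worker-a")
            tg.create_task(worker("b"), name="worker-b", context=ctx)


    asyncio.run(main())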
""" - def factory(loop, coro, *, name=None, context=None): + def factory(loop, coro, *, eager_start=True, **kwargs): return custom_task_constructor( - coro, loop=loop, name=name, context=context, eager_start=True) + coro, loop=loop, eager_start=eager_start, **kwargs) return factory diff --git a/Lib/asyncio/tools.py b/Lib/asyncio/tools.py new file mode 100644 index 00000000000..2683f34cc71 --- /dev/null +++ b/Lib/asyncio/tools.py @@ -0,0 +1,260 @@ +"""Tools to analyze tasks running in asyncio programs.""" + +from collections import defaultdict, namedtuple +from itertools import count +from enum import Enum +import sys +from _remote_debugging import RemoteUnwinder, FrameInfo + +class NodeType(Enum): + COROUTINE = 1 + TASK = 2 + + +class CycleFoundException(Exception): + """Raised when there is a cycle when drawing the call tree.""" + def __init__( + self, + cycles: list[list[int]], + id2name: dict[int, str], + ) -> None: + super().__init__(cycles, id2name) + self.cycles = cycles + self.id2name = id2name + + + +# ─── indexing helpers ─────────────────────────────────────────── +def _format_stack_entry(elem: str|FrameInfo) -> str: + if not isinstance(elem, str): + if elem.lineno == 0 and elem.filename == "": + return f"{elem.funcname}" + else: + return f"{elem.funcname} {elem.filename}:{elem.lineno}" + return elem + + +def _index(result): + id2name, awaits, task_stacks = {}, [], {} + for awaited_info in result: + for task_info in awaited_info.awaited_by: + task_id = task_info.task_id + task_name = task_info.task_name + id2name[task_id] = task_name + + # Store the internal coroutine stack for this task + if task_info.coroutine_stack: + for coro_info in task_info.coroutine_stack: + call_stack = coro_info.call_stack + internal_stack = [_format_stack_entry(frame) for frame in call_stack] + task_stacks[task_id] = internal_stack + + # Add the awaited_by relationships (external dependencies) + if task_info.awaited_by: + for coro_info in task_info.awaited_by: + call_stack = coro_info.call_stack + parent_task_id = coro_info.task_name + stack = [_format_stack_entry(frame) for frame in call_stack] + awaits.append((parent_task_id, stack, task_id)) + return id2name, awaits, task_stacks + + +def _build_tree(id2name, awaits, task_stacks): + id2label = {(NodeType.TASK, tid): name for tid, name in id2name.items()} + children = defaultdict(list) + cor_nodes = defaultdict(dict) # Maps parent -> {frame_name: node_key} + next_cor_id = count(1) + + def get_or_create_cor_node(parent, frame): + """Get existing coroutine node or create new one under parent""" + if frame in cor_nodes[parent]: + return cor_nodes[parent][frame] + + node_key = (NodeType.COROUTINE, f"c{next(next_cor_id)}") + id2label[node_key] = frame + children[parent].append(node_key) + cor_nodes[parent][frame] = node_key + return node_key + + # Build task dependency tree with coroutine frames + for parent_id, stack, child_id in awaits: + cur = (NodeType.TASK, parent_id) + for frame in reversed(stack): + cur = get_or_create_cor_node(cur, frame) + + child_key = (NodeType.TASK, child_id) + if child_key not in children[cur]: + children[cur].append(child_key) + + # Add coroutine stacks for leaf tasks + awaiting_tasks = {parent_id for parent_id, _, _ in awaits} + for task_id in id2name: + if task_id not in awaiting_tasks and task_id in task_stacks: + cur = (NodeType.TASK, task_id) + for frame in reversed(task_stacks[task_id]): + cur = get_or_create_cor_node(cur, frame) + + return id2label, children + + +def _roots(id2label, children): + all_children = {c for 
diff --git a/Lib/asyncio/tools.py b/Lib/asyncio/tools.py
new file mode 100644
index 00000000000..2683f34cc71
--- /dev/null
+++ b/Lib/asyncio/tools.py
@@ -0,0 +1,260 @@
+"""Tools to analyze tasks running in asyncio programs."""
+
+from collections import defaultdict, namedtuple
+from itertools import count
+from enum import Enum
+import sys
+from _remote_debugging import RemoteUnwinder, FrameInfo
+
+class NodeType(Enum):
+    COROUTINE = 1
+    TASK = 2
+
+
+class CycleFoundException(Exception):
+    """Raised when there is a cycle when drawing the call tree."""
+    def __init__(
+        self,
+        cycles: list[list[int]],
+        id2name: dict[int, str],
+    ) -> None:
+        super().__init__(cycles, id2name)
+        self.cycles = cycles
+        self.id2name = id2name
+
+
+# ─── indexing helpers ───────────────────────────────────────────
+def _format_stack_entry(elem: str|FrameInfo) -> str:
+    if not isinstance(elem, str):
+        if elem.lineno == 0 and elem.filename == "":
+            return f"{elem.funcname}"
+        else:
+            return f"{elem.funcname} {elem.filename}:{elem.lineno}"
+    return elem
+
+
+def _index(result):
+    id2name, awaits, task_stacks = {}, [], {}
+    for awaited_info in result:
+        for task_info in awaited_info.awaited_by:
+            task_id = task_info.task_id
+            task_name = task_info.task_name
+            id2name[task_id] = task_name
+
+            # Store the internal coroutine stack for this task
+            if task_info.coroutine_stack:
+                for coro_info in task_info.coroutine_stack:
+                    call_stack = coro_info.call_stack
+                    internal_stack = [_format_stack_entry(frame) for frame in call_stack]
+                    task_stacks[task_id] = internal_stack
+
+            # Add the awaited_by relationships (external dependencies)
+            if task_info.awaited_by:
+                for coro_info in task_info.awaited_by:
+                    call_stack = coro_info.call_stack
+                    parent_task_id = coro_info.task_name
+                    stack = [_format_stack_entry(frame) for frame in call_stack]
+                    awaits.append((parent_task_id, stack, task_id))
+    return id2name, awaits, task_stacks
+
+
+def _build_tree(id2name, awaits, task_stacks):
+    id2label = {(NodeType.TASK, tid): name for tid, name in id2name.items()}
+    children = defaultdict(list)
+    cor_nodes = defaultdict(dict)  # Maps parent -> {frame_name: node_key}
+    next_cor_id = count(1)
+
+    def get_or_create_cor_node(parent, frame):
+        """Get existing coroutine node or create new one under parent"""
+        if frame in cor_nodes[parent]:
+            return cor_nodes[parent][frame]
+
+        node_key = (NodeType.COROUTINE, f"c{next(next_cor_id)}")
+        id2label[node_key] = frame
+        children[parent].append(node_key)
+        cor_nodes[parent][frame] = node_key
+        return node_key
+
+    # Build task dependency tree with coroutine frames
+    for parent_id, stack, child_id in awaits:
+        cur = (NodeType.TASK, parent_id)
+        for frame in reversed(stack):
+            cur = get_or_create_cor_node(cur, frame)
+
+        child_key = (NodeType.TASK, child_id)
+        if child_key not in children[cur]:
+            children[cur].append(child_key)
+
+    # Add coroutine stacks for leaf tasks
+    awaiting_tasks = {parent_id for parent_id, _, _ in awaits}
+    for task_id in id2name:
+        if task_id not in awaiting_tasks and task_id in task_stacks:
+            cur = (NodeType.TASK, task_id)
+            for frame in reversed(task_stacks[task_id]):
+                cur = get_or_create_cor_node(cur, frame)
+
+    return id2label, children
+
+
+def _roots(id2label, children):
+    all_children = {c for kids in children.values() for c in kids}
+    return [n for n in id2label if n not in all_children]
+
+# ─── detect cycles in the task-to-task graph ───────────────────────
+def _task_graph(awaits):
+    """Return {parent_task_id: {child_task_id, …}, …}."""
+    g = defaultdict(set)
+    for parent_id, _stack, child_id in awaits:
+        g[parent_id].add(child_id)
+    return g
+
+
+def _find_cycles(graph):
+    """
+    Depth-first search for back-edges.
+
+    Returns a list of cycles (each cycle is a list of task-ids) or an
+    empty list if the graph is acyclic.
+    """
+    WHITE, GREY, BLACK = 0, 1, 2
+    color = defaultdict(lambda: WHITE)
+    path, cycles = [], []
+
+    def dfs(v):
+        color[v] = GREY
+        path.append(v)
+        for w in graph.get(v, ()):
+            if color[w] == WHITE:
+                dfs(w)
+            elif color[w] == GREY:  # back-edge → cycle!
+                i = path.index(w)
+                cycles.append(path[i:] + [w])  # make a copy
+        color[v] = BLACK
+        path.pop()
+
+    for v in list(graph):
+        if color[v] == WHITE:
+            dfs(v)
+    return cycles
+
+
+# ─── PRINT TREE FUNCTION ───────────────────────────────────────
+def get_all_awaited_by(pid):
+    unwinder = RemoteUnwinder(pid)
+    return unwinder.get_all_awaited_by()
+
+
+def build_async_tree(result, task_emoji="(T)", cor_emoji=""):
+    """
+    Build a list of strings for pretty-print an async call tree.
+
+    The call tree is produced by `get_all_async_stacks()`, prefixing tasks
+    with `task_emoji` and coroutine frames with `cor_emoji`.
+    """
+    id2name, awaits, task_stacks = _index(result)
+    g = _task_graph(awaits)
+    cycles = _find_cycles(g)
+    if cycles:
+        raise CycleFoundException(cycles, id2name)
+    labels, children = _build_tree(id2name, awaits, task_stacks)
+
+    def pretty(node):
+        flag = task_emoji if node[0] == NodeType.TASK else cor_emoji
+        return f"{flag} {labels[node]}"
+
+    def render(node, prefix="", last=True, buf=None):
+        if buf is None:
+            buf = []
+        buf.append(f"{prefix}{'└── ' if last else '├── '}{pretty(node)}")
+        new_pref = prefix + ("    " if last else "│   ")
+        kids = children.get(node, [])
+        for i, kid in enumerate(kids):
+            render(kid, new_pref, i == len(kids) - 1, buf)
+        return buf
+
+    return [render(root) for root in _roots(labels, children)]
+
+
+def build_task_table(result):
+    id2name, _, _ = _index(result)
+    table = []
+
+    for awaited_info in result:
+        thread_id = awaited_info.thread_id
+        for task_info in awaited_info.awaited_by:
+            # Get task info
+            task_id = task_info.task_id
+            task_name = task_info.task_name
+
+            # Build coroutine stack string
+            frames = [frame for coro in task_info.coroutine_stack
+                      for frame in coro.call_stack]
+            coro_stack = " -> ".join(_format_stack_entry(x).split(" ")[0]
+                                     for x in frames)
+
+            # Handle tasks with no awaiters
+            if not task_info.awaited_by:
+                table.append([thread_id, hex(task_id), task_name, coro_stack,
+                              "", "", "0x0"])
+                continue
+
+            # Handle tasks with awaiters
+            for coro_info in task_info.awaited_by:
+                parent_id = coro_info.task_name
+                awaiter_frames = [_format_stack_entry(x).split(" ")[0]
+                                  for x in coro_info.call_stack]
+                awaiter_chain = " -> ".join(awaiter_frames)
+                awaiter_name = id2name.get(parent_id, "Unknown")
+                parent_id_str = (hex(parent_id) if isinstance(parent_id, int)
+                                 else str(parent_id))
+
+                table.append([thread_id, hex(task_id), task_name, coro_stack,
+                              awaiter_chain, awaiter_name, parent_id_str])
+
+    return table
+
+def _print_cycle_exception(exception: CycleFoundException):
+    print("ERROR: await-graph contains cycles - cannot print a tree!", file=sys.stderr)
+    print("", file=sys.stderr)
+    for c in exception.cycles:
+        inames = " → ".join(exception.id2name.get(tid, hex(tid)) for tid in c)
+        print(f"cycle: {inames}", file=sys.stderr)
+
+
+def _get_awaited_by_tasks(pid: int) -> list:
+    try:
+        return get_all_awaited_by(pid)
+    except RuntimeError as e:
+        while e.__context__ is not None:
+            e = e.__context__
+        print(f"Error retrieving tasks: {e}")
+        sys.exit(1)
+
+
+def display_awaited_by_tasks_table(pid: int) -> None:
+    """Build and print a table of all pending tasks under `pid`."""
+
+    tasks = _get_awaited_by_tasks(pid)
+    table = build_task_table(tasks)
+    # Print the table in a simple tabular format
+    print(
+        f"{'tid':<10} {'task id':<20} {'task name':<20} {'coroutine stack':<50} {'awaiter chain':<50} {'awaiter name':<15} {'awaiter id':<15}"
+    )
+    print("-" * 180)
+    for row in table:
+        print(f"{row[0]:<10} {row[1]:<20} {row[2]:<20} {row[3]:<50} {row[4]:<50} {row[5]:<15} {row[6]:<15}")
+
+
+def display_awaited_by_tasks_tree(pid: int) -> None:
+    """Build and print a tree of all pending tasks under `pid`."""
+
+    tasks = _get_awaited_by_tasks(pid)
+    try:
+        result = build_async_tree(tasks)
+    except CycleFoundException as e:
+        _print_cycle_exception(e)
+        sys.exit(1)
+
+    for tree in result:
+        print("\n".join(tree))