diff options
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_events.py | 70 | ||||
-rw-r--r-- | Lib/asyncio/base_subprocess.py | 7 | ||||
-rw-r--r-- | Lib/asyncio/futures.py | 17 | ||||
-rw-r--r-- | Lib/asyncio/selector_events.py | 2 | ||||
-rw-r--r-- | Lib/asyncio/tasks.py | 36 | ||||
-rw-r--r-- | Lib/asyncio/tools.py | 180 |
6 files changed, 191 insertions, 121 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 04fb961e998..520d4b39854 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -1016,38 +1016,43 @@ class BaseEventLoop(events.AbstractEventLoop): family, type_, proto, _, address = addr_info sock = None try: - sock = socket.socket(family=family, type=type_, proto=proto) - sock.setblocking(False) - if local_addr_infos is not None: - for lfamily, _, _, _, laddr in local_addr_infos: - # skip local addresses of different family - if lfamily != family: - continue - try: - sock.bind(laddr) - break - except OSError as exc: - msg = ( - f'error while attempting to bind on ' - f'address {laddr!r}: {str(exc).lower()}' - ) - exc = OSError(exc.errno, msg) - my_exceptions.append(exc) - else: # all bind attempts failed - if my_exceptions: - raise my_exceptions.pop() - else: - raise OSError(f"no matching local address with {family=} found") - await self.sock_connect(sock, address) - return sock - except OSError as exc: - my_exceptions.append(exc) - if sock is not None: - sock.close() - raise + try: + sock = socket.socket(family=family, type=type_, proto=proto) + sock.setblocking(False) + if local_addr_infos is not None: + for lfamily, _, _, _, laddr in local_addr_infos: + # skip local addresses of different family + if lfamily != family: + continue + try: + sock.bind(laddr) + break + except OSError as exc: + msg = ( + f'error while attempting to bind on ' + f'address {laddr!r}: {str(exc).lower()}' + ) + exc = OSError(exc.errno, msg) + my_exceptions.append(exc) + else: # all bind attempts failed + if my_exceptions: + raise my_exceptions.pop() + else: + raise OSError(f"no matching local address with {family=} found") + await self.sock_connect(sock, address) + return sock + except OSError as exc: + my_exceptions.append(exc) + raise except: if sock is not None: - sock.close() + try: + sock.close() + except OSError: + # An error when closing a newly created socket is + # not important, but it can overwrite more important + # non-OSError error. So ignore it. + pass raise finally: exceptions = my_exceptions = None @@ -1161,7 +1166,7 @@ class BaseEventLoop(events.AbstractEventLoop): raise ExceptionGroup("create_connection failed", exceptions) if len(exceptions) == 1: raise exceptions[0] - else: + elif exceptions: # If they all have the same str(), raise one. model = str(exceptions[0]) if all(str(exc) == model for exc in exceptions): @@ -1170,6 +1175,9 @@ class BaseEventLoop(events.AbstractEventLoop): # the various error messages. raise OSError('Multiple exceptions: {}'.format( ', '.join(str(exc) for exc in exceptions))) + else: + # No exceptions were collected, raise a timeout error + raise TimeoutError('create_connection failed') finally: exceptions = None diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 9c2ba679ce2..d40af422e61 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -104,7 +104,12 @@ class BaseSubprocessTransport(transports.SubprocessTransport): for proto in self._pipes.values(): if proto is None: continue - proto.pipe.close() + # See gh-114177 + # skip closing the pipe if loop is already closed + # this can happen e.g. when loop is closed immediately after + # process is killed + if self._loop and not self._loop.is_closed(): + proto.pipe.close() if (self._proc is not None and # has the child process finished? diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index d1df6707302..6bd00a64478 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -351,22 +351,19 @@ def _set_concurrent_future_state(concurrent, source): def _copy_future_state(source, dest): """Internal helper to copy state from another Future. - The other Future may be a concurrent.futures.Future. + The other Future must be a concurrent.futures.Future. """ - assert source.done() if dest.cancelled(): return assert not dest.done() - if source.cancelled(): + done, cancelled, result, exception = source._get_snapshot() + assert done + if cancelled: dest.cancel() + elif exception is not None: + dest.set_exception(_convert_future_exc(exception)) else: - exception = source.exception() - if exception is not None: - dest.set_exception(_convert_future_exc(exception)) - else: - result = source.result() - dest.set_result(result) - + dest.set_result(result) def _chain_future(source, destination): """Chain two futures so that when one completes, so does the other. diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 22147451fa7..6ad84044adf 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -173,7 +173,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): # listening socket has triggered an EVENT_READ. There may be multiple # connections waiting for an .accept() so it is called in a loop. # See https://bugs.python.org/issue27906 for more details. - for _ in range(backlog): + for _ in range(backlog + 1): try: conn, addr = sock.accept() if self._debug: diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 888615f8e5e..fbd5c39a7c5 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -908,6 +908,25 @@ def gather(*coros_or_futures, return_exceptions=False): return outer +def _log_on_exception(fut): + if fut.cancelled(): + return + + exc = fut.exception() + if exc is None: + return + + context = { + 'message': + f'{exc.__class__.__name__} exception in shielded future', + 'exception': exc, + 'future': fut, + } + if fut._source_traceback: + context['source_traceback'] = fut._source_traceback + fut._loop.call_exception_handler(context) + + def shield(arg): """Wait for a future, shielding it from cancellation. @@ -953,14 +972,11 @@ def shield(arg): else: cur_task = None - def _inner_done_callback(inner, cur_task=cur_task): - if cur_task is not None: - futures.future_discard_from_awaited_by(inner, cur_task) + def _clear_awaited_by_callback(inner): + futures.future_discard_from_awaited_by(inner, cur_task) + def _inner_done_callback(inner): if outer.cancelled(): - if not inner.cancelled(): - # Mark inner's result as retrieved. - inner.exception() return if inner.cancelled(): @@ -972,10 +988,16 @@ def shield(arg): else: outer.set_result(inner.result()) - def _outer_done_callback(outer): if not inner.done(): inner.remove_done_callback(_inner_done_callback) + # Keep only one callback to log on cancel + inner.remove_done_callback(_log_on_exception) + inner.add_done_callback(_log_on_exception) + + if cur_task is not None: + inner.add_done_callback(_clear_awaited_by_callback) + inner.add_done_callback(_inner_done_callback) outer.add_done_callback(_outer_done_callback) diff --git a/Lib/asyncio/tools.py b/Lib/asyncio/tools.py index bf1cb5e64cb..2683f34cc71 100644 --- a/Lib/asyncio/tools.py +++ b/Lib/asyncio/tools.py @@ -1,71 +1,99 @@ """Tools to analyze tasks running in asyncio programs.""" -from dataclasses import dataclass -from collections import defaultdict +from collections import defaultdict, namedtuple from itertools import count from enum import Enum import sys -from _remote_debugging import get_all_awaited_by - +from _remote_debugging import RemoteUnwinder, FrameInfo class NodeType(Enum): COROUTINE = 1 TASK = 2 -@dataclass(frozen=True) class CycleFoundException(Exception): """Raised when there is a cycle when drawing the call tree.""" - cycles: list[list[int]] - id2name: dict[int, str] + def __init__( + self, + cycles: list[list[int]], + id2name: dict[int, str], + ) -> None: + super().__init__(cycles, id2name) + self.cycles = cycles + self.id2name = id2name -# ─── indexing helpers ─────────────────────────────────────────── -def _format_stack_entry(elem: tuple[str, str, int] | str) -> str: - if isinstance(elem, tuple): - fqname, path, line_no = elem - return f"{fqname} {path}:{line_no}" +# ─── indexing helpers ─────────────────────────────────────────── +def _format_stack_entry(elem: str|FrameInfo) -> str: + if not isinstance(elem, str): + if elem.lineno == 0 and elem.filename == "": + return f"{elem.funcname}" + else: + return f"{elem.funcname} {elem.filename}:{elem.lineno}" return elem def _index(result): - id2name, awaits = {}, [] - for _thr_id, tasks in result: - for tid, tname, awaited in tasks: - id2name[tid] = tname - for stack, parent_id in awaited: - stack = [_format_stack_entry(elem) for elem in stack] - awaits.append((parent_id, stack, tid)) - return id2name, awaits - - -def _build_tree(id2name, awaits): + id2name, awaits, task_stacks = {}, [], {} + for awaited_info in result: + for task_info in awaited_info.awaited_by: + task_id = task_info.task_id + task_name = task_info.task_name + id2name[task_id] = task_name + + # Store the internal coroutine stack for this task + if task_info.coroutine_stack: + for coro_info in task_info.coroutine_stack: + call_stack = coro_info.call_stack + internal_stack = [_format_stack_entry(frame) for frame in call_stack] + task_stacks[task_id] = internal_stack + + # Add the awaited_by relationships (external dependencies) + if task_info.awaited_by: + for coro_info in task_info.awaited_by: + call_stack = coro_info.call_stack + parent_task_id = coro_info.task_name + stack = [_format_stack_entry(frame) for frame in call_stack] + awaits.append((parent_task_id, stack, task_id)) + return id2name, awaits, task_stacks + + +def _build_tree(id2name, awaits, task_stacks): id2label = {(NodeType.TASK, tid): name for tid, name in id2name.items()} children = defaultdict(list) - cor_names = defaultdict(dict) # (parent) -> {frame: node} - cor_id_seq = count(1) - - def _cor_node(parent_key, frame_name): - """Return an existing or new (NodeType.COROUTINE, …) node under *parent_key*.""" - bucket = cor_names[parent_key] - if frame_name in bucket: - return bucket[frame_name] - node_key = (NodeType.COROUTINE, f"c{next(cor_id_seq)}") - id2label[node_key] = frame_name - children[parent_key].append(node_key) - bucket[frame_name] = node_key + cor_nodes = defaultdict(dict) # Maps parent -> {frame_name: node_key} + next_cor_id = count(1) + + def get_or_create_cor_node(parent, frame): + """Get existing coroutine node or create new one under parent""" + if frame in cor_nodes[parent]: + return cor_nodes[parent][frame] + + node_key = (NodeType.COROUTINE, f"c{next(next_cor_id)}") + id2label[node_key] = frame + children[parent].append(node_key) + cor_nodes[parent][frame] = node_key return node_key - # lay down parent ➜ …frames… ➜ child paths + # Build task dependency tree with coroutine frames for parent_id, stack, child_id in awaits: cur = (NodeType.TASK, parent_id) - for frame in reversed(stack): # outer-most → inner-most - cur = _cor_node(cur, frame) + for frame in reversed(stack): + cur = get_or_create_cor_node(cur, frame) + child_key = (NodeType.TASK, child_id) if child_key not in children[cur]: children[cur].append(child_key) + # Add coroutine stacks for leaf tasks + awaiting_tasks = {parent_id for parent_id, _, _ in awaits} + for task_id in id2name: + if task_id not in awaiting_tasks and task_id in task_stacks: + cur = (NodeType.TASK, task_id) + for frame in reversed(task_stacks[task_id]): + cur = get_or_create_cor_node(cur, frame) + return id2label, children @@ -112,6 +140,11 @@ def _find_cycles(graph): # ─── PRINT TREE FUNCTION ─────────────────────────────────────── +def get_all_awaited_by(pid): + unwinder = RemoteUnwinder(pid) + return unwinder.get_all_awaited_by() + + def build_async_tree(result, task_emoji="(T)", cor_emoji=""): """ Build a list of strings for pretty-print an async call tree. @@ -119,12 +152,12 @@ def build_async_tree(result, task_emoji="(T)", cor_emoji=""): The call tree is produced by `get_all_async_stacks()`, prefixing tasks with `task_emoji` and coroutine frames with `cor_emoji`. """ - id2name, awaits = _index(result) + id2name, awaits, task_stacks = _index(result) g = _task_graph(awaits) cycles = _find_cycles(g) if cycles: raise CycleFoundException(cycles, id2name) - labels, children = _build_tree(id2name, awaits) + labels, children = _build_tree(id2name, awaits, task_stacks) def pretty(node): flag = task_emoji if node[0] == NodeType.TASK else cor_emoji @@ -144,35 +177,40 @@ def build_async_tree(result, task_emoji="(T)", cor_emoji=""): def build_task_table(result): - id2name, awaits = _index(result) + id2name, _, _ = _index(result) table = [] - for tid, tasks in result: - for task_id, task_name, awaited in tasks: - if not awaited: - table.append( - [ - tid, - hex(task_id), - task_name, - "", - "", - "0x0" - ] - ) - for stack, awaiter_id in awaited: - stack = [elem[0] if isinstance(elem, tuple) else elem for elem in stack] - coroutine_chain = " -> ".join(stack) - awaiter_name = id2name.get(awaiter_id, "Unknown") - table.append( - [ - tid, - hex(task_id), - task_name, - coroutine_chain, - awaiter_name, - hex(awaiter_id), - ] - ) + + for awaited_info in result: + thread_id = awaited_info.thread_id + for task_info in awaited_info.awaited_by: + # Get task info + task_id = task_info.task_id + task_name = task_info.task_name + + # Build coroutine stack string + frames = [frame for coro in task_info.coroutine_stack + for frame in coro.call_stack] + coro_stack = " -> ".join(_format_stack_entry(x).split(" ")[0] + for x in frames) + + # Handle tasks with no awaiters + if not task_info.awaited_by: + table.append([thread_id, hex(task_id), task_name, coro_stack, + "", "", "0x0"]) + continue + + # Handle tasks with awaiters + for coro_info in task_info.awaited_by: + parent_id = coro_info.task_name + awaiter_frames = [_format_stack_entry(x).split(" ")[0] + for x in coro_info.call_stack] + awaiter_chain = " -> ".join(awaiter_frames) + awaiter_name = id2name.get(parent_id, "Unknown") + parent_id_str = (hex(parent_id) if isinstance(parent_id, int) + else str(parent_id)) + + table.append([thread_id, hex(task_id), task_name, coro_stack, + awaiter_chain, awaiter_name, parent_id_str]) return table @@ -201,11 +239,11 @@ def display_awaited_by_tasks_table(pid: int) -> None: table = build_task_table(tasks) # Print the table in a simple tabular format print( - f"{'tid':<10} {'task id':<20} {'task name':<20} {'coroutine chain':<50} {'awaiter name':<20} {'awaiter id':<15}" + f"{'tid':<10} {'task id':<20} {'task name':<20} {'coroutine stack':<50} {'awaiter chain':<50} {'awaiter name':<15} {'awaiter id':<15}" ) - print("-" * 135) + print("-" * 180) for row in table: - print(f"{row[0]:<10} {row[1]:<20} {row[2]:<20} {row[3]:<50} {row[4]:<20} {row[5]:<15}") + print(f"{row[0]:<10} {row[1]:<20} {row[2]:<20} {row[3]:<50} {row[4]:<50} {row[5]:<15} {row[6]:<15}") def display_awaited_by_tasks_tree(pid: int) -> None: |