diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/annotationlib.py | 379 | ||||
-rw-r--r-- | Lib/ast.py | 4 | ||||
-rw-r--r-- | Lib/asyncio/__main__.py | 32 | ||||
-rw-r--r-- | Lib/asyncio/tools.py | 214 | ||||
-rw-r--r-- | Lib/cmd.py | 2 | ||||
-rw-r--r-- | Lib/pdb.py | 13 | ||||
-rw-r--r-- | Lib/test/test_annotationlib.py | 260 | ||||
-rw-r--r-- | Lib/test/test_ast/test_ast.py | 182 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_tools.py | 839 | ||||
-rw-r--r-- | Lib/test/test_cmd.py | 24 | ||||
-rw-r--r-- | Lib/test/test_curses.py | 37 | ||||
-rw-r--r-- | Lib/test/test_external_inspection.py | 345 | ||||
-rw-r--r-- | Lib/test/test_genericalias.py | 4 | ||||
-rw-r--r-- | Lib/test/test_inspect/test_inspect.py | 2 | ||||
-rw-r--r-- | Lib/test/test_interpreters/test_stress.py | 30 | ||||
-rw-r--r-- | Lib/test/test_io.py | 44 | ||||
-rw-r--r-- | Lib/test/test_posix.py | 45 | ||||
-rw-r--r-- | Lib/test/test_remote_pdb.py | 38 | ||||
-rw-r--r-- | Lib/test/test_sys.py | 2 | ||||
-rw-r--r-- | Lib/typing.py | 4 |
20 files changed, 2235 insertions, 265 deletions
diff --git a/Lib/annotationlib.py b/Lib/annotationlib.py index 971f636f971..5ad0893106a 100644 --- a/Lib/annotationlib.py +++ b/Lib/annotationlib.py @@ -12,7 +12,7 @@ __all__ = [ "ForwardRef", "call_annotate_function", "call_evaluate_function", - "get_annotate_function", + "get_annotate_from_class_namespace", "get_annotations", "annotations_to_string", "type_repr", @@ -38,6 +38,7 @@ _SLOTS = ( "__weakref__", "__arg__", "__globals__", + "__extra_names__", "__code__", "__ast_node__", "__cell__", @@ -82,6 +83,7 @@ class ForwardRef: # is created through __class__ assignment on a _Stringifier object. self.__globals__ = None self.__cell__ = None + self.__extra_names__ = None # These are initially None but serve as a cache and may be set to a non-None # value later. self.__code__ = None @@ -90,11 +92,28 @@ class ForwardRef: def __init_subclass__(cls, /, *args, **kwds): raise TypeError("Cannot subclass ForwardRef") - def evaluate(self, *, globals=None, locals=None, type_params=None, owner=None): + def evaluate( + self, + *, + globals=None, + locals=None, + type_params=None, + owner=None, + format=Format.VALUE, + ): """Evaluate the forward reference and return the value. If the forward reference cannot be evaluated, raise an exception. """ + match format: + case Format.STRING: + return self.__forward_arg__ + case Format.VALUE: + is_forwardref_format = False + case Format.FORWARDREF: + is_forwardref_format = True + case _: + raise NotImplementedError(format) if self.__cell__ is not None: try: return self.__cell__.cell_contents @@ -151,21 +170,42 @@ class ForwardRef: if not self.__forward_is_class__ or param_name not in globals: globals[param_name] = param locals.pop(param_name, None) + if self.__extra_names__: + locals = {**locals, **self.__extra_names__} arg = self.__forward_arg__ if arg.isidentifier() and not keyword.iskeyword(arg): if arg in locals: - value = locals[arg] + return locals[arg] elif arg in globals: - value = globals[arg] + return globals[arg] elif hasattr(builtins, arg): return getattr(builtins, arg) + elif is_forwardref_format: + return self else: raise NameError(arg) else: code = self.__forward_code__ - value = eval(code, globals=globals, locals=locals) - return value + try: + return eval(code, globals=globals, locals=locals) + except Exception: + if not is_forwardref_format: + raise + new_locals = _StringifierDict( + {**builtins.__dict__, **locals}, + globals=globals, + owner=owner, + is_class=self.__forward_is_class__, + format=format, + ) + try: + result = eval(code, globals=globals, locals=new_locals) + except Exception: + return self + else: + new_locals.transmogrify() + return result def _evaluate(self, globalns, localns, type_params=_sentinel, *, recursive_guard): import typing @@ -231,6 +271,10 @@ class ForwardRef: and self.__forward_is_class__ == other.__forward_is_class__ and self.__cell__ == other.__cell__ and self.__owner__ == other.__owner__ + and ( + (tuple(sorted(self.__extra_names__.items())) if self.__extra_names__ else None) == + (tuple(sorted(other.__extra_names__.items())) if other.__extra_names__ else None) + ) ) def __hash__(self): @@ -241,6 +285,7 @@ class ForwardRef: self.__forward_is_class__, self.__cell__, self.__owner__, + tuple(sorted(self.__extra_names__.items())) if self.__extra_names__ else None, )) def __or__(self, other): @@ -274,6 +319,7 @@ class _Stringifier: cell=None, *, stringifier_dict, + extra_names=None, ): # Either an AST node or a simple str (for the common case where a ForwardRef # represent a single name). @@ -285,6 +331,7 @@ class _Stringifier: self.__code__ = None self.__ast_node__ = node self.__globals__ = globals + self.__extra_names__ = extra_names self.__cell__ = cell self.__owner__ = owner self.__stringifier_dict__ = stringifier_dict @@ -292,28 +339,63 @@ class _Stringifier: def __convert_to_ast(self, other): if isinstance(other, _Stringifier): if isinstance(other.__ast_node__, str): - return ast.Name(id=other.__ast_node__) - return other.__ast_node__ - elif isinstance(other, slice): + return ast.Name(id=other.__ast_node__), other.__extra_names__ + return other.__ast_node__, other.__extra_names__ + elif ( + # In STRING format we don't bother with the create_unique_name() dance; + # it's better to emit the repr() of the object instead of an opaque name. + self.__stringifier_dict__.format == Format.STRING + or other is None + or type(other) in (str, int, float, bool, complex) + ): + return ast.Constant(value=other), None + elif type(other) is dict: + extra_names = {} + keys = [] + values = [] + for key, value in other.items(): + new_key, new_extra_names = self.__convert_to_ast(key) + if new_extra_names is not None: + extra_names.update(new_extra_names) + keys.append(new_key) + new_value, new_extra_names = self.__convert_to_ast(value) + if new_extra_names is not None: + extra_names.update(new_extra_names) + values.append(new_value) + return ast.Dict(keys, values), extra_names + elif type(other) in (list, tuple, set): + extra_names = {} + elts = [] + for elt in other: + new_elt, new_extra_names = self.__convert_to_ast(elt) + if new_extra_names is not None: + extra_names.update(new_extra_names) + elts.append(new_elt) + ast_class = {list: ast.List, tuple: ast.Tuple, set: ast.Set}[type(other)] + return ast_class(elts), extra_names + else: + name = self.__stringifier_dict__.create_unique_name() + return ast.Name(id=name), {name: other} + + def __convert_to_ast_getitem(self, other): + if isinstance(other, slice): + extra_names = {} + + def conv(obj): + if obj is None: + return None + new_obj, new_extra_names = self.__convert_to_ast(obj) + if new_extra_names is not None: + extra_names.update(new_extra_names) + return new_obj + return ast.Slice( - lower=( - self.__convert_to_ast(other.start) - if other.start is not None - else None - ), - upper=( - self.__convert_to_ast(other.stop) - if other.stop is not None - else None - ), - step=( - self.__convert_to_ast(other.step) - if other.step is not None - else None - ), - ) + lower=conv(other.start), + upper=conv(other.stop), + step=conv(other.step), + ), extra_names else: - return ast.Constant(value=other) + return self.__convert_to_ast(other) def __get_ast(self): node = self.__ast_node__ @@ -321,13 +403,19 @@ class _Stringifier: return ast.Name(id=node) return node - def __make_new(self, node): + def __make_new(self, node, extra_names=None): + new_extra_names = {} + if self.__extra_names__ is not None: + new_extra_names.update(self.__extra_names__) + if extra_names is not None: + new_extra_names.update(extra_names) stringifier = _Stringifier( node, self.__globals__, self.__owner__, self.__forward_is_class__, stringifier_dict=self.__stringifier_dict__, + extra_names=new_extra_names or None, ) self.__stringifier_dict__.stringifiers.append(stringifier) return stringifier @@ -343,27 +431,37 @@ class _Stringifier: if self.__ast_node__ == "__classdict__": raise KeyError if isinstance(other, tuple): - elts = [self.__convert_to_ast(elt) for elt in other] + extra_names = {} + elts = [] + for elt in other: + new_elt, new_extra_names = self.__convert_to_ast_getitem(elt) + if new_extra_names is not None: + extra_names.update(new_extra_names) + elts.append(new_elt) other = ast.Tuple(elts) else: - other = self.__convert_to_ast(other) + other, extra_names = self.__convert_to_ast_getitem(other) assert isinstance(other, ast.AST), repr(other) - return self.__make_new(ast.Subscript(self.__get_ast(), other)) + return self.__make_new(ast.Subscript(self.__get_ast(), other), extra_names) def __getattr__(self, attr): return self.__make_new(ast.Attribute(self.__get_ast(), attr)) def __call__(self, *args, **kwargs): - return self.__make_new( - ast.Call( - self.__get_ast(), - [self.__convert_to_ast(arg) for arg in args], - [ - ast.keyword(key, self.__convert_to_ast(value)) - for key, value in kwargs.items() - ], - ) - ) + extra_names = {} + ast_args = [] + for arg in args: + new_arg, new_extra_names = self.__convert_to_ast(arg) + if new_extra_names is not None: + extra_names.update(new_extra_names) + ast_args.append(new_arg) + ast_kwargs = [] + for key, value in kwargs.items(): + new_value, new_extra_names = self.__convert_to_ast(value) + if new_extra_names is not None: + extra_names.update(new_extra_names) + ast_kwargs.append(ast.keyword(key, new_value)) + return self.__make_new(ast.Call(self.__get_ast(), ast_args, ast_kwargs), extra_names) def __iter__(self): yield self.__make_new(ast.Starred(self.__get_ast())) @@ -378,8 +476,9 @@ class _Stringifier: def _make_binop(op: ast.AST): def binop(self, other): + rhs, extra_names = self.__convert_to_ast(other) return self.__make_new( - ast.BinOp(self.__get_ast(), op, self.__convert_to_ast(other)) + ast.BinOp(self.__get_ast(), op, rhs), extra_names ) return binop @@ -402,8 +501,9 @@ class _Stringifier: def _make_rbinop(op: ast.AST): def rbinop(self, other): + new_other, extra_names = self.__convert_to_ast(other) return self.__make_new( - ast.BinOp(self.__convert_to_ast(other), op, self.__get_ast()) + ast.BinOp(new_other, op, self.__get_ast()), extra_names ) return rbinop @@ -426,12 +526,14 @@ class _Stringifier: def _make_compare(op): def compare(self, other): + rhs, extra_names = self.__convert_to_ast(other) return self.__make_new( ast.Compare( left=self.__get_ast(), ops=[op], - comparators=[self.__convert_to_ast(other)], - ) + comparators=[rhs], + ), + extra_names, ) return compare @@ -459,13 +561,15 @@ class _Stringifier: class _StringifierDict(dict): - def __init__(self, namespace, globals=None, owner=None, is_class=False): + def __init__(self, namespace, *, globals=None, owner=None, is_class=False, format): super().__init__(namespace) self.namespace = namespace self.globals = globals self.owner = owner self.is_class = is_class self.stringifiers = [] + self.next_id = 1 + self.format = format def __missing__(self, key): fwdref = _Stringifier( @@ -478,6 +582,19 @@ class _StringifierDict(dict): self.stringifiers.append(fwdref) return fwdref + def transmogrify(self): + for obj in self.stringifiers: + obj.__class__ = ForwardRef + obj.__stringifier_dict__ = None # not needed for ForwardRef + if isinstance(obj.__ast_node__, str): + obj.__arg__ = obj.__ast_node__ + obj.__ast_node__ = None + + def create_unique_name(self): + name = f"__annotationlib_name_{self.next_id}__" + self.next_id += 1 + return name + def call_evaluate_function(evaluate, format, *, owner=None): """Call an evaluate function. Evaluate functions are normally generated for @@ -521,20 +638,11 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False): # possibly constants if the annotate function uses them directly). We then # convert each of those into a string to get an approximation of the # original source. - globals = _StringifierDict({}) - if annotate.__closure__: - freevars = annotate.__code__.co_freevars - new_closure = [] - for i, cell in enumerate(annotate.__closure__): - if i < len(freevars): - name = freevars[i] - else: - name = "__cell__" - fwdref = _Stringifier(name, stringifier_dict=globals) - new_closure.append(types.CellType(fwdref)) - closure = tuple(new_closure) - else: - closure = None + globals = _StringifierDict({}, format=format) + is_class = isinstance(owner, type) + closure = _build_closure( + annotate, owner, is_class, globals, allow_evaluation=False + ) func = types.FunctionType( annotate.__code__, globals, @@ -544,9 +652,9 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False): ) annos = func(Format.VALUE_WITH_FAKE_GLOBALS) if _is_evaluate: - return annos if isinstance(annos, str) else repr(annos) + return _stringify_single(annos) return { - key: val if isinstance(val, str) else repr(val) + key: _stringify_single(val) for key, val in annos.items() } elif format == Format.FORWARDREF: @@ -569,33 +677,43 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False): # that returns a bool and an defined set of attributes. namespace = {**annotate.__builtins__, **annotate.__globals__} is_class = isinstance(owner, type) - globals = _StringifierDict(namespace, annotate.__globals__, owner, is_class) - if annotate.__closure__: - freevars = annotate.__code__.co_freevars - new_closure = [] - for i, cell in enumerate(annotate.__closure__): - try: - cell.cell_contents - except ValueError: - if i < len(freevars): - name = freevars[i] - else: - name = "__cell__" - fwdref = _Stringifier( - name, - cell=cell, - owner=owner, - globals=annotate.__globals__, - is_class=is_class, - stringifier_dict=globals, - ) - globals.stringifiers.append(fwdref) - new_closure.append(types.CellType(fwdref)) - else: - new_closure.append(cell) - closure = tuple(new_closure) + globals = _StringifierDict( + namespace, + globals=annotate.__globals__, + owner=owner, + is_class=is_class, + format=format, + ) + closure = _build_closure( + annotate, owner, is_class, globals, allow_evaluation=True + ) + func = types.FunctionType( + annotate.__code__, + globals, + closure=closure, + argdefs=annotate.__defaults__, + kwdefaults=annotate.__kwdefaults__, + ) + try: + result = func(Format.VALUE_WITH_FAKE_GLOBALS) + except Exception: + pass else: - closure = None + globals.transmogrify() + return result + + # Try again, but do not provide any globals. This allows us to return + # a value in certain cases where an exception gets raised during evaluation. + globals = _StringifierDict( + {}, + globals=annotate.__globals__, + owner=owner, + is_class=is_class, + format=format, + ) + closure = _build_closure( + annotate, owner, is_class, globals, allow_evaluation=False + ) func = types.FunctionType( annotate.__code__, globals, @@ -604,13 +722,21 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False): kwdefaults=annotate.__kwdefaults__, ) result = func(Format.VALUE_WITH_FAKE_GLOBALS) - for obj in globals.stringifiers: - obj.__class__ = ForwardRef - obj.__stringifier_dict__ = None # not needed for ForwardRef - if isinstance(obj.__ast_node__, str): - obj.__arg__ = obj.__ast_node__ - obj.__ast_node__ = None - return result + globals.transmogrify() + if _is_evaluate: + if isinstance(result, ForwardRef): + return result.evaluate(format=Format.FORWARDREF) + else: + return result + else: + return { + key: ( + val.evaluate(format=Format.FORWARDREF) + if isinstance(val, ForwardRef) + else val + ) + for key, val in result.items() + } elif format == Format.VALUE: # Should be impossible because __annotate__ functions must not raise # NotImplementedError for this format. @@ -619,20 +745,59 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False): raise ValueError(f"Invalid format: {format!r}") -def get_annotate_function(obj): - """Get the __annotate__ function for an object. +def _build_closure(annotate, owner, is_class, stringifier_dict, *, allow_evaluation): + if not annotate.__closure__: + return None + freevars = annotate.__code__.co_freevars + new_closure = [] + for i, cell in enumerate(annotate.__closure__): + if i < len(freevars): + name = freevars[i] + else: + name = "__cell__" + new_cell = None + if allow_evaluation: + try: + cell.cell_contents + except ValueError: + pass + else: + new_cell = cell + if new_cell is None: + fwdref = _Stringifier( + name, + cell=cell, + owner=owner, + globals=annotate.__globals__, + is_class=is_class, + stringifier_dict=stringifier_dict, + ) + stringifier_dict.stringifiers.append(fwdref) + new_cell = types.CellType(fwdref) + new_closure.append(new_cell) + return tuple(new_closure) + + +def _stringify_single(anno): + if anno is ...: + return "..." + # We have to handle str specially to support PEP 563 stringified annotations. + elif isinstance(anno, str): + return anno + else: + return repr(anno) + - obj may be a function, class, or module, or a user-defined type with - an `__annotate__` attribute. +def get_annotate_from_class_namespace(obj): + """Retrieve the annotate function from a class namespace dictionary. - Returns the __annotate__ function or None. + Return None if the namespace does not contain an annotate function. + This is useful in metaclass ``__new__`` methods to retrieve the annotate function. """ - if isinstance(obj, dict): - try: - return obj["__annotate__"] - except KeyError: - return obj.get("__annotate_func__", None) - return getattr(obj, "__annotate__", None) + try: + return obj["__annotate__"] + except KeyError: + return obj.get("__annotate_func__", None) def get_annotations( @@ -724,7 +889,7 @@ def get_annotations( # But if we didn't get it, we use __annotations__ instead. ann = _get_dunder_annotations(obj) if ann is not None: - return annotations_to_string(ann) + return annotations_to_string(ann) case Format.VALUE_WITH_FAKE_GLOBALS: raise ValueError("The VALUE_WITH_FAKE_GLOBALS format is for internal use only") case _: @@ -832,7 +997,7 @@ def _get_and_call_annotate(obj, format): May not return a fresh dictionary. """ - annotate = get_annotate_function(obj) + annotate = getattr(obj, "__annotate__", None) if annotate is not None: ann = call_annotate_function(annotate, format, owner=obj) if not isinstance(ann, dict): diff --git a/Lib/ast.py b/Lib/ast.py index aa788e6eb62..af4fe8ff5a8 100644 --- a/Lib/ast.py +++ b/Lib/ast.py @@ -626,7 +626,7 @@ def unparse(ast_obj): return unparser.visit(ast_obj) -def main(): +def main(args=None): import argparse import sys @@ -643,7 +643,7 @@ def main(): 'column offsets') parser.add_argument('-i', '--indent', type=int, default=3, help='indentation of nodes (number of spaces)') - args = parser.parse_args() + args = parser.parse_args(args) if args.infile == '-': name = '<stdin>' diff --git a/Lib/asyncio/__main__.py b/Lib/asyncio/__main__.py index 69f5a30cfe5..7d980bc401a 100644 --- a/Lib/asyncio/__main__.py +++ b/Lib/asyncio/__main__.py @@ -1,5 +1,7 @@ +import argparse import ast import asyncio +import asyncio.tools import concurrent.futures import contextvars import inspect @@ -140,6 +142,36 @@ class REPLThread(threading.Thread): if __name__ == '__main__': + parser = argparse.ArgumentParser( + prog="python3 -m asyncio", + description="Interactive asyncio shell and CLI tools", + ) + subparsers = parser.add_subparsers(help="sub-commands", dest="command") + ps = subparsers.add_parser( + "ps", help="Display a table of all pending tasks in a process" + ) + ps.add_argument("pid", type=int, help="Process ID to inspect") + pstree = subparsers.add_parser( + "pstree", help="Display a tree of all pending tasks in a process" + ) + pstree.add_argument("pid", type=int, help="Process ID to inspect") + args = parser.parse_args() + match args.command: + case "ps": + asyncio.tools.display_awaited_by_tasks_table(args.pid) + sys.exit(0) + case "pstree": + asyncio.tools.display_awaited_by_tasks_tree(args.pid) + sys.exit(0) + case None: + pass # continue to the interactive shell + case _: + # shouldn't happen as an invalid command-line wouldn't parse + # but let's keep it for the next person adding a command + print(f"error: unhandled command {args.command}", file=sys.stderr) + parser.print_usage(file=sys.stderr) + sys.exit(1) + sys.audit("cpython.run_stdin") if os.getenv('PYTHON_BASIC_REPL'): diff --git a/Lib/asyncio/tools.py b/Lib/asyncio/tools.py new file mode 100644 index 00000000000..6c1f725e777 --- /dev/null +++ b/Lib/asyncio/tools.py @@ -0,0 +1,214 @@ +"""Tools to analyze tasks running in asyncio programs.""" + +from dataclasses import dataclass +from collections import defaultdict +from itertools import count +from enum import Enum +import sys +from _remotedebugging import get_all_awaited_by + + +class NodeType(Enum): + COROUTINE = 1 + TASK = 2 + + +@dataclass(frozen=True) +class CycleFoundException(Exception): + """Raised when there is a cycle when drawing the call tree.""" + cycles: list[list[int]] + id2name: dict[int, str] + + +# ─── indexing helpers ─────────────────────────────────────────── +def _index(result): + id2name, awaits = {}, [] + for _thr_id, tasks in result: + for tid, tname, awaited in tasks: + id2name[tid] = tname + for stack, parent_id in awaited: + stack = [elem[0] if isinstance(elem, tuple) else elem for elem in stack] + awaits.append((parent_id, stack, tid)) + return id2name, awaits + + +def _build_tree(id2name, awaits): + id2label = {(NodeType.TASK, tid): name for tid, name in id2name.items()} + children = defaultdict(list) + cor_names = defaultdict(dict) # (parent) -> {frame: node} + cor_id_seq = count(1) + + def _cor_node(parent_key, frame_name): + """Return an existing or new (NodeType.COROUTINE, …) node under *parent_key*.""" + bucket = cor_names[parent_key] + if frame_name in bucket: + return bucket[frame_name] + node_key = (NodeType.COROUTINE, f"c{next(cor_id_seq)}") + id2label[node_key] = frame_name + children[parent_key].append(node_key) + bucket[frame_name] = node_key + return node_key + + # lay down parent ➜ …frames… ➜ child paths + for parent_id, stack, child_id in awaits: + cur = (NodeType.TASK, parent_id) + for frame in reversed(stack): # outer-most → inner-most + cur = _cor_node(cur, frame) + child_key = (NodeType.TASK, child_id) + if child_key not in children[cur]: + children[cur].append(child_key) + + return id2label, children + + +def _roots(id2label, children): + all_children = {c for kids in children.values() for c in kids} + return [n for n in id2label if n not in all_children] + +# ─── detect cycles in the task-to-task graph ─────────────────────── +def _task_graph(awaits): + """Return {parent_task_id: {child_task_id, …}, …}.""" + g = defaultdict(set) + for parent_id, _stack, child_id in awaits: + g[parent_id].add(child_id) + return g + + +def _find_cycles(graph): + """ + Depth-first search for back-edges. + + Returns a list of cycles (each cycle is a list of task-ids) or an + empty list if the graph is acyclic. + """ + WHITE, GREY, BLACK = 0, 1, 2 + color = defaultdict(lambda: WHITE) + path, cycles = [], [] + + def dfs(v): + color[v] = GREY + path.append(v) + for w in graph.get(v, ()): + if color[w] == WHITE: + dfs(w) + elif color[w] == GREY: # back-edge → cycle! + i = path.index(w) + cycles.append(path[i:] + [w]) # make a copy + color[v] = BLACK + path.pop() + + for v in list(graph): + if color[v] == WHITE: + dfs(v) + return cycles + + +# ─── PRINT TREE FUNCTION ─────────────────────────────────────── +def build_async_tree(result, task_emoji="(T)", cor_emoji=""): + """ + Build a list of strings for pretty-print a async call tree. + + The call tree is produced by `get_all_async_stacks()`, prefixing tasks + with `task_emoji` and coroutine frames with `cor_emoji`. + """ + id2name, awaits = _index(result) + g = _task_graph(awaits) + cycles = _find_cycles(g) + if cycles: + raise CycleFoundException(cycles, id2name) + labels, children = _build_tree(id2name, awaits) + + def pretty(node): + flag = task_emoji if node[0] == NodeType.TASK else cor_emoji + return f"{flag} {labels[node]}" + + def render(node, prefix="", last=True, buf=None): + if buf is None: + buf = [] + buf.append(f"{prefix}{'└── ' if last else '├── '}{pretty(node)}") + new_pref = prefix + (" " if last else "│ ") + kids = children.get(node, []) + for i, kid in enumerate(kids): + render(kid, new_pref, i == len(kids) - 1, buf) + return buf + + return [render(root) for root in _roots(labels, children)] + + +def build_task_table(result): + id2name, awaits = _index(result) + table = [] + for tid, tasks in result: + for task_id, task_name, awaited in tasks: + if not awaited: + table.append( + [ + tid, + hex(task_id), + task_name, + "", + "", + "0x0" + ] + ) + for stack, awaiter_id in awaited: + stack = [elem[0] if isinstance(elem, tuple) else elem for elem in stack] + coroutine_chain = " -> ".join(stack) + awaiter_name = id2name.get(awaiter_id, "Unknown") + table.append( + [ + tid, + hex(task_id), + task_name, + coroutine_chain, + awaiter_name, + hex(awaiter_id), + ] + ) + + return table + +def _print_cycle_exception(exception: CycleFoundException): + print("ERROR: await-graph contains cycles – cannot print a tree!", file=sys.stderr) + print("", file=sys.stderr) + for c in exception.cycles: + inames = " → ".join(exception.id2name.get(tid, hex(tid)) for tid in c) + print(f"cycle: {inames}", file=sys.stderr) + + +def _get_awaited_by_tasks(pid: int) -> list: + try: + return get_all_awaited_by(pid) + except RuntimeError as e: + while e.__context__ is not None: + e = e.__context__ + print(f"Error retrieving tasks: {e}") + sys.exit(1) + + +def display_awaited_by_tasks_table(pid: int) -> None: + """Build and print a table of all pending tasks under `pid`.""" + + tasks = _get_awaited_by_tasks(pid) + table = build_task_table(tasks) + # Print the table in a simple tabular format + print( + f"{'tid':<10} {'task id':<20} {'task name':<20} {'coroutine chain':<50} {'awaiter name':<20} {'awaiter id':<15}" + ) + print("-" * 135) + for row in table: + print(f"{row[0]:<10} {row[1]:<20} {row[2]:<20} {row[3]:<50} {row[4]:<20} {row[5]:<15}") + + +def display_awaited_by_tasks_tree(pid: int) -> None: + """Build and print a tree of all pending tasks under `pid`.""" + + tasks = _get_awaited_by_tasks(pid) + try: + result = build_async_tree(tasks) + except CycleFoundException as e: + _print_cycle_exception(e) + sys.exit(1) + + for tree in result: + print("\n".join(tree)) diff --git a/Lib/cmd.py b/Lib/cmd.py index 438b88aa104..51495fb3216 100644 --- a/Lib/cmd.py +++ b/Lib/cmd.py @@ -273,7 +273,7 @@ class Cmd: endidx = readline.get_endidx() - stripped if begidx>0: cmd, args, foo = self.parseline(line) - if cmd == '': + if not cmd: compfunc = self.completedefault else: try: diff --git a/Lib/pdb.py b/Lib/pdb.py index 343cf4404d7..2aa60c75396 100644 --- a/Lib/pdb.py +++ b/Lib/pdb.py @@ -2933,6 +2933,7 @@ class _PdbClient: self.completion_matches = [] self.state = "dumb" self.write_failed = False + self.multiline_block = False def _ensure_valid_message(self, msg): # Ensure the message conforms to our protocol. @@ -2979,6 +2980,7 @@ class _PdbClient: self.write_failed = True def read_command(self, prompt): + self.multiline_block = False reply = input(prompt) if self.state == "dumb": @@ -3003,6 +3005,7 @@ class _PdbClient: return prefix + reply # Otherwise, valid first line of a multi-line statement + self.multiline_block = True continue_prompt = "...".ljust(len(prompt)) while codeop.compile_command(reply, "<stdin>", "single") is None: reply += "\n" + input(continue_prompt) @@ -3105,9 +3108,13 @@ class _PdbClient: origline = readline.get_line_buffer() line = origline.lstrip() - stripped = len(origline) - len(line) - begidx = readline.get_begidx() - stripped - endidx = readline.get_endidx() - stripped + if self.multiline_block: + # We're completing a line contained in a multi-line block. + # Force the remote to treat it as a Python expression. + line = "! " + line + offset = len(origline) - len(line) + begidx = readline.get_begidx() - offset + endidx = readline.get_endidx() - offset msg = { "complete": { diff --git a/Lib/test/test_annotationlib.py b/Lib/test/test_annotationlib.py index be55f044b15..13c6a2a584b 100644 --- a/Lib/test/test_annotationlib.py +++ b/Lib/test/test_annotationlib.py @@ -1,5 +1,6 @@ """Tests for the annotations module.""" +import textwrap import annotationlib import builtins import collections @@ -12,7 +13,6 @@ from annotationlib import ( Format, ForwardRef, get_annotations, - get_annotate_function, annotations_to_string, type_repr, ) @@ -121,6 +121,28 @@ class TestForwardRefFormat(unittest.TestCase): self.assertIsInstance(gamma_anno, ForwardRef) self.assertEqual(gamma_anno, support.EqualToForwardRef("some < obj", owner=f)) + def test_partially_nonexistent_union(self): + # Test unions with '|' syntax equal unions with typing.Union[] with some forwardrefs + class UnionForwardrefs: + pipe: str | undefined + union: Union[str, undefined] + + annos = get_annotations(UnionForwardrefs, format=Format.FORWARDREF) + + pipe = annos["pipe"] + self.assertIsInstance(pipe, ForwardRef) + self.assertEqual( + pipe.evaluate(globals={"undefined": int}), + str | int, + ) + union = annos["union"] + self.assertIsInstance(union, Union) + arg1, arg2 = typing.get_args(union) + self.assertIs(arg1, str) + self.assertEqual( + arg2, support.EqualToForwardRef("undefined", is_class=True, owner=UnionForwardrefs) + ) + class TestStringFormat(unittest.TestCase): def test_closure(self): @@ -251,6 +273,89 @@ class TestStringFormat(unittest.TestCase): }, ) + def test_getitem(self): + def f(x: undef1[str, undef2]): + pass + anno = get_annotations(f, format=Format.STRING) + self.assertEqual(anno, {"x": "undef1[str, undef2]"}) + + anno = get_annotations(f, format=Format.FORWARDREF) + fwdref = anno["x"] + self.assertIsInstance(fwdref, ForwardRef) + self.assertEqual( + fwdref.evaluate(globals={"undef1": dict, "undef2": float}), dict[str, float] + ) + + def test_slice(self): + def f(x: a[b:c]): + pass + anno = get_annotations(f, format=Format.STRING) + self.assertEqual(anno, {"x": "a[b:c]"}) + + def f(x: a[b:c, d:e]): + pass + anno = get_annotations(f, format=Format.STRING) + self.assertEqual(anno, {"x": "a[b:c, d:e]"}) + + obj = slice(1, 1, 1) + def f(x: obj): + pass + anno = get_annotations(f, format=Format.STRING) + self.assertEqual(anno, {"x": "obj"}) + + def test_literals(self): + def f( + a: 1, + b: 1.0, + c: "hello", + d: b"hello", + e: True, + f: None, + g: ..., + h: 1j, + ): + pass + + anno = get_annotations(f, format=Format.STRING) + self.assertEqual( + anno, + { + "a": "1", + "b": "1.0", + "c": 'hello', + "d": "b'hello'", + "e": "True", + "f": "None", + "g": "...", + "h": "1j", + }, + ) + + def test_displays(self): + # Simple case first + def f(x: a[[int, str], float]): + pass + anno = get_annotations(f, format=Format.STRING) + self.assertEqual(anno, {"x": "a[[int, str], float]"}) + + def g( + w: a[[int, str], float], + x: a[{int, str}, 3], + y: a[{int: str}, 4], + z: a[(int, str), 5], + ): + pass + anno = get_annotations(g, format=Format.STRING) + self.assertEqual( + anno, + { + "w": "a[[int, str], float]", + "x": "a[{int, str}, 3]", + "y": "a[{int: str}, 4]", + "z": "a[(int, str), 5]", + }, + ) + def test_nested_expressions(self): def f( nested: list[Annotated[set[int], "set of ints", 4j]], @@ -296,6 +401,17 @@ class TestStringFormat(unittest.TestCase): with self.assertRaisesRegex(TypeError, format_msg): get_annotations(f, format=Format.STRING) + def test_shenanigans(self): + # In cases like this we can't reconstruct the source; test that we do something + # halfway reasonable. + def f(x: x | (1).__class__, y: (1).__class__): + pass + + self.assertEqual( + get_annotations(f, format=Format.STRING), + {"x": "x | <class 'int'>", "y": "<class 'int'>"}, + ) + class TestGetAnnotations(unittest.TestCase): def test_builtin_type(self): @@ -901,6 +1017,58 @@ class TestGetAnnotations(unittest.TestCase): set(results.generic_func.__type_params__), ) + def test_partial_evaluation(self): + def f( + x: builtins.undef, + y: list[int], + z: 1 + int, + a: builtins.int, + b: [builtins.undef, builtins.int], + ): + pass + + self.assertEqual( + get_annotations(f, format=Format.FORWARDREF), + { + "x": support.EqualToForwardRef("builtins.undef", owner=f), + "y": list[int], + "z": support.EqualToForwardRef("1 + int", owner=f), + "a": int, + "b": [ + support.EqualToForwardRef("builtins.undef", owner=f), + # We can't resolve this because we have to evaluate the whole annotation + support.EqualToForwardRef("builtins.int", owner=f), + ], + }, + ) + + self.assertEqual( + get_annotations(f, format=Format.STRING), + { + "x": "builtins.undef", + "y": "list[int]", + "z": "1 + int", + "a": "builtins.int", + "b": "[builtins.undef, builtins.int]", + }, + ) + + def test_partial_evaluation_cell(self): + obj = object() + + class RaisesAttributeError: + attriberr: obj.missing + + anno = get_annotations(RaisesAttributeError, format=Format.FORWARDREF) + self.assertEqual( + anno, + { + "attriberr": support.EqualToForwardRef( + "obj.missing", is_class=True, owner=RaisesAttributeError + ) + }, + ) + class TestCallEvaluateFunction(unittest.TestCase): def test_evaluation(self): @@ -933,13 +1101,13 @@ class MetaclassTests(unittest.TestCase): b: float self.assertEqual(get_annotations(Meta), {"a": int}) - self.assertEqual(get_annotate_function(Meta)(Format.VALUE), {"a": int}) + self.assertEqual(Meta.__annotate__(Format.VALUE), {"a": int}) self.assertEqual(get_annotations(X), {}) - self.assertIs(get_annotate_function(X), None) + self.assertIs(X.__annotate__, None) self.assertEqual(get_annotations(Y), {"b": float}) - self.assertEqual(get_annotate_function(Y)(Format.VALUE), {"b": float}) + self.assertEqual(Y.__annotate__(Format.VALUE), {"b": float}) def test_unannotated_meta(self): class Meta(type): @@ -952,13 +1120,13 @@ class MetaclassTests(unittest.TestCase): pass self.assertEqual(get_annotations(Meta), {}) - self.assertIs(get_annotate_function(Meta), None) + self.assertIs(Meta.__annotate__, None) self.assertEqual(get_annotations(Y), {}) - self.assertIs(get_annotate_function(Y), None) + self.assertIs(Y.__annotate__, None) self.assertEqual(get_annotations(X), {"a": str}) - self.assertEqual(get_annotate_function(X)(Format.VALUE), {"a": str}) + self.assertEqual(X.__annotate__(Format.VALUE), {"a": str}) def test_ordering(self): # Based on a sample by David Ellis @@ -996,7 +1164,7 @@ class MetaclassTests(unittest.TestCase): for c in classes: with self.subTest(c=c): self.assertEqual(get_annotations(c), c.expected_annotations) - annotate_func = get_annotate_function(c) + annotate_func = getattr(c, "__annotate__", None) if c.expected_annotations: self.assertEqual( annotate_func(Format.VALUE), c.expected_annotations @@ -1005,25 +1173,39 @@ class MetaclassTests(unittest.TestCase): self.assertIs(annotate_func, None) -class TestGetAnnotateFunction(unittest.TestCase): - def test_static_class(self): - self.assertIsNone(get_annotate_function(object)) - self.assertIsNone(get_annotate_function(int)) - - def test_unannotated_class(self): - class C: - pass +class TestGetAnnotateFromClassNamespace(unittest.TestCase): + def test_with_metaclass(self): + class Meta(type): + def __new__(mcls, name, bases, ns): + annotate = annotationlib.get_annotate_from_class_namespace(ns) + expected = ns["expected_annotate"] + with self.subTest(name=name): + if expected: + self.assertIsNotNone(annotate) + else: + self.assertIsNone(annotate) + return super().__new__(mcls, name, bases, ns) + + class HasAnnotations(metaclass=Meta): + expected_annotate = True + a: int - self.assertIsNone(get_annotate_function(C)) + class NoAnnotations(metaclass=Meta): + expected_annotate = False - D = type("D", (), {}) - self.assertIsNone(get_annotate_function(D)) + class CustomAnnotate(metaclass=Meta): + expected_annotate = True + def __annotate__(format): + return {} - def test_annotated_class(self): - class C: - a: int + code = """ + from __future__ import annotations - self.assertEqual(get_annotate_function(C)(Format.VALUE), {"a": int}) + class HasFutureAnnotations(metaclass=Meta): + expected_annotate = False + a: int + """ + exec(textwrap.dedent(code), {"Meta": Meta}) class TestTypeRepr(unittest.TestCase): @@ -1240,6 +1422,38 @@ class TestForwardRefClass(unittest.TestCase): with self.assertRaises(TypeError): pickle.dumps(fr, proto) + def test_evaluate_string_format(self): + fr = ForwardRef("set[Any]") + self.assertEqual(fr.evaluate(format=Format.STRING), "set[Any]") + + def test_evaluate_forwardref_format(self): + fr = ForwardRef("undef") + evaluated = fr.evaluate(format=Format.FORWARDREF) + self.assertIs(fr, evaluated) + + fr = ForwardRef("set[undefined]") + evaluated = fr.evaluate(format=Format.FORWARDREF) + self.assertEqual( + evaluated, + set[support.EqualToForwardRef("undefined")], + ) + + fr = ForwardRef("a + b") + self.assertEqual( + fr.evaluate(format=Format.FORWARDREF), + support.EqualToForwardRef("a + b"), + ) + self.assertEqual( + fr.evaluate(format=Format.FORWARDREF, locals={"a": 1, "b": 2}), + 3, + ) + + fr = ForwardRef('"a" + 1') + self.assertEqual( + fr.evaluate(format=Format.FORWARDREF), + support.EqualToForwardRef('"a" + 1'), + ) + def test_evaluate_with_type_params(self): class Gen[T]: alias = int diff --git a/Lib/test/test_ast/test_ast.py b/Lib/test/test_ast/test_ast.py index ae9db093d2e..6a9b7812ef6 100644 --- a/Lib/test/test_ast/test_ast.py +++ b/Lib/test/test_ast/test_ast.py @@ -1,16 +1,20 @@ import _ast_unparse import ast import builtins +import contextlib import copy import dis import enum +import itertools import os import re import sys +import tempfile import textwrap import types import unittest import weakref +from io import StringIO from pathlib import Path from textwrap import dedent try: @@ -19,7 +23,7 @@ except ImportError: _testinternalcapi = None from test import support -from test.support import os_helper, script_helper +from test.support import os_helper from test.support import skip_emscripten_stack_overflow, skip_wasi_stack_overflow from test.support.ast_helper import ASTTestMixin from test.test_ast.utils import to_tuple @@ -3232,23 +3236,169 @@ class ModuleStateTests(unittest.TestCase): self.assertEqual(res, 0) -class ASTMainTests(unittest.TestCase): - # Tests `ast.main()` function. +class CommandLineTests(unittest.TestCase): + def setUp(self): + self.filename = tempfile.mktemp() + self.addCleanup(os_helper.unlink, self.filename) - def test_cli_file_input(self): - code = "print(1, 2, 3)" - expected = ast.dump(ast.parse(code), indent=3) - - with os_helper.temp_dir() as tmp_dir: - filename = os.path.join(tmp_dir, "test_module.py") - with open(filename, 'w', encoding='utf-8') as f: - f.write(code) - res, _ = script_helper.run_python_until_end("-m", "ast", filename) + @staticmethod + def text_normalize(string): + return textwrap.dedent(string).strip() + + def set_source(self, content): + Path(self.filename).write_text(self.text_normalize(content)) + + def invoke_ast(self, *flags): + stderr = StringIO() + stdout = StringIO() + with ( + contextlib.redirect_stdout(stdout), + contextlib.redirect_stderr(stderr), + ): + ast.main(args=[*flags, self.filename]) + self.assertEqual(stderr.getvalue(), '') + return stdout.getvalue().strip() + + def check_output(self, source, expect, *flags): + self.set_source(source) + res = self.invoke_ast(*flags) + expect = self.text_normalize(expect) + self.assertEqual(res, expect) + + def test_invocation(self): + # test various combinations of parameters + base_flags = ( + ('-m=exec', '--mode=exec'), + ('--no-type-comments', '--no-type-comments'), + ('-a', '--include-attributes'), + ('-i=4', '--indent=4'), + ) + self.set_source(''' + print(1, 2, 3) + def f(x: int) -> int: + x -= 1 + return x + ''') - self.assertEqual(res.err, b"") - self.assertEqual(expected.splitlines(), - res.out.decode("utf8").splitlines()) - self.assertEqual(res.rc, 0) + for r in range(1, len(base_flags) + 1): + for choices in itertools.combinations(base_flags, r=r): + for args in itertools.product(*choices): + with self.subTest(flags=args): + self.invoke_ast(*args) + + def test_help_message(self): + for flag in ('-h', '--help', '--unknown'): + with self.subTest(flag=flag): + output = StringIO() + with self.assertRaises(SystemExit): + with contextlib.redirect_stderr(output): + ast.main(args=flag) + self.assertStartsWith(output.getvalue(), 'usage: ') + + def test_exec_mode_flag(self): + # test 'python -m ast -m/--mode exec' + source = 'x: bool = 1 # type: ignore[assignment]' + expect = ''' + Module( + body=[ + AnnAssign( + target=Name(id='x', ctx=Store()), + annotation=Name(id='bool', ctx=Load()), + value=Constant(value=1), + simple=1)], + type_ignores=[ + TypeIgnore(lineno=1, tag='[assignment]')]) + ''' + for flag in ('-m=exec', '--mode=exec'): + with self.subTest(flag=flag): + self.check_output(source, expect, flag) + + def test_single_mode_flag(self): + # test 'python -m ast -m/--mode single' + source = 'pass' + expect = ''' + Interactive( + body=[ + Pass()]) + ''' + for flag in ('-m=single', '--mode=single'): + with self.subTest(flag=flag): + self.check_output(source, expect, flag) + + def test_eval_mode_flag(self): + # test 'python -m ast -m/--mode eval' + source = 'print(1, 2, 3)' + expect = ''' + Expression( + body=Call( + func=Name(id='print', ctx=Load()), + args=[ + Constant(value=1), + Constant(value=2), + Constant(value=3)])) + ''' + for flag in ('-m=eval', '--mode=eval'): + with self.subTest(flag=flag): + self.check_output(source, expect, flag) + + def test_func_type_mode_flag(self): + # test 'python -m ast -m/--mode func_type' + source = '(int, str) -> list[int]' + expect = ''' + FunctionType( + argtypes=[ + Name(id='int', ctx=Load()), + Name(id='str', ctx=Load())], + returns=Subscript( + value=Name(id='list', ctx=Load()), + slice=Name(id='int', ctx=Load()), + ctx=Load())) + ''' + for flag in ('-m=func_type', '--mode=func_type'): + with self.subTest(flag=flag): + self.check_output(source, expect, flag) + + def test_no_type_comments_flag(self): + # test 'python -m ast --no-type-comments' + source = 'x: bool = 1 # type: ignore[assignment]' + expect = ''' + Module( + body=[ + AnnAssign( + target=Name(id='x', ctx=Store()), + annotation=Name(id='bool', ctx=Load()), + value=Constant(value=1), + simple=1)]) + ''' + self.check_output(source, expect, '--no-type-comments') + + def test_include_attributes_flag(self): + # test 'python -m ast -a/--include-attributes' + source = 'pass' + expect = ''' + Module( + body=[ + Pass( + lineno=1, + col_offset=0, + end_lineno=1, + end_col_offset=4)]) + ''' + for flag in ('-a', '--include-attributes'): + with self.subTest(flag=flag): + self.check_output(source, expect, flag) + + def test_indent_flag(self): + # test 'python -m ast -i/--indent' + source = 'pass' + expect = ''' + Module( + body=[ + Pass()]) + ''' + for flag in ('-i=0', '--indent=0'): + with self.subTest(flag=flag): + self.check_output(source, expect, flag) class ASTOptimiziationTests(unittest.TestCase): diff --git a/Lib/test/test_asyncio/test_tools.py b/Lib/test/test_asyncio/test_tools.py new file mode 100644 index 00000000000..2caf56172c9 --- /dev/null +++ b/Lib/test/test_asyncio/test_tools.py @@ -0,0 +1,839 @@ +import unittest + +from asyncio import tools + + +# mock output of get_all_awaited_by function. +TEST_INPUTS_TREE = [ + [ + # test case containing a task called timer being awaited in two + # different subtasks part of a TaskGroup (root1 and root2) which call + # awaiter functions. + ( + ( + 1, + [ + (2, "Task-1", []), + ( + 3, + "timer", + [ + [["awaiter3", "awaiter2", "awaiter"], 4], + [["awaiter1_3", "awaiter1_2", "awaiter1"], 5], + [["awaiter1_3", "awaiter1_2", "awaiter1"], 6], + [["awaiter3", "awaiter2", "awaiter"], 7], + ], + ), + ( + 8, + "root1", + [[["_aexit", "__aexit__", "main"], 2]], + ), + ( + 9, + "root2", + [[["_aexit", "__aexit__", "main"], 2]], + ), + ( + 4, + "child1_1", + [ + [ + ["_aexit", "__aexit__", "blocho_caller", "bloch"], + 8, + ] + ], + ), + ( + 6, + "child2_1", + [ + [ + ["_aexit", "__aexit__", "blocho_caller", "bloch"], + 8, + ] + ], + ), + ( + 7, + "child1_2", + [ + [ + ["_aexit", "__aexit__", "blocho_caller", "bloch"], + 9, + ] + ], + ), + ( + 5, + "child2_2", + [ + [ + ["_aexit", "__aexit__", "blocho_caller", "bloch"], + 9, + ] + ], + ), + ], + ), + (0, []), + ), + ( + [ + [ + "└── (T) Task-1", + " └── main", + " └── __aexit__", + " └── _aexit", + " ├── (T) root1", + " │ └── bloch", + " │ └── blocho_caller", + " │ └── __aexit__", + " │ └── _aexit", + " │ ├── (T) child1_1", + " │ │ └── awaiter", + " │ │ └── awaiter2", + " │ │ └── awaiter3", + " │ │ └── (T) timer", + " │ └── (T) child2_1", + " │ └── awaiter1", + " │ └── awaiter1_2", + " │ └── awaiter1_3", + " │ └── (T) timer", + " └── (T) root2", + " └── bloch", + " └── blocho_caller", + " └── __aexit__", + " └── _aexit", + " ├── (T) child1_2", + " │ └── awaiter", + " │ └── awaiter2", + " │ └── awaiter3", + " │ └── (T) timer", + " └── (T) child2_2", + " └── awaiter1", + " └── awaiter1_2", + " └── awaiter1_3", + " └── (T) timer", + ] + ] + ), + ], + [ + # test case containing two roots + ( + ( + 9, + [ + (5, "Task-5", []), + (6, "Task-6", [[["main2"], 5]]), + (7, "Task-7", [[["main2"], 5]]), + (8, "Task-8", [[["main2"], 5]]), + ], + ), + ( + 10, + [ + (1, "Task-1", []), + (2, "Task-2", [[["main"], 1]]), + (3, "Task-3", [[["main"], 1]]), + (4, "Task-4", [[["main"], 1]]), + ], + ), + (11, []), + (0, []), + ), + ( + [ + [ + "└── (T) Task-5", + " └── main2", + " ├── (T) Task-6", + " ├── (T) Task-7", + " └── (T) Task-8", + ], + [ + "└── (T) Task-1", + " └── main", + " ├── (T) Task-2", + " ├── (T) Task-3", + " └── (T) Task-4", + ], + ] + ), + ], + [ + # test case containing two roots, one of them without subtasks + ( + [ + (1, [(2, "Task-5", [])]), + ( + 3, + [ + (4, "Task-1", []), + (5, "Task-2", [[["main"], 4]]), + (6, "Task-3", [[["main"], 4]]), + (7, "Task-4", [[["main"], 4]]), + ], + ), + (8, []), + (0, []), + ] + ), + ( + [ + ["└── (T) Task-5"], + [ + "└── (T) Task-1", + " └── main", + " ├── (T) Task-2", + " ├── (T) Task-3", + " └── (T) Task-4", + ], + ] + ), + ], +] + +TEST_INPUTS_CYCLES_TREE = [ + [ + # this test case contains a cycle: two tasks awaiting each other. + ( + [ + ( + 1, + [ + (2, "Task-1", []), + ( + 3, + "a", + [[["awaiter2"], 4], [["main"], 2]], + ), + (4, "b", [[["awaiter"], 3]]), + ], + ), + (0, []), + ] + ), + ([[4, 3, 4]]), + ], + [ + # this test case contains two cycles + ( + [ + ( + 1, + [ + (2, "Task-1", []), + ( + 3, + "A", + [[["nested", "nested", "task_b"], 4]], + ), + ( + 4, + "B", + [ + [["nested", "nested", "task_c"], 5], + [["nested", "nested", "task_a"], 3], + ], + ), + (5, "C", [[["nested", "nested"], 6]]), + ( + 6, + "Task-2", + [[["nested", "nested", "task_b"], 4]], + ), + ], + ), + (0, []), + ] + ), + ([[4, 3, 4], [4, 6, 5, 4]]), + ], +] + +TEST_INPUTS_TABLE = [ + [ + # test case containing a task called timer being awaited in two + # different subtasks part of a TaskGroup (root1 and root2) which call + # awaiter functions. + ( + ( + 1, + [ + (2, "Task-1", []), + ( + 3, + "timer", + [ + [["awaiter3", "awaiter2", "awaiter"], 4], + [["awaiter1_3", "awaiter1_2", "awaiter1"], 5], + [["awaiter1_3", "awaiter1_2", "awaiter1"], 6], + [["awaiter3", "awaiter2", "awaiter"], 7], + ], + ), + ( + 8, + "root1", + [[["_aexit", "__aexit__", "main"], 2]], + ), + ( + 9, + "root2", + [[["_aexit", "__aexit__", "main"], 2]], + ), + ( + 4, + "child1_1", + [ + [ + ["_aexit", "__aexit__", "blocho_caller", "bloch"], + 8, + ] + ], + ), + ( + 6, + "child2_1", + [ + [ + ["_aexit", "__aexit__", "blocho_caller", "bloch"], + 8, + ] + ], + ), + ( + 7, + "child1_2", + [ + [ + ["_aexit", "__aexit__", "blocho_caller", "bloch"], + 9, + ] + ], + ), + ( + 5, + "child2_2", + [ + [ + ["_aexit", "__aexit__", "blocho_caller", "bloch"], + 9, + ] + ], + ), + ], + ), + (0, []), + ), + ( + [ + [1, "0x2", "Task-1", "", "", "0x0"], + [ + 1, + "0x3", + "timer", + "awaiter3 -> awaiter2 -> awaiter", + "child1_1", + "0x4", + ], + [ + 1, + "0x3", + "timer", + "awaiter1_3 -> awaiter1_2 -> awaiter1", + "child2_2", + "0x5", + ], + [ + 1, + "0x3", + "timer", + "awaiter1_3 -> awaiter1_2 -> awaiter1", + "child2_1", + "0x6", + ], + [ + 1, + "0x3", + "timer", + "awaiter3 -> awaiter2 -> awaiter", + "child1_2", + "0x7", + ], + [ + 1, + "0x8", + "root1", + "_aexit -> __aexit__ -> main", + "Task-1", + "0x2", + ], + [ + 1, + "0x9", + "root2", + "_aexit -> __aexit__ -> main", + "Task-1", + "0x2", + ], + [ + 1, + "0x4", + "child1_1", + "_aexit -> __aexit__ -> blocho_caller -> bloch", + "root1", + "0x8", + ], + [ + 1, + "0x6", + "child2_1", + "_aexit -> __aexit__ -> blocho_caller -> bloch", + "root1", + "0x8", + ], + [ + 1, + "0x7", + "child1_2", + "_aexit -> __aexit__ -> blocho_caller -> bloch", + "root2", + "0x9", + ], + [ + 1, + "0x5", + "child2_2", + "_aexit -> __aexit__ -> blocho_caller -> bloch", + "root2", + "0x9", + ], + ] + ), + ], + [ + # test case containing two roots + ( + ( + 9, + [ + (5, "Task-5", []), + (6, "Task-6", [[["main2"], 5]]), + (7, "Task-7", [[["main2"], 5]]), + (8, "Task-8", [[["main2"], 5]]), + ], + ), + ( + 10, + [ + (1, "Task-1", []), + (2, "Task-2", [[["main"], 1]]), + (3, "Task-3", [[["main"], 1]]), + (4, "Task-4", [[["main"], 1]]), + ], + ), + (11, []), + (0, []), + ), + ( + [ + [9, "0x5", "Task-5", "", "", "0x0"], + [9, "0x6", "Task-6", "main2", "Task-5", "0x5"], + [9, "0x7", "Task-7", "main2", "Task-5", "0x5"], + [9, "0x8", "Task-8", "main2", "Task-5", "0x5"], + [10, "0x1", "Task-1", "", "", "0x0"], + [10, "0x2", "Task-2", "main", "Task-1", "0x1"], + [10, "0x3", "Task-3", "main", "Task-1", "0x1"], + [10, "0x4", "Task-4", "main", "Task-1", "0x1"], + ] + ), + ], + [ + # test case containing two roots, one of them without subtasks + ( + [ + (1, [(2, "Task-5", [])]), + ( + 3, + [ + (4, "Task-1", []), + (5, "Task-2", [[["main"], 4]]), + (6, "Task-3", [[["main"], 4]]), + (7, "Task-4", [[["main"], 4]]), + ], + ), + (8, []), + (0, []), + ] + ), + ( + [ + [1, "0x2", "Task-5", "", "", "0x0"], + [3, "0x4", "Task-1", "", "", "0x0"], + [3, "0x5", "Task-2", "main", "Task-1", "0x4"], + [3, "0x6", "Task-3", "main", "Task-1", "0x4"], + [3, "0x7", "Task-4", "main", "Task-1", "0x4"], + ] + ), + ], + # CASES WITH CYCLES + [ + # this test case contains a cycle: two tasks awaiting each other. + ( + [ + ( + 1, + [ + (2, "Task-1", []), + ( + 3, + "a", + [[["awaiter2"], 4], [["main"], 2]], + ), + (4, "b", [[["awaiter"], 3]]), + ], + ), + (0, []), + ] + ), + ( + [ + [1, "0x2", "Task-1", "", "", "0x0"], + [1, "0x3", "a", "awaiter2", "b", "0x4"], + [1, "0x3", "a", "main", "Task-1", "0x2"], + [1, "0x4", "b", "awaiter", "a", "0x3"], + ] + ), + ], + [ + # this test case contains two cycles + ( + [ + ( + 1, + [ + (2, "Task-1", []), + ( + 3, + "A", + [[["nested", "nested", "task_b"], 4]], + ), + ( + 4, + "B", + [ + [["nested", "nested", "task_c"], 5], + [["nested", "nested", "task_a"], 3], + ], + ), + (5, "C", [[["nested", "nested"], 6]]), + ( + 6, + "Task-2", + [[["nested", "nested", "task_b"], 4]], + ), + ], + ), + (0, []), + ] + ), + ( + [ + [1, "0x2", "Task-1", "", "", "0x0"], + [ + 1, + "0x3", + "A", + "nested -> nested -> task_b", + "B", + "0x4", + ], + [ + 1, + "0x4", + "B", + "nested -> nested -> task_c", + "C", + "0x5", + ], + [ + 1, + "0x4", + "B", + "nested -> nested -> task_a", + "A", + "0x3", + ], + [ + 1, + "0x5", + "C", + "nested -> nested", + "Task-2", + "0x6", + ], + [ + 1, + "0x6", + "Task-2", + "nested -> nested -> task_b", + "B", + "0x4", + ], + ] + ), + ], +] + + +class TestAsyncioToolsTree(unittest.TestCase): + + def test_asyncio_utils(self): + for input_, tree in TEST_INPUTS_TREE: + with self.subTest(input_): + self.assertEqual(tools.build_async_tree(input_), tree) + + def test_asyncio_utils_cycles(self): + for input_, cycles in TEST_INPUTS_CYCLES_TREE: + with self.subTest(input_): + try: + tools.build_async_tree(input_) + except tools.CycleFoundException as e: + self.assertEqual(e.cycles, cycles) + + +class TestAsyncioToolsTable(unittest.TestCase): + def test_asyncio_utils(self): + for input_, table in TEST_INPUTS_TABLE: + with self.subTest(input_): + self.assertEqual(tools.build_task_table(input_), table) + + +class TestAsyncioToolsBasic(unittest.TestCase): + def test_empty_input_tree(self): + """Test build_async_tree with empty input.""" + result = [] + expected_output = [] + self.assertEqual(tools.build_async_tree(result), expected_output) + + def test_empty_input_table(self): + """Test build_task_table with empty input.""" + result = [] + expected_output = [] + self.assertEqual(tools.build_task_table(result), expected_output) + + def test_only_independent_tasks_tree(self): + input_ = [(1, [(10, "taskA", []), (11, "taskB", [])])] + expected = [["└── (T) taskA"], ["└── (T) taskB"]] + result = tools.build_async_tree(input_) + self.assertEqual(sorted(result), sorted(expected)) + + def test_only_independent_tasks_table(self): + input_ = [(1, [(10, "taskA", []), (11, "taskB", [])])] + self.assertEqual( + tools.build_task_table(input_), + [[1, "0xa", "taskA", "", "", "0x0"], [1, "0xb", "taskB", "", "", "0x0"]], + ) + + def test_single_task_tree(self): + """Test build_async_tree with a single task and no awaits.""" + result = [ + ( + 1, + [ + (2, "Task-1", []), + ], + ) + ] + expected_output = [ + [ + "└── (T) Task-1", + ] + ] + self.assertEqual(tools.build_async_tree(result), expected_output) + + def test_single_task_table(self): + """Test build_task_table with a single task and no awaits.""" + result = [ + ( + 1, + [ + (2, "Task-1", []), + ], + ) + ] + expected_output = [[1, "0x2", "Task-1", "", "", "0x0"]] + self.assertEqual(tools.build_task_table(result), expected_output) + + def test_cycle_detection(self): + """Test build_async_tree raises CycleFoundException for cyclic input.""" + result = [ + ( + 1, + [ + (2, "Task-1", [[["main"], 3]]), + (3, "Task-2", [[["main"], 2]]), + ], + ) + ] + with self.assertRaises(tools.CycleFoundException) as context: + tools.build_async_tree(result) + self.assertEqual(context.exception.cycles, [[3, 2, 3]]) + + def test_complex_tree(self): + """Test build_async_tree with a more complex tree structure.""" + result = [ + ( + 1, + [ + (2, "Task-1", []), + (3, "Task-2", [[["main"], 2]]), + (4, "Task-3", [[["main"], 3]]), + ], + ) + ] + expected_output = [ + [ + "└── (T) Task-1", + " └── main", + " └── (T) Task-2", + " └── main", + " └── (T) Task-3", + ] + ] + self.assertEqual(tools.build_async_tree(result), expected_output) + + def test_complex_table(self): + """Test build_task_table with a more complex tree structure.""" + result = [ + ( + 1, + [ + (2, "Task-1", []), + (3, "Task-2", [[["main"], 2]]), + (4, "Task-3", [[["main"], 3]]), + ], + ) + ] + expected_output = [ + [1, "0x2", "Task-1", "", "", "0x0"], + [1, "0x3", "Task-2", "main", "Task-1", "0x2"], + [1, "0x4", "Task-3", "main", "Task-2", "0x3"], + ] + self.assertEqual(tools.build_task_table(result), expected_output) + + def test_deep_coroutine_chain(self): + input_ = [ + ( + 1, + [ + (10, "leaf", [[["c1", "c2", "c3", "c4", "c5"], 11]]), + (11, "root", []), + ], + ) + ] + expected = [ + [ + "└── (T) root", + " └── c5", + " └── c4", + " └── c3", + " └── c2", + " └── c1", + " └── (T) leaf", + ] + ] + result = tools.build_async_tree(input_) + self.assertEqual(result, expected) + + def test_multiple_cycles_same_node(self): + input_ = [ + ( + 1, + [ + (1, "Task-A", [[["call1"], 2]]), + (2, "Task-B", [[["call2"], 3]]), + (3, "Task-C", [[["call3"], 1], [["call4"], 2]]), + ], + ) + ] + with self.assertRaises(tools.CycleFoundException) as ctx: + tools.build_async_tree(input_) + cycles = ctx.exception.cycles + self.assertTrue(any(set(c) == {1, 2, 3} for c in cycles)) + + def test_table_output_format(self): + input_ = [(1, [(1, "Task-A", [[["foo"], 2]]), (2, "Task-B", [])])] + table = tools.build_task_table(input_) + for row in table: + self.assertEqual(len(row), 6) + self.assertIsInstance(row[0], int) # thread ID + self.assertTrue( + isinstance(row[1], str) and row[1].startswith("0x") + ) # hex task ID + self.assertIsInstance(row[2], str) # task name + self.assertIsInstance(row[3], str) # coroutine chain + self.assertIsInstance(row[4], str) # awaiter name + self.assertTrue( + isinstance(row[5], str) and row[5].startswith("0x") + ) # hex awaiter ID + + +class TestAsyncioToolsEdgeCases(unittest.TestCase): + + def test_task_awaits_self(self): + """A task directly awaits itself – should raise a cycle.""" + input_ = [(1, [(1, "Self-Awaiter", [[["loopback"], 1]])])] + with self.assertRaises(tools.CycleFoundException) as ctx: + tools.build_async_tree(input_) + self.assertIn([1, 1], ctx.exception.cycles) + + def test_task_with_missing_awaiter_id(self): + """Awaiter ID not in task list – should not crash, just show 'Unknown'.""" + input_ = [(1, [(1, "Task-A", [[["coro"], 999]])])] # 999 not defined + table = tools.build_task_table(input_) + self.assertEqual(len(table), 1) + self.assertEqual(table[0][4], "Unknown") + + def test_duplicate_coroutine_frames(self): + """Same coroutine frame repeated under a parent – should deduplicate.""" + input_ = [ + ( + 1, + [ + (1, "Task-1", [[["frameA"], 2], [["frameA"], 3]]), + (2, "Task-2", []), + (3, "Task-3", []), + ], + ) + ] + tree = tools.build_async_tree(input_) + # Both children should be under the same coroutine node + flat = "\n".join(tree[0]) + self.assertIn("frameA", flat) + self.assertIn("Task-2", flat) + self.assertIn("Task-1", flat) + + flat = "\n".join(tree[1]) + self.assertIn("frameA", flat) + self.assertIn("Task-3", flat) + self.assertIn("Task-1", flat) + + def test_task_with_no_name(self): + """Task with no name in id2name – should still render with fallback.""" + input_ = [(1, [(1, "root", [[["f1"], 2]]), (2, None, [])])] + # If name is None, fallback to string should not crash + tree = tools.build_async_tree(input_) + self.assertIn("(T) None", "\n".join(tree[0])) + + def test_tree_rendering_with_custom_emojis(self): + """Pass custom emojis to the tree renderer.""" + input_ = [(1, [(1, "MainTask", [[["f1", "f2"], 2]]), (2, "SubTask", [])])] + tree = tools.build_async_tree(input_, task_emoji="🧵", cor_emoji="🔁") + flat = "\n".join(tree[0]) + self.assertIn("🧵 MainTask", flat) + self.assertIn("🔁 f1", flat) + self.assertIn("🔁 f2", flat) + self.assertIn("🧵 SubTask", flat) diff --git a/Lib/test/test_cmd.py b/Lib/test/test_cmd.py index 46ec82b7049..0ae44f3987d 100644 --- a/Lib/test/test_cmd.py +++ b/Lib/test/test_cmd.py @@ -289,6 +289,30 @@ class CmdTestReadline(unittest.TestCase): self.assertIn(b'ab_completion_test', output) self.assertIn(b'tab completion success', output) + def test_bang_completion_without_do_shell(self): + script = textwrap.dedent(""" + import cmd + class simplecmd(cmd.Cmd): + def completedefault(self, text, line, begidx, endidx): + return ["hello"] + + def default(self, line): + if line.replace(" ", "") == "!hello": + print('tab completion success') + else: + print('tab completion failure') + return True + + simplecmd().cmdloop() + """) + + # '! h' or '!h' and complete 'ello' to 'hello' + for input in [b"! h\t\n", b"!h\t\n"]: + with self.subTest(input=input): + output = run_pty(script, input) + self.assertIn(b'hello', output) + self.assertIn(b'tab completion success', output) + def load_tests(loader, tests, pattern): tests.addTest(doctest.DocTestSuite()) return tests diff --git a/Lib/test/test_curses.py b/Lib/test/test_curses.py index 6fe0e7fd4b7..c307258e565 100644 --- a/Lib/test/test_curses.py +++ b/Lib/test/test_curses.py @@ -8,7 +8,8 @@ import unittest from unittest.mock import MagicMock from test.support import (requires, verbose, SaveSignals, cpython_only, - check_disallow_instantiation, MISSING_C_DOCSTRINGS) + check_disallow_instantiation, MISSING_C_DOCSTRINGS, + gc_collect) from test.support.import_helper import import_module # Optionally test curses module. This currently requires that the @@ -51,12 +52,6 @@ def requires_colors(test): term = os.environ.get('TERM') SHORT_MAX = 0x7fff -DEFAULT_PAIR_CONTENTS = [ - (curses.COLOR_WHITE, curses.COLOR_BLACK), - (0, 0), - (-1, -1), - (15, 0), # for xterm-256color (15 is for BRIGHT WHITE) -] # If newterm was supported we could use it instead of initscr and not exit @unittest.skipIf(not term or term == 'unknown', @@ -187,6 +182,14 @@ class TestCurses(unittest.TestCase): self.assertEqual(win3.getparyx(), (2, 1)) self.assertEqual(win3.getmaxyx(), (6, 11)) + def test_subwindows_references(self): + win = curses.newwin(5, 10) + win2 = win.subwin(3, 7) + del win + gc_collect() + del win2 + gc_collect() + def test_move_cursor(self): stdscr = self.stdscr win = stdscr.subwin(10, 15, 2, 5) @@ -948,8 +951,6 @@ class TestCurses(unittest.TestCase): @requires_colors def test_pair_content(self): - if not hasattr(curses, 'use_default_colors'): - self.assertIn(curses.pair_content(0), DEFAULT_PAIR_CONTENTS) curses.pair_content(0) maxpair = self.get_pair_limit() - 1 if maxpair > 0: @@ -994,13 +995,27 @@ class TestCurses(unittest.TestCase): @requires_curses_func('use_default_colors') @requires_colors def test_use_default_colors(self): - old = curses.pair_content(0) try: curses.use_default_colors() except curses.error: self.skipTest('cannot change color (use_default_colors() failed)') self.assertEqual(curses.pair_content(0), (-1, -1)) - self.assertIn(old, DEFAULT_PAIR_CONTENTS) + + @requires_curses_func('assume_default_colors') + @requires_colors + def test_assume_default_colors(self): + try: + curses.assume_default_colors(-1, -1) + except curses.error: + self.skipTest('cannot change color (assume_default_colors() failed)') + self.assertEqual(curses.pair_content(0), (-1, -1)) + curses.assume_default_colors(curses.COLOR_YELLOW, curses.COLOR_BLUE) + self.assertEqual(curses.pair_content(0), (curses.COLOR_YELLOW, curses.COLOR_BLUE)) + curses.assume_default_colors(curses.COLOR_RED, -1) + self.assertEqual(curses.pair_content(0), (curses.COLOR_RED, -1)) + curses.assume_default_colors(-1, curses.COLOR_GREEN) + self.assertEqual(curses.pair_content(0), (-1, curses.COLOR_GREEN)) + curses.assume_default_colors(-1, -1) def test_keyname(self): # TODO: key_name() diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py index aa05db972f0..f787190b1ae 100644 --- a/Lib/test/test_external_inspection.py +++ b/Lib/test/test_external_inspection.py @@ -4,6 +4,8 @@ import textwrap import importlib import sys import socket +from asyncio import staggered, taskgroups +from unittest.mock import ANY from test.support import os_helper, SHORT_TIMEOUT, busy_retry from test.support.script_helper import make_script from test.support.socket_helper import find_unused_port @@ -13,32 +15,38 @@ import subprocess PROCESS_VM_READV_SUPPORTED = False try: - from _testexternalinspection import PROCESS_VM_READV_SUPPORTED - from _testexternalinspection import get_stack_trace - from _testexternalinspection import get_async_stack_trace - from _testexternalinspection import get_all_awaited_by + from _remotedebugging import PROCESS_VM_READV_SUPPORTED + from _remotedebugging import get_stack_trace + from _remotedebugging import get_async_stack_trace + from _remotedebugging import get_all_awaited_by except ImportError: - raise unittest.SkipTest( - "Test only runs when _testexternalinspection is available") + raise unittest.SkipTest("Test only runs when _remotedebuggingmodule is available") + def _make_test_script(script_dir, script_basename, source): to_return = make_script(script_dir, script_basename, source) importlib.invalidate_caches() return to_return -skip_if_not_supported = unittest.skipIf((sys.platform != "darwin" - and sys.platform != "linux" - and sys.platform != "win32"), - "Test only runs on Linux, Windows and MacOS") + +skip_if_not_supported = unittest.skipIf( + (sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32"), + "Test only runs on Linux, Windows and MacOS", +) + + class TestGetStackTrace(unittest.TestCase): @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import time, sys, socket # Connect to the test process sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -56,7 +64,8 @@ class TestGetStackTrace(unittest.TestCase): time.sleep(1000) bar() - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -65,11 +74,11 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) @@ -87,22 +96,24 @@ class TestGetStackTrace(unittest.TestCase): p.terminate() p.wait(timeout=SHORT_TIMEOUT) - expected_stack_trace = [ - 'foo', - 'baz', - 'bar', - '<module>' + ("foo", script_name, 15), + ("baz", script_name, 11), + ("bar", script_name, 9), + ("<module>", script_name, 17), ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_async_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio import time import sys @@ -142,7 +153,8 @@ class TestGetStackTrace(unittest.TestCase): return loop asyncio.run(main(), loop_factory={{TASK_FACTORY}}) - """) + """ + ) stack_trace = None for task_factory_variant in "asyncio.new_event_loop", "new_eager_loop": with ( @@ -153,25 +165,24 @@ class TestGetStackTrace(unittest.TestCase): os.mkdir(script_dir) server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) script_name = _make_test_script( - script_dir, 'script', - script.format(TASK_FACTORY=task_factory_variant)) + script_dir, + "script", + script.format(TASK_FACTORY=task_factory_variant), + ) client_socket = None try: - p = subprocess.Popen( - [sys.executable, script_name] - ) + p = subprocess.Popen([sys.executable, script_name]) client_socket, _ = server_socket.accept() server_socket.close() response = client_socket.recv(1024) self.assertEqual(response, b"ready") stack_trace = get_async_stack_trace(p.pid) except PermissionError: - self.skipTest( - "Insufficient permissions to read the stack trace") + self.skipTest("Insufficient permissions to read the stack trace") finally: if client_socket is not None: client_socket.close() @@ -184,23 +195,91 @@ class TestGetStackTrace(unittest.TestCase): root_task = "Task-1" expected_stack_trace = [ - ["c5", "c4", "c3", "c2"], + [ + ("c5", script_name, 11), + ("c4", script_name, 15), + ("c3", script_name, 18), + ("c2", script_name, 21), + ], "c2_root", [ - [["main"], root_task, []], - [["c1"], "sub_main_1", [[["main"], root_task, []]]], - [["c1"], "sub_main_2", [[["main"], root_task, []]]], + [ + [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("main", script_name, 27), + ], + "Task-1", + [], + ], + [ + [("c1", script_name, 24)], + "sub_main_1", + [ + [ + [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("main", script_name, 27), + ], + "Task-1", + [], + ] + ], + ], + [ + [("c1", script_name, 24)], + "sub_main_2", + [ + [ + [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("main", script_name, 27), + ], + "Task-1", + [], + ] + ], + ], ], ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_asyncgen_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio import time import sys @@ -224,7 +303,8 @@ class TestGetStackTrace(unittest.TestCase): pass asyncio.run(main()) - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -232,10 +312,10 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) @@ -257,17 +337,26 @@ class TestGetStackTrace(unittest.TestCase): stack_trace[2].sort(key=lambda x: x[1]) expected_stack_trace = [ - ['gen_nested_call', 'gen', 'main'], 'Task-1', [] + [ + ("gen_nested_call", script_name, 11), + ("gen", script_name, 17), + ("main", script_name, 20), + ], + "Task-1", + [], ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_async_gather_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio import time import sys @@ -292,7 +381,8 @@ class TestGetStackTrace(unittest.TestCase): await asyncio.gather(c1(), c2()) asyncio.run(main()) - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -300,10 +390,10 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) @@ -313,8 +403,7 @@ class TestGetStackTrace(unittest.TestCase): self.assertEqual(response, b"ready") stack_trace = get_async_stack_trace(p.pid) except PermissionError: - self.skipTest( - "Insufficient permissions to read the stack trace") + self.skipTest("Insufficient permissions to read the stack trace") finally: if client_socket is not None: client_socket.close() @@ -325,18 +414,23 @@ class TestGetStackTrace(unittest.TestCase): # sets are unordered, so we want to sort "awaited_by"s stack_trace[2].sort(key=lambda x: x[1]) - expected_stack_trace = [ - ['deep', 'c1'], 'Task-2', [[['main'], 'Task-1', []]] + expected_stack_trace = [ + [("deep", script_name, ANY), ("c1", script_name, 16)], + "Task-2", + [[[("main", script_name, 22)], "Task-1", []]], ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_async_staggered_race_remote_stack_trace(self): # Spawn a process with some realistic Python code port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio.staggered import time import sys @@ -364,7 +458,8 @@ class TestGetStackTrace(unittest.TestCase): ) asyncio.run(main()) - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -372,10 +467,10 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) @@ -385,8 +480,7 @@ class TestGetStackTrace(unittest.TestCase): self.assertEqual(response, b"ready") stack_trace = get_async_stack_trace(p.pid) except PermissionError: - self.skipTest( - "Insufficient permissions to read the stack trace") + self.skipTest("Insufficient permissions to read the stack trace") finally: if client_socket is not None: client_socket.close() @@ -396,18 +490,35 @@ class TestGetStackTrace(unittest.TestCase): # sets are unordered, so we want to sort "awaited_by"s stack_trace[2].sort(key=lambda x: x[1]) - - expected_stack_trace = [ - ['deep', 'c1', 'run_one_coro'], 'Task-2', [[['main'], 'Task-1', []]] + expected_stack_trace = [ + [ + ("deep", script_name, ANY), + ("c1", script_name, 16), + ("staggered_race.<locals>.run_one_coro", staggered.__file__, ANY), + ], + "Task-2", + [ + [ + [ + ("staggered_race", staggered.__file__, ANY), + ("main", script_name, 22), + ], + "Task-1", + [], + ] + ], ] self.assertEqual(stack_trace, expected_stack_trace) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_async_global_awaited_by(self): port = find_unused_port() - script = textwrap.dedent(f"""\ + script = textwrap.dedent( + f"""\ import asyncio import os import random @@ -443,6 +554,8 @@ class TestGetStackTrace(unittest.TestCase): assert message == data.decode() writer.close() await writer.wait_closed() + # Signal we are ready to sleep + sock.sendall(b"ready") await asyncio.sleep(SHORT_TIMEOUT) async def echo_client_spam(server): @@ -452,8 +565,10 @@ class TestGetStackTrace(unittest.TestCase): random.shuffle(msg) tg.create_task(echo_client("".join(msg))) await asyncio.sleep(0) - # at least a 1000 tasks created - sock.sendall(b"ready") + # at least a 1000 tasks created. Each task will signal + # when is ready to avoid the race caused by the fact that + # tasks are waited on tg.__exit__ and we cannot signal when + # that happens otherwise # at this point all client tasks completed without assertion errors # let's wrap up the test server.close() @@ -468,7 +583,8 @@ class TestGetStackTrace(unittest.TestCase): tg.create_task(echo_client_spam(server), name="echo client spam") asyncio.run(main()) - """) + """ + ) stack_trace = None with os_helper.temp_dir() as work_dir: script_dir = os.path.join(work_dir, "script_pkg") @@ -476,17 +592,19 @@ class TestGetStackTrace(unittest.TestCase): # Create a socket server to communicate with the target process server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(('localhost', port)) + server_socket.bind(("localhost", port)) server_socket.settimeout(SHORT_TIMEOUT) server_socket.listen(1) - script_name = _make_test_script(script_dir, 'script', script) + script_name = _make_test_script(script_dir, "script", script) client_socket = None try: p = subprocess.Popen([sys.executable, script_name]) client_socket, _ = server_socket.accept() server_socket.close() - response = client_socket.recv(1024) - self.assertEqual(response, b"ready") + for _ in range(1000): + expected_response = b"ready" + response = client_socket.recv(len(expected_response)) + self.assertEqual(response, expected_response) for _ in busy_retry(SHORT_TIMEOUT): try: all_awaited_by = get_all_awaited_by(p.pid) @@ -497,7 +615,9 @@ class TestGetStackTrace(unittest.TestCase): msg = str(re) if msg.startswith("Task list appears corrupted"): continue - elif msg.startswith("Invalid linked list structure reading remote memory"): + elif msg.startswith( + "Invalid linked list structure reading remote memory" + ): continue elif msg.startswith("Unknown error reading memory"): continue @@ -516,22 +636,62 @@ class TestGetStackTrace(unittest.TestCase): # expected: at least 1000 pending tasks self.assertGreaterEqual(len(entries), 1000) # the first three tasks stem from the code structure - self.assertIn(('Task-1', []), entries) - self.assertIn(('server task', [[['main'], 'Task-1', []]]), entries) - self.assertIn(('echo client spam', [[['main'], 'Task-1', []]]), entries) + self.assertIn((ANY, "Task-1", []), entries) + main_stack = [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("main", script_name, 60), + ] + self.assertIn( + (ANY, "server task", [[main_stack, ANY]]), + entries, + ) + self.assertIn( + (ANY, "echo client spam", [[main_stack, ANY]]), + entries, + ) - expected_stack = [[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]] - tasks_with_stack = [task for task in entries if task[1] == expected_stack] + expected_stack = [ + [ + [ + ( + "TaskGroup._aexit", + taskgroups.__file__, + ANY, + ), + ( + "TaskGroup.__aexit__", + taskgroups.__file__, + ANY, + ), + ("echo_client_spam", script_name, 41), + ], + ANY, + ] + ] + tasks_with_stack = [ + task for task in entries if task[2] == expected_stack + ] self.assertGreaterEqual(len(tasks_with_stack), 1000) # the final task will have some random number, but it should for # sure be one of the echo client spam horde (In windows this is not true # for some reason) if sys.platform != "win32": - self.assertEqual([[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]], entries[-1][1]) + self.assertEqual( + expected_stack, + entries[-1][2], + ) except PermissionError: - self.skipTest( - "Insufficient permissions to read the stack trace") + self.skipTest("Insufficient permissions to read the stack trace") finally: if client_socket is not None: client_socket.close() @@ -540,12 +700,21 @@ class TestGetStackTrace(unittest.TestCase): p.wait(timeout=SHORT_TIMEOUT) @skip_if_not_supported - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, - "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf( + sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support", + ) def test_self_trace(self): stack_trace = get_stack_trace(os.getpid()) - print(stack_trace) - self.assertEqual(stack_trace[0], "test_self_trace") + self.assertEqual( + stack_trace[0], + ( + "TestGetStackTrace.test_self_trace", + __file__, + self.test_self_trace.__code__.co_firstlineno + 6, + ), + ) + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_genericalias.py b/Lib/test/test_genericalias.py index 5c13897b8d9..8d21ded4501 100644 --- a/Lib/test/test_genericalias.py +++ b/Lib/test/test_genericalias.py @@ -137,7 +137,9 @@ class BaseTest(unittest.TestCase): Future, _WorkItem, Morsel, DictReader, DictWriter, - array] + array, + staticmethod, + classmethod] if ctypes is not None: generic_types.extend((ctypes.Array, ctypes.LibraryLoader, ctypes.py_object)) if ValueProxy is not None: diff --git a/Lib/test/test_inspect/test_inspect.py b/Lib/test/test_inspect/test_inspect.py index 4950af42cfe..c9b37fcd8f6 100644 --- a/Lib/test/test_inspect/test_inspect.py +++ b/Lib/test/test_inspect/test_inspect.py @@ -5844,7 +5844,7 @@ class TestSignatureDefinitions(unittest.TestCase): self._test_module_has_signatures(operator) def test_os_module_has_signatures(self): - unsupported_signature = {'chmod', 'utime'} + unsupported_signature = {'chmod', 'link', 'utime'} unsupported_signature |= {name for name in ['get_terminal_size', 'posix_spawn', 'posix_spawnp', 'register_at_fork', 'startfile'] diff --git a/Lib/test/test_interpreters/test_stress.py b/Lib/test/test_interpreters/test_stress.py index 56bfc172199..fae2f38cb55 100644 --- a/Lib/test/test_interpreters/test_stress.py +++ b/Lib/test/test_interpreters/test_stress.py @@ -21,21 +21,29 @@ class StressTests(TestBase): for _ in range(100): interp = interpreters.create() alive.append(interp) + del alive + support.gc_collect() - @support.requires_resource('cpu') - @threading_helper.requires_working_threading() - def test_create_many_threaded(self): + @support.bigmemtest(size=200, memuse=32*2**20, dry_run=False) + def test_create_many_threaded(self, size): alive = [] + start = threading.Event() def task(): + # try to create all interpreters simultaneously + if not start.wait(support.SHORT_TIMEOUT): + raise TimeoutError interp = interpreters.create() alive.append(interp) - threads = (threading.Thread(target=task) for _ in range(200)) + threads = [threading.Thread(target=task) for _ in range(size)] with threading_helper.start_threads(threads): - pass + start.set() + del alive + support.gc_collect() - @support.requires_resource('cpu') @threading_helper.requires_working_threading() - def test_many_threads_running_interp_in_other_interp(self): + @support.bigmemtest(size=200, memuse=34*2**20, dry_run=False) + def test_many_threads_running_interp_in_other_interp(self, size): + start = threading.Event() interp = interpreters.create() script = f"""if True: @@ -47,6 +55,9 @@ class StressTests(TestBase): interp = interpreters.create() alreadyrunning = (f'{interpreters.InterpreterError}: ' 'interpreter already running') + # try to run all interpreters simultaneously + if not start.wait(support.SHORT_TIMEOUT): + raise TimeoutError success = False while not success: try: @@ -58,9 +69,10 @@ class StressTests(TestBase): else: success = True - threads = (threading.Thread(target=run) for _ in range(200)) + threads = [threading.Thread(target=run) for _ in range(size)] with threading_helper.start_threads(threads): - pass + start.set() + support.gc_collect() if __name__ == '__main__': diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 545643aa455..5a8f1949baa 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -1373,6 +1373,28 @@ class CommonBufferedTests: with self.assertRaises(AttributeError): buf.raw = x + def test_pickling_subclass(self): + global MyBufferedIO + class MyBufferedIO(self.tp): + def __init__(self, raw, tag): + super().__init__(raw) + self.tag = tag + def __getstate__(self): + return self.tag, self.raw.getvalue() + def __setstate__(slf, state): + tag, value = state + slf.__init__(self.BytesIO(value), tag) + + raw = self.BytesIO(b'data') + buf = MyBufferedIO(raw, tag='ham') + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=proto): + pickled = pickle.dumps(buf, proto) + newbuf = pickle.loads(pickled) + self.assertEqual(newbuf.raw.getvalue(), b'data') + self.assertEqual(newbuf.tag, 'ham') + del MyBufferedIO + class SizeofTest: @@ -3950,6 +3972,28 @@ class TextIOWrapperTest(unittest.TestCase): f.write(res) self.assertEqual(res + f.readline(), 'foo\nbar\n') + def test_pickling_subclass(self): + global MyTextIO + class MyTextIO(self.TextIOWrapper): + def __init__(self, raw, tag): + super().__init__(raw) + self.tag = tag + def __getstate__(self): + return self.tag, self.buffer.getvalue() + def __setstate__(slf, state): + tag, value = state + slf.__init__(self.BytesIO(value), tag) + + raw = self.BytesIO(b'data') + txt = MyTextIO(raw, 'ham') + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=proto): + pickled = pickle.dumps(txt, proto) + newtxt = pickle.loads(pickled) + self.assertEqual(newtxt.buffer.getvalue(), b'data') + self.assertEqual(newtxt.tag, 'ham') + del MyTextIO + @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") def test_read_non_blocking(self): import os diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index c9cbe1541e7..b6a07f214fa 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -1521,6 +1521,51 @@ class PosixTester(unittest.TestCase): self.assertEqual(cm.exception.errno, errno.EINVAL) os.close(os.pidfd_open(os.getpid(), 0)) + @unittest.skipUnless(hasattr(os, "link"), "test needs os.link()") + @support.skip_android_selinux('hard links to symbolic links') + def test_link_follow_symlinks(self): + default_follow = sys.platform.startswith( + ('darwin', 'freebsd', 'netbsd', 'openbsd', 'dragonfly', 'sunos5')) + default_no_follow = sys.platform.startswith(('win32', 'linux')) + orig = os_helper.TESTFN + symlink = orig + 'symlink' + posix.symlink(orig, symlink) + self.addCleanup(os_helper.unlink, symlink) + + with self.subTest('no follow_symlinks'): + # no follow_symlinks -> platform depending + link = orig + 'link' + posix.link(symlink, link) + self.addCleanup(os_helper.unlink, link) + if os.link in os.supports_follow_symlinks or default_follow: + self.assertEqual(posix.lstat(link), posix.lstat(orig)) + elif default_no_follow: + self.assertEqual(posix.lstat(link), posix.lstat(symlink)) + + with self.subTest('follow_symlinks=False'): + # follow_symlinks=False -> duplicate the symlink itself + link = orig + 'link_nofollow' + try: + posix.link(symlink, link, follow_symlinks=False) + except NotImplementedError: + if os.link in os.supports_follow_symlinks or default_no_follow: + raise + else: + self.addCleanup(os_helper.unlink, link) + self.assertEqual(posix.lstat(link), posix.lstat(symlink)) + + with self.subTest('follow_symlinks=True'): + # follow_symlinks=True -> duplicate the target file + link = orig + 'link_following' + try: + posix.link(symlink, link, follow_symlinks=True) + except NotImplementedError: + if os.link in os.supports_follow_symlinks or default_follow: + raise + else: + self.addCleanup(os_helper.unlink, link) + self.assertEqual(posix.lstat(link), posix.lstat(orig)) + # tests for the posix *at functions follow class TestPosixDirFd(unittest.TestCase): diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py index e4c44c78d4a..9fbe94fcdd6 100644 --- a/Lib/test/test_remote_pdb.py +++ b/Lib/test/test_remote_pdb.py @@ -531,6 +531,44 @@ class PdbClientTestCase(unittest.TestCase): expected_state={"state": "pdb"}, ) + def test_multiline_completion_in_pdb_state(self): + """Test requesting tab completions at a (Pdb) continuation prompt.""" + # GIVEN + incoming = [ + ("server", {"prompt": "(Pdb) ", "state": "pdb"}), + ("user", {"prompt": "(Pdb) ", "input": "if True:"}), + ( + "user", + { + "prompt": "... ", + "completion_request": { + "line": " b", + "begidx": 4, + "endidx": 5, + }, + "input": " bool()", + }, + ), + ("server", {"completions": ["bin", "bool", "bytes"]}), + ("user", {"prompt": "... ", "input": ""}), + ] + self.do_test( + incoming=incoming, + expected_outgoing=[ + { + "complete": { + "text": "b", + "line": "! b", + "begidx": 2, + "endidx": 3, + } + }, + {"reply": "if True:\n bool()\n"}, + ], + expected_completions=["bin", "bool", "bytes"], + expected_state={"state": "pdb"}, + ) + def test_completion_in_interact_state(self): """Test requesting tab completions at a >>> prompt.""" incoming = [ diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 56413d00823..10c3e0e9a1d 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -1960,7 +1960,7 @@ def _supports_remote_attaching(): PROCESS_VM_READV_SUPPORTED = False try: - from _testexternalinspection import PROCESS_VM_READV_SUPPORTED + from _remotedebuggingmodule import PROCESS_VM_READV_SUPPORTED except ImportError: pass diff --git a/Lib/typing.py b/Lib/typing.py index f70dcd0b5b7..e019c597580 100644 --- a/Lib/typing.py +++ b/Lib/typing.py @@ -2906,7 +2906,7 @@ class NamedTupleMeta(type): types = ns["__annotations__"] field_names = list(types) annotate = _make_eager_annotate(types) - elif (original_annotate := _lazy_annotationlib.get_annotate_function(ns)) is not None: + elif (original_annotate := _lazy_annotationlib.get_annotate_from_class_namespace(ns)) is not None: types = _lazy_annotationlib.call_annotate_function( original_annotate, _lazy_annotationlib.Format.FORWARDREF) field_names = list(types) @@ -3092,7 +3092,7 @@ class _TypedDictMeta(type): if "__annotations__" in ns: own_annotate = None own_annotations = ns["__annotations__"] - elif (own_annotate := _lazy_annotationlib.get_annotate_function(ns)) is not None: + elif (own_annotate := _lazy_annotationlib.get_annotate_from_class_namespace(ns)) is not None: own_annotations = _lazy_annotationlib.call_annotate_function( own_annotate, _lazy_annotationlib.Format.FORWARDREF, owner=tp_dict ) |