aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/annotationlib.py379
-rw-r--r--Lib/ast.py4
-rw-r--r--Lib/asyncio/__main__.py32
-rw-r--r--Lib/asyncio/tools.py214
-rw-r--r--Lib/cmd.py2
-rw-r--r--Lib/pdb.py13
-rw-r--r--Lib/test/test_annotationlib.py260
-rw-r--r--Lib/test/test_ast/test_ast.py182
-rw-r--r--Lib/test/test_asyncio/test_tools.py839
-rw-r--r--Lib/test/test_cmd.py24
-rw-r--r--Lib/test/test_curses.py37
-rw-r--r--Lib/test/test_external_inspection.py345
-rw-r--r--Lib/test/test_genericalias.py4
-rw-r--r--Lib/test/test_inspect/test_inspect.py2
-rw-r--r--Lib/test/test_interpreters/test_stress.py30
-rw-r--r--Lib/test/test_io.py44
-rw-r--r--Lib/test/test_posix.py45
-rw-r--r--Lib/test/test_remote_pdb.py38
-rw-r--r--Lib/test/test_sys.py2
-rw-r--r--Lib/typing.py4
20 files changed, 2235 insertions, 265 deletions
diff --git a/Lib/annotationlib.py b/Lib/annotationlib.py
index 971f636f971..5ad0893106a 100644
--- a/Lib/annotationlib.py
+++ b/Lib/annotationlib.py
@@ -12,7 +12,7 @@ __all__ = [
"ForwardRef",
"call_annotate_function",
"call_evaluate_function",
- "get_annotate_function",
+ "get_annotate_from_class_namespace",
"get_annotations",
"annotations_to_string",
"type_repr",
@@ -38,6 +38,7 @@ _SLOTS = (
"__weakref__",
"__arg__",
"__globals__",
+ "__extra_names__",
"__code__",
"__ast_node__",
"__cell__",
@@ -82,6 +83,7 @@ class ForwardRef:
# is created through __class__ assignment on a _Stringifier object.
self.__globals__ = None
self.__cell__ = None
+ self.__extra_names__ = None
# These are initially None but serve as a cache and may be set to a non-None
# value later.
self.__code__ = None
@@ -90,11 +92,28 @@ class ForwardRef:
def __init_subclass__(cls, /, *args, **kwds):
raise TypeError("Cannot subclass ForwardRef")
- def evaluate(self, *, globals=None, locals=None, type_params=None, owner=None):
+ def evaluate(
+ self,
+ *,
+ globals=None,
+ locals=None,
+ type_params=None,
+ owner=None,
+ format=Format.VALUE,
+ ):
"""Evaluate the forward reference and return the value.
If the forward reference cannot be evaluated, raise an exception.
"""
+ match format:
+ case Format.STRING:
+ return self.__forward_arg__
+ case Format.VALUE:
+ is_forwardref_format = False
+ case Format.FORWARDREF:
+ is_forwardref_format = True
+ case _:
+ raise NotImplementedError(format)
if self.__cell__ is not None:
try:
return self.__cell__.cell_contents
@@ -151,21 +170,42 @@ class ForwardRef:
if not self.__forward_is_class__ or param_name not in globals:
globals[param_name] = param
locals.pop(param_name, None)
+ if self.__extra_names__:
+ locals = {**locals, **self.__extra_names__}
arg = self.__forward_arg__
if arg.isidentifier() and not keyword.iskeyword(arg):
if arg in locals:
- value = locals[arg]
+ return locals[arg]
elif arg in globals:
- value = globals[arg]
+ return globals[arg]
elif hasattr(builtins, arg):
return getattr(builtins, arg)
+ elif is_forwardref_format:
+ return self
else:
raise NameError(arg)
else:
code = self.__forward_code__
- value = eval(code, globals=globals, locals=locals)
- return value
+ try:
+ return eval(code, globals=globals, locals=locals)
+ except Exception:
+ if not is_forwardref_format:
+ raise
+ new_locals = _StringifierDict(
+ {**builtins.__dict__, **locals},
+ globals=globals,
+ owner=owner,
+ is_class=self.__forward_is_class__,
+ format=format,
+ )
+ try:
+ result = eval(code, globals=globals, locals=new_locals)
+ except Exception:
+ return self
+ else:
+ new_locals.transmogrify()
+ return result
def _evaluate(self, globalns, localns, type_params=_sentinel, *, recursive_guard):
import typing
@@ -231,6 +271,10 @@ class ForwardRef:
and self.__forward_is_class__ == other.__forward_is_class__
and self.__cell__ == other.__cell__
and self.__owner__ == other.__owner__
+ and (
+ (tuple(sorted(self.__extra_names__.items())) if self.__extra_names__ else None) ==
+ (tuple(sorted(other.__extra_names__.items())) if other.__extra_names__ else None)
+ )
)
def __hash__(self):
@@ -241,6 +285,7 @@ class ForwardRef:
self.__forward_is_class__,
self.__cell__,
self.__owner__,
+ tuple(sorted(self.__extra_names__.items())) if self.__extra_names__ else None,
))
def __or__(self, other):
@@ -274,6 +319,7 @@ class _Stringifier:
cell=None,
*,
stringifier_dict,
+ extra_names=None,
):
# Either an AST node or a simple str (for the common case where a ForwardRef
# represent a single name).
@@ -285,6 +331,7 @@ class _Stringifier:
self.__code__ = None
self.__ast_node__ = node
self.__globals__ = globals
+ self.__extra_names__ = extra_names
self.__cell__ = cell
self.__owner__ = owner
self.__stringifier_dict__ = stringifier_dict
@@ -292,28 +339,63 @@ class _Stringifier:
def __convert_to_ast(self, other):
if isinstance(other, _Stringifier):
if isinstance(other.__ast_node__, str):
- return ast.Name(id=other.__ast_node__)
- return other.__ast_node__
- elif isinstance(other, slice):
+ return ast.Name(id=other.__ast_node__), other.__extra_names__
+ return other.__ast_node__, other.__extra_names__
+ elif (
+ # In STRING format we don't bother with the create_unique_name() dance;
+ # it's better to emit the repr() of the object instead of an opaque name.
+ self.__stringifier_dict__.format == Format.STRING
+ or other is None
+ or type(other) in (str, int, float, bool, complex)
+ ):
+ return ast.Constant(value=other), None
+ elif type(other) is dict:
+ extra_names = {}
+ keys = []
+ values = []
+ for key, value in other.items():
+ new_key, new_extra_names = self.__convert_to_ast(key)
+ if new_extra_names is not None:
+ extra_names.update(new_extra_names)
+ keys.append(new_key)
+ new_value, new_extra_names = self.__convert_to_ast(value)
+ if new_extra_names is not None:
+ extra_names.update(new_extra_names)
+ values.append(new_value)
+ return ast.Dict(keys, values), extra_names
+ elif type(other) in (list, tuple, set):
+ extra_names = {}
+ elts = []
+ for elt in other:
+ new_elt, new_extra_names = self.__convert_to_ast(elt)
+ if new_extra_names is not None:
+ extra_names.update(new_extra_names)
+ elts.append(new_elt)
+ ast_class = {list: ast.List, tuple: ast.Tuple, set: ast.Set}[type(other)]
+ return ast_class(elts), extra_names
+ else:
+ name = self.__stringifier_dict__.create_unique_name()
+ return ast.Name(id=name), {name: other}
+
+ def __convert_to_ast_getitem(self, other):
+ if isinstance(other, slice):
+ extra_names = {}
+
+ def conv(obj):
+ if obj is None:
+ return None
+ new_obj, new_extra_names = self.__convert_to_ast(obj)
+ if new_extra_names is not None:
+ extra_names.update(new_extra_names)
+ return new_obj
+
return ast.Slice(
- lower=(
- self.__convert_to_ast(other.start)
- if other.start is not None
- else None
- ),
- upper=(
- self.__convert_to_ast(other.stop)
- if other.stop is not None
- else None
- ),
- step=(
- self.__convert_to_ast(other.step)
- if other.step is not None
- else None
- ),
- )
+ lower=conv(other.start),
+ upper=conv(other.stop),
+ step=conv(other.step),
+ ), extra_names
else:
- return ast.Constant(value=other)
+ return self.__convert_to_ast(other)
def __get_ast(self):
node = self.__ast_node__
@@ -321,13 +403,19 @@ class _Stringifier:
return ast.Name(id=node)
return node
- def __make_new(self, node):
+ def __make_new(self, node, extra_names=None):
+ new_extra_names = {}
+ if self.__extra_names__ is not None:
+ new_extra_names.update(self.__extra_names__)
+ if extra_names is not None:
+ new_extra_names.update(extra_names)
stringifier = _Stringifier(
node,
self.__globals__,
self.__owner__,
self.__forward_is_class__,
stringifier_dict=self.__stringifier_dict__,
+ extra_names=new_extra_names or None,
)
self.__stringifier_dict__.stringifiers.append(stringifier)
return stringifier
@@ -343,27 +431,37 @@ class _Stringifier:
if self.__ast_node__ == "__classdict__":
raise KeyError
if isinstance(other, tuple):
- elts = [self.__convert_to_ast(elt) for elt in other]
+ extra_names = {}
+ elts = []
+ for elt in other:
+ new_elt, new_extra_names = self.__convert_to_ast_getitem(elt)
+ if new_extra_names is not None:
+ extra_names.update(new_extra_names)
+ elts.append(new_elt)
other = ast.Tuple(elts)
else:
- other = self.__convert_to_ast(other)
+ other, extra_names = self.__convert_to_ast_getitem(other)
assert isinstance(other, ast.AST), repr(other)
- return self.__make_new(ast.Subscript(self.__get_ast(), other))
+ return self.__make_new(ast.Subscript(self.__get_ast(), other), extra_names)
def __getattr__(self, attr):
return self.__make_new(ast.Attribute(self.__get_ast(), attr))
def __call__(self, *args, **kwargs):
- return self.__make_new(
- ast.Call(
- self.__get_ast(),
- [self.__convert_to_ast(arg) for arg in args],
- [
- ast.keyword(key, self.__convert_to_ast(value))
- for key, value in kwargs.items()
- ],
- )
- )
+ extra_names = {}
+ ast_args = []
+ for arg in args:
+ new_arg, new_extra_names = self.__convert_to_ast(arg)
+ if new_extra_names is not None:
+ extra_names.update(new_extra_names)
+ ast_args.append(new_arg)
+ ast_kwargs = []
+ for key, value in kwargs.items():
+ new_value, new_extra_names = self.__convert_to_ast(value)
+ if new_extra_names is not None:
+ extra_names.update(new_extra_names)
+ ast_kwargs.append(ast.keyword(key, new_value))
+ return self.__make_new(ast.Call(self.__get_ast(), ast_args, ast_kwargs), extra_names)
def __iter__(self):
yield self.__make_new(ast.Starred(self.__get_ast()))
@@ -378,8 +476,9 @@ class _Stringifier:
def _make_binop(op: ast.AST):
def binop(self, other):
+ rhs, extra_names = self.__convert_to_ast(other)
return self.__make_new(
- ast.BinOp(self.__get_ast(), op, self.__convert_to_ast(other))
+ ast.BinOp(self.__get_ast(), op, rhs), extra_names
)
return binop
@@ -402,8 +501,9 @@ class _Stringifier:
def _make_rbinop(op: ast.AST):
def rbinop(self, other):
+ new_other, extra_names = self.__convert_to_ast(other)
return self.__make_new(
- ast.BinOp(self.__convert_to_ast(other), op, self.__get_ast())
+ ast.BinOp(new_other, op, self.__get_ast()), extra_names
)
return rbinop
@@ -426,12 +526,14 @@ class _Stringifier:
def _make_compare(op):
def compare(self, other):
+ rhs, extra_names = self.__convert_to_ast(other)
return self.__make_new(
ast.Compare(
left=self.__get_ast(),
ops=[op],
- comparators=[self.__convert_to_ast(other)],
- )
+ comparators=[rhs],
+ ),
+ extra_names,
)
return compare
@@ -459,13 +561,15 @@ class _Stringifier:
class _StringifierDict(dict):
- def __init__(self, namespace, globals=None, owner=None, is_class=False):
+ def __init__(self, namespace, *, globals=None, owner=None, is_class=False, format):
super().__init__(namespace)
self.namespace = namespace
self.globals = globals
self.owner = owner
self.is_class = is_class
self.stringifiers = []
+ self.next_id = 1
+ self.format = format
def __missing__(self, key):
fwdref = _Stringifier(
@@ -478,6 +582,19 @@ class _StringifierDict(dict):
self.stringifiers.append(fwdref)
return fwdref
+ def transmogrify(self):
+ for obj in self.stringifiers:
+ obj.__class__ = ForwardRef
+ obj.__stringifier_dict__ = None # not needed for ForwardRef
+ if isinstance(obj.__ast_node__, str):
+ obj.__arg__ = obj.__ast_node__
+ obj.__ast_node__ = None
+
+ def create_unique_name(self):
+ name = f"__annotationlib_name_{self.next_id}__"
+ self.next_id += 1
+ return name
+
def call_evaluate_function(evaluate, format, *, owner=None):
"""Call an evaluate function. Evaluate functions are normally generated for
@@ -521,20 +638,11 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False):
# possibly constants if the annotate function uses them directly). We then
# convert each of those into a string to get an approximation of the
# original source.
- globals = _StringifierDict({})
- if annotate.__closure__:
- freevars = annotate.__code__.co_freevars
- new_closure = []
- for i, cell in enumerate(annotate.__closure__):
- if i < len(freevars):
- name = freevars[i]
- else:
- name = "__cell__"
- fwdref = _Stringifier(name, stringifier_dict=globals)
- new_closure.append(types.CellType(fwdref))
- closure = tuple(new_closure)
- else:
- closure = None
+ globals = _StringifierDict({}, format=format)
+ is_class = isinstance(owner, type)
+ closure = _build_closure(
+ annotate, owner, is_class, globals, allow_evaluation=False
+ )
func = types.FunctionType(
annotate.__code__,
globals,
@@ -544,9 +652,9 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False):
)
annos = func(Format.VALUE_WITH_FAKE_GLOBALS)
if _is_evaluate:
- return annos if isinstance(annos, str) else repr(annos)
+ return _stringify_single(annos)
return {
- key: val if isinstance(val, str) else repr(val)
+ key: _stringify_single(val)
for key, val in annos.items()
}
elif format == Format.FORWARDREF:
@@ -569,33 +677,43 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False):
# that returns a bool and an defined set of attributes.
namespace = {**annotate.__builtins__, **annotate.__globals__}
is_class = isinstance(owner, type)
- globals = _StringifierDict(namespace, annotate.__globals__, owner, is_class)
- if annotate.__closure__:
- freevars = annotate.__code__.co_freevars
- new_closure = []
- for i, cell in enumerate(annotate.__closure__):
- try:
- cell.cell_contents
- except ValueError:
- if i < len(freevars):
- name = freevars[i]
- else:
- name = "__cell__"
- fwdref = _Stringifier(
- name,
- cell=cell,
- owner=owner,
- globals=annotate.__globals__,
- is_class=is_class,
- stringifier_dict=globals,
- )
- globals.stringifiers.append(fwdref)
- new_closure.append(types.CellType(fwdref))
- else:
- new_closure.append(cell)
- closure = tuple(new_closure)
+ globals = _StringifierDict(
+ namespace,
+ globals=annotate.__globals__,
+ owner=owner,
+ is_class=is_class,
+ format=format,
+ )
+ closure = _build_closure(
+ annotate, owner, is_class, globals, allow_evaluation=True
+ )
+ func = types.FunctionType(
+ annotate.__code__,
+ globals,
+ closure=closure,
+ argdefs=annotate.__defaults__,
+ kwdefaults=annotate.__kwdefaults__,
+ )
+ try:
+ result = func(Format.VALUE_WITH_FAKE_GLOBALS)
+ except Exception:
+ pass
else:
- closure = None
+ globals.transmogrify()
+ return result
+
+ # Try again, but do not provide any globals. This allows us to return
+ # a value in certain cases where an exception gets raised during evaluation.
+ globals = _StringifierDict(
+ {},
+ globals=annotate.__globals__,
+ owner=owner,
+ is_class=is_class,
+ format=format,
+ )
+ closure = _build_closure(
+ annotate, owner, is_class, globals, allow_evaluation=False
+ )
func = types.FunctionType(
annotate.__code__,
globals,
@@ -604,13 +722,21 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False):
kwdefaults=annotate.__kwdefaults__,
)
result = func(Format.VALUE_WITH_FAKE_GLOBALS)
- for obj in globals.stringifiers:
- obj.__class__ = ForwardRef
- obj.__stringifier_dict__ = None # not needed for ForwardRef
- if isinstance(obj.__ast_node__, str):
- obj.__arg__ = obj.__ast_node__
- obj.__ast_node__ = None
- return result
+ globals.transmogrify()
+ if _is_evaluate:
+ if isinstance(result, ForwardRef):
+ return result.evaluate(format=Format.FORWARDREF)
+ else:
+ return result
+ else:
+ return {
+ key: (
+ val.evaluate(format=Format.FORWARDREF)
+ if isinstance(val, ForwardRef)
+ else val
+ )
+ for key, val in result.items()
+ }
elif format == Format.VALUE:
# Should be impossible because __annotate__ functions must not raise
# NotImplementedError for this format.
@@ -619,20 +745,59 @@ def call_annotate_function(annotate, format, *, owner=None, _is_evaluate=False):
raise ValueError(f"Invalid format: {format!r}")
-def get_annotate_function(obj):
- """Get the __annotate__ function for an object.
+def _build_closure(annotate, owner, is_class, stringifier_dict, *, allow_evaluation):
+ if not annotate.__closure__:
+ return None
+ freevars = annotate.__code__.co_freevars
+ new_closure = []
+ for i, cell in enumerate(annotate.__closure__):
+ if i < len(freevars):
+ name = freevars[i]
+ else:
+ name = "__cell__"
+ new_cell = None
+ if allow_evaluation:
+ try:
+ cell.cell_contents
+ except ValueError:
+ pass
+ else:
+ new_cell = cell
+ if new_cell is None:
+ fwdref = _Stringifier(
+ name,
+ cell=cell,
+ owner=owner,
+ globals=annotate.__globals__,
+ is_class=is_class,
+ stringifier_dict=stringifier_dict,
+ )
+ stringifier_dict.stringifiers.append(fwdref)
+ new_cell = types.CellType(fwdref)
+ new_closure.append(new_cell)
+ return tuple(new_closure)
+
+
+def _stringify_single(anno):
+ if anno is ...:
+ return "..."
+ # We have to handle str specially to support PEP 563 stringified annotations.
+ elif isinstance(anno, str):
+ return anno
+ else:
+ return repr(anno)
+
- obj may be a function, class, or module, or a user-defined type with
- an `__annotate__` attribute.
+def get_annotate_from_class_namespace(obj):
+ """Retrieve the annotate function from a class namespace dictionary.
- Returns the __annotate__ function or None.
+ Return None if the namespace does not contain an annotate function.
+ This is useful in metaclass ``__new__`` methods to retrieve the annotate function.
"""
- if isinstance(obj, dict):
- try:
- return obj["__annotate__"]
- except KeyError:
- return obj.get("__annotate_func__", None)
- return getattr(obj, "__annotate__", None)
+ try:
+ return obj["__annotate__"]
+ except KeyError:
+ return obj.get("__annotate_func__", None)
def get_annotations(
@@ -724,7 +889,7 @@ def get_annotations(
# But if we didn't get it, we use __annotations__ instead.
ann = _get_dunder_annotations(obj)
if ann is not None:
- return annotations_to_string(ann)
+ return annotations_to_string(ann)
case Format.VALUE_WITH_FAKE_GLOBALS:
raise ValueError("The VALUE_WITH_FAKE_GLOBALS format is for internal use only")
case _:
@@ -832,7 +997,7 @@ def _get_and_call_annotate(obj, format):
May not return a fresh dictionary.
"""
- annotate = get_annotate_function(obj)
+ annotate = getattr(obj, "__annotate__", None)
if annotate is not None:
ann = call_annotate_function(annotate, format, owner=obj)
if not isinstance(ann, dict):
diff --git a/Lib/ast.py b/Lib/ast.py
index aa788e6eb62..af4fe8ff5a8 100644
--- a/Lib/ast.py
+++ b/Lib/ast.py
@@ -626,7 +626,7 @@ def unparse(ast_obj):
return unparser.visit(ast_obj)
-def main():
+def main(args=None):
import argparse
import sys
@@ -643,7 +643,7 @@ def main():
'column offsets')
parser.add_argument('-i', '--indent', type=int, default=3,
help='indentation of nodes (number of spaces)')
- args = parser.parse_args()
+ args = parser.parse_args(args)
if args.infile == '-':
name = '<stdin>'
diff --git a/Lib/asyncio/__main__.py b/Lib/asyncio/__main__.py
index 69f5a30cfe5..7d980bc401a 100644
--- a/Lib/asyncio/__main__.py
+++ b/Lib/asyncio/__main__.py
@@ -1,5 +1,7 @@
+import argparse
import ast
import asyncio
+import asyncio.tools
import concurrent.futures
import contextvars
import inspect
@@ -140,6 +142,36 @@ class REPLThread(threading.Thread):
if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ prog="python3 -m asyncio",
+ description="Interactive asyncio shell and CLI tools",
+ )
+ subparsers = parser.add_subparsers(help="sub-commands", dest="command")
+ ps = subparsers.add_parser(
+ "ps", help="Display a table of all pending tasks in a process"
+ )
+ ps.add_argument("pid", type=int, help="Process ID to inspect")
+ pstree = subparsers.add_parser(
+ "pstree", help="Display a tree of all pending tasks in a process"
+ )
+ pstree.add_argument("pid", type=int, help="Process ID to inspect")
+ args = parser.parse_args()
+ match args.command:
+ case "ps":
+ asyncio.tools.display_awaited_by_tasks_table(args.pid)
+ sys.exit(0)
+ case "pstree":
+ asyncio.tools.display_awaited_by_tasks_tree(args.pid)
+ sys.exit(0)
+ case None:
+ pass # continue to the interactive shell
+ case _:
+ # shouldn't happen as an invalid command-line wouldn't parse
+ # but let's keep it for the next person adding a command
+ print(f"error: unhandled command {args.command}", file=sys.stderr)
+ parser.print_usage(file=sys.stderr)
+ sys.exit(1)
+
sys.audit("cpython.run_stdin")
if os.getenv('PYTHON_BASIC_REPL'):
diff --git a/Lib/asyncio/tools.py b/Lib/asyncio/tools.py
new file mode 100644
index 00000000000..6c1f725e777
--- /dev/null
+++ b/Lib/asyncio/tools.py
@@ -0,0 +1,214 @@
+"""Tools to analyze tasks running in asyncio programs."""
+
+from dataclasses import dataclass
+from collections import defaultdict
+from itertools import count
+from enum import Enum
+import sys
+from _remotedebugging import get_all_awaited_by
+
+
+class NodeType(Enum):
+ COROUTINE = 1
+ TASK = 2
+
+
+@dataclass(frozen=True)
+class CycleFoundException(Exception):
+ """Raised when there is a cycle when drawing the call tree."""
+ cycles: list[list[int]]
+ id2name: dict[int, str]
+
+
+# ─── indexing helpers ───────────────────────────────────────────
+def _index(result):
+ id2name, awaits = {}, []
+ for _thr_id, tasks in result:
+ for tid, tname, awaited in tasks:
+ id2name[tid] = tname
+ for stack, parent_id in awaited:
+ stack = [elem[0] if isinstance(elem, tuple) else elem for elem in stack]
+ awaits.append((parent_id, stack, tid))
+ return id2name, awaits
+
+
+def _build_tree(id2name, awaits):
+ id2label = {(NodeType.TASK, tid): name for tid, name in id2name.items()}
+ children = defaultdict(list)
+ cor_names = defaultdict(dict) # (parent) -> {frame: node}
+ cor_id_seq = count(1)
+
+ def _cor_node(parent_key, frame_name):
+ """Return an existing or new (NodeType.COROUTINE, …) node under *parent_key*."""
+ bucket = cor_names[parent_key]
+ if frame_name in bucket:
+ return bucket[frame_name]
+ node_key = (NodeType.COROUTINE, f"c{next(cor_id_seq)}")
+ id2label[node_key] = frame_name
+ children[parent_key].append(node_key)
+ bucket[frame_name] = node_key
+ return node_key
+
+ # lay down parent ➜ …frames… ➜ child paths
+ for parent_id, stack, child_id in awaits:
+ cur = (NodeType.TASK, parent_id)
+ for frame in reversed(stack): # outer-most → inner-most
+ cur = _cor_node(cur, frame)
+ child_key = (NodeType.TASK, child_id)
+ if child_key not in children[cur]:
+ children[cur].append(child_key)
+
+ return id2label, children
+
+
+def _roots(id2label, children):
+ all_children = {c for kids in children.values() for c in kids}
+ return [n for n in id2label if n not in all_children]
+
+# ─── detect cycles in the task-to-task graph ───────────────────────
+def _task_graph(awaits):
+ """Return {parent_task_id: {child_task_id, …}, …}."""
+ g = defaultdict(set)
+ for parent_id, _stack, child_id in awaits:
+ g[parent_id].add(child_id)
+ return g
+
+
+def _find_cycles(graph):
+ """
+ Depth-first search for back-edges.
+
+ Returns a list of cycles (each cycle is a list of task-ids) or an
+ empty list if the graph is acyclic.
+ """
+ WHITE, GREY, BLACK = 0, 1, 2
+ color = defaultdict(lambda: WHITE)
+ path, cycles = [], []
+
+ def dfs(v):
+ color[v] = GREY
+ path.append(v)
+ for w in graph.get(v, ()):
+ if color[w] == WHITE:
+ dfs(w)
+ elif color[w] == GREY: # back-edge → cycle!
+ i = path.index(w)
+ cycles.append(path[i:] + [w]) # make a copy
+ color[v] = BLACK
+ path.pop()
+
+ for v in list(graph):
+ if color[v] == WHITE:
+ dfs(v)
+ return cycles
+
+
+# ─── PRINT TREE FUNCTION ───────────────────────────────────────
+def build_async_tree(result, task_emoji="(T)", cor_emoji=""):
+ """
+ Build a list of strings for pretty-print a async call tree.
+
+ The call tree is produced by `get_all_async_stacks()`, prefixing tasks
+ with `task_emoji` and coroutine frames with `cor_emoji`.
+ """
+ id2name, awaits = _index(result)
+ g = _task_graph(awaits)
+ cycles = _find_cycles(g)
+ if cycles:
+ raise CycleFoundException(cycles, id2name)
+ labels, children = _build_tree(id2name, awaits)
+
+ def pretty(node):
+ flag = task_emoji if node[0] == NodeType.TASK else cor_emoji
+ return f"{flag} {labels[node]}"
+
+ def render(node, prefix="", last=True, buf=None):
+ if buf is None:
+ buf = []
+ buf.append(f"{prefix}{'└── ' if last else '├── '}{pretty(node)}")
+ new_pref = prefix + (" " if last else "│ ")
+ kids = children.get(node, [])
+ for i, kid in enumerate(kids):
+ render(kid, new_pref, i == len(kids) - 1, buf)
+ return buf
+
+ return [render(root) for root in _roots(labels, children)]
+
+
+def build_task_table(result):
+ id2name, awaits = _index(result)
+ table = []
+ for tid, tasks in result:
+ for task_id, task_name, awaited in tasks:
+ if not awaited:
+ table.append(
+ [
+ tid,
+ hex(task_id),
+ task_name,
+ "",
+ "",
+ "0x0"
+ ]
+ )
+ for stack, awaiter_id in awaited:
+ stack = [elem[0] if isinstance(elem, tuple) else elem for elem in stack]
+ coroutine_chain = " -> ".join(stack)
+ awaiter_name = id2name.get(awaiter_id, "Unknown")
+ table.append(
+ [
+ tid,
+ hex(task_id),
+ task_name,
+ coroutine_chain,
+ awaiter_name,
+ hex(awaiter_id),
+ ]
+ )
+
+ return table
+
+def _print_cycle_exception(exception: CycleFoundException):
+ print("ERROR: await-graph contains cycles – cannot print a tree!", file=sys.stderr)
+ print("", file=sys.stderr)
+ for c in exception.cycles:
+ inames = " → ".join(exception.id2name.get(tid, hex(tid)) for tid in c)
+ print(f"cycle: {inames}", file=sys.stderr)
+
+
+def _get_awaited_by_tasks(pid: int) -> list:
+ try:
+ return get_all_awaited_by(pid)
+ except RuntimeError as e:
+ while e.__context__ is not None:
+ e = e.__context__
+ print(f"Error retrieving tasks: {e}")
+ sys.exit(1)
+
+
+def display_awaited_by_tasks_table(pid: int) -> None:
+ """Build and print a table of all pending tasks under `pid`."""
+
+ tasks = _get_awaited_by_tasks(pid)
+ table = build_task_table(tasks)
+ # Print the table in a simple tabular format
+ print(
+ f"{'tid':<10} {'task id':<20} {'task name':<20} {'coroutine chain':<50} {'awaiter name':<20} {'awaiter id':<15}"
+ )
+ print("-" * 135)
+ for row in table:
+ print(f"{row[0]:<10} {row[1]:<20} {row[2]:<20} {row[3]:<50} {row[4]:<20} {row[5]:<15}")
+
+
+def display_awaited_by_tasks_tree(pid: int) -> None:
+ """Build and print a tree of all pending tasks under `pid`."""
+
+ tasks = _get_awaited_by_tasks(pid)
+ try:
+ result = build_async_tree(tasks)
+ except CycleFoundException as e:
+ _print_cycle_exception(e)
+ sys.exit(1)
+
+ for tree in result:
+ print("\n".join(tree))
diff --git a/Lib/cmd.py b/Lib/cmd.py
index 438b88aa104..51495fb3216 100644
--- a/Lib/cmd.py
+++ b/Lib/cmd.py
@@ -273,7 +273,7 @@ class Cmd:
endidx = readline.get_endidx() - stripped
if begidx>0:
cmd, args, foo = self.parseline(line)
- if cmd == '':
+ if not cmd:
compfunc = self.completedefault
else:
try:
diff --git a/Lib/pdb.py b/Lib/pdb.py
index 343cf4404d7..2aa60c75396 100644
--- a/Lib/pdb.py
+++ b/Lib/pdb.py
@@ -2933,6 +2933,7 @@ class _PdbClient:
self.completion_matches = []
self.state = "dumb"
self.write_failed = False
+ self.multiline_block = False
def _ensure_valid_message(self, msg):
# Ensure the message conforms to our protocol.
@@ -2979,6 +2980,7 @@ class _PdbClient:
self.write_failed = True
def read_command(self, prompt):
+ self.multiline_block = False
reply = input(prompt)
if self.state == "dumb":
@@ -3003,6 +3005,7 @@ class _PdbClient:
return prefix + reply
# Otherwise, valid first line of a multi-line statement
+ self.multiline_block = True
continue_prompt = "...".ljust(len(prompt))
while codeop.compile_command(reply, "<stdin>", "single") is None:
reply += "\n" + input(continue_prompt)
@@ -3105,9 +3108,13 @@ class _PdbClient:
origline = readline.get_line_buffer()
line = origline.lstrip()
- stripped = len(origline) - len(line)
- begidx = readline.get_begidx() - stripped
- endidx = readline.get_endidx() - stripped
+ if self.multiline_block:
+ # We're completing a line contained in a multi-line block.
+ # Force the remote to treat it as a Python expression.
+ line = "! " + line
+ offset = len(origline) - len(line)
+ begidx = readline.get_begidx() - offset
+ endidx = readline.get_endidx() - offset
msg = {
"complete": {
diff --git a/Lib/test/test_annotationlib.py b/Lib/test/test_annotationlib.py
index be55f044b15..13c6a2a584b 100644
--- a/Lib/test/test_annotationlib.py
+++ b/Lib/test/test_annotationlib.py
@@ -1,5 +1,6 @@
"""Tests for the annotations module."""
+import textwrap
import annotationlib
import builtins
import collections
@@ -12,7 +13,6 @@ from annotationlib import (
Format,
ForwardRef,
get_annotations,
- get_annotate_function,
annotations_to_string,
type_repr,
)
@@ -121,6 +121,28 @@ class TestForwardRefFormat(unittest.TestCase):
self.assertIsInstance(gamma_anno, ForwardRef)
self.assertEqual(gamma_anno, support.EqualToForwardRef("some < obj", owner=f))
+ def test_partially_nonexistent_union(self):
+ # Test unions with '|' syntax equal unions with typing.Union[] with some forwardrefs
+ class UnionForwardrefs:
+ pipe: str | undefined
+ union: Union[str, undefined]
+
+ annos = get_annotations(UnionForwardrefs, format=Format.FORWARDREF)
+
+ pipe = annos["pipe"]
+ self.assertIsInstance(pipe, ForwardRef)
+ self.assertEqual(
+ pipe.evaluate(globals={"undefined": int}),
+ str | int,
+ )
+ union = annos["union"]
+ self.assertIsInstance(union, Union)
+ arg1, arg2 = typing.get_args(union)
+ self.assertIs(arg1, str)
+ self.assertEqual(
+ arg2, support.EqualToForwardRef("undefined", is_class=True, owner=UnionForwardrefs)
+ )
+
class TestStringFormat(unittest.TestCase):
def test_closure(self):
@@ -251,6 +273,89 @@ class TestStringFormat(unittest.TestCase):
},
)
+ def test_getitem(self):
+ def f(x: undef1[str, undef2]):
+ pass
+ anno = get_annotations(f, format=Format.STRING)
+ self.assertEqual(anno, {"x": "undef1[str, undef2]"})
+
+ anno = get_annotations(f, format=Format.FORWARDREF)
+ fwdref = anno["x"]
+ self.assertIsInstance(fwdref, ForwardRef)
+ self.assertEqual(
+ fwdref.evaluate(globals={"undef1": dict, "undef2": float}), dict[str, float]
+ )
+
+ def test_slice(self):
+ def f(x: a[b:c]):
+ pass
+ anno = get_annotations(f, format=Format.STRING)
+ self.assertEqual(anno, {"x": "a[b:c]"})
+
+ def f(x: a[b:c, d:e]):
+ pass
+ anno = get_annotations(f, format=Format.STRING)
+ self.assertEqual(anno, {"x": "a[b:c, d:e]"})
+
+ obj = slice(1, 1, 1)
+ def f(x: obj):
+ pass
+ anno = get_annotations(f, format=Format.STRING)
+ self.assertEqual(anno, {"x": "obj"})
+
+ def test_literals(self):
+ def f(
+ a: 1,
+ b: 1.0,
+ c: "hello",
+ d: b"hello",
+ e: True,
+ f: None,
+ g: ...,
+ h: 1j,
+ ):
+ pass
+
+ anno = get_annotations(f, format=Format.STRING)
+ self.assertEqual(
+ anno,
+ {
+ "a": "1",
+ "b": "1.0",
+ "c": 'hello',
+ "d": "b'hello'",
+ "e": "True",
+ "f": "None",
+ "g": "...",
+ "h": "1j",
+ },
+ )
+
+ def test_displays(self):
+ # Simple case first
+ def f(x: a[[int, str], float]):
+ pass
+ anno = get_annotations(f, format=Format.STRING)
+ self.assertEqual(anno, {"x": "a[[int, str], float]"})
+
+ def g(
+ w: a[[int, str], float],
+ x: a[{int, str}, 3],
+ y: a[{int: str}, 4],
+ z: a[(int, str), 5],
+ ):
+ pass
+ anno = get_annotations(g, format=Format.STRING)
+ self.assertEqual(
+ anno,
+ {
+ "w": "a[[int, str], float]",
+ "x": "a[{int, str}, 3]",
+ "y": "a[{int: str}, 4]",
+ "z": "a[(int, str), 5]",
+ },
+ )
+
def test_nested_expressions(self):
def f(
nested: list[Annotated[set[int], "set of ints", 4j]],
@@ -296,6 +401,17 @@ class TestStringFormat(unittest.TestCase):
with self.assertRaisesRegex(TypeError, format_msg):
get_annotations(f, format=Format.STRING)
+ def test_shenanigans(self):
+ # In cases like this we can't reconstruct the source; test that we do something
+ # halfway reasonable.
+ def f(x: x | (1).__class__, y: (1).__class__):
+ pass
+
+ self.assertEqual(
+ get_annotations(f, format=Format.STRING),
+ {"x": "x | <class 'int'>", "y": "<class 'int'>"},
+ )
+
class TestGetAnnotations(unittest.TestCase):
def test_builtin_type(self):
@@ -901,6 +1017,58 @@ class TestGetAnnotations(unittest.TestCase):
set(results.generic_func.__type_params__),
)
+ def test_partial_evaluation(self):
+ def f(
+ x: builtins.undef,
+ y: list[int],
+ z: 1 + int,
+ a: builtins.int,
+ b: [builtins.undef, builtins.int],
+ ):
+ pass
+
+ self.assertEqual(
+ get_annotations(f, format=Format.FORWARDREF),
+ {
+ "x": support.EqualToForwardRef("builtins.undef", owner=f),
+ "y": list[int],
+ "z": support.EqualToForwardRef("1 + int", owner=f),
+ "a": int,
+ "b": [
+ support.EqualToForwardRef("builtins.undef", owner=f),
+ # We can't resolve this because we have to evaluate the whole annotation
+ support.EqualToForwardRef("builtins.int", owner=f),
+ ],
+ },
+ )
+
+ self.assertEqual(
+ get_annotations(f, format=Format.STRING),
+ {
+ "x": "builtins.undef",
+ "y": "list[int]",
+ "z": "1 + int",
+ "a": "builtins.int",
+ "b": "[builtins.undef, builtins.int]",
+ },
+ )
+
+ def test_partial_evaluation_cell(self):
+ obj = object()
+
+ class RaisesAttributeError:
+ attriberr: obj.missing
+
+ anno = get_annotations(RaisesAttributeError, format=Format.FORWARDREF)
+ self.assertEqual(
+ anno,
+ {
+ "attriberr": support.EqualToForwardRef(
+ "obj.missing", is_class=True, owner=RaisesAttributeError
+ )
+ },
+ )
+
class TestCallEvaluateFunction(unittest.TestCase):
def test_evaluation(self):
@@ -933,13 +1101,13 @@ class MetaclassTests(unittest.TestCase):
b: float
self.assertEqual(get_annotations(Meta), {"a": int})
- self.assertEqual(get_annotate_function(Meta)(Format.VALUE), {"a": int})
+ self.assertEqual(Meta.__annotate__(Format.VALUE), {"a": int})
self.assertEqual(get_annotations(X), {})
- self.assertIs(get_annotate_function(X), None)
+ self.assertIs(X.__annotate__, None)
self.assertEqual(get_annotations(Y), {"b": float})
- self.assertEqual(get_annotate_function(Y)(Format.VALUE), {"b": float})
+ self.assertEqual(Y.__annotate__(Format.VALUE), {"b": float})
def test_unannotated_meta(self):
class Meta(type):
@@ -952,13 +1120,13 @@ class MetaclassTests(unittest.TestCase):
pass
self.assertEqual(get_annotations(Meta), {})
- self.assertIs(get_annotate_function(Meta), None)
+ self.assertIs(Meta.__annotate__, None)
self.assertEqual(get_annotations(Y), {})
- self.assertIs(get_annotate_function(Y), None)
+ self.assertIs(Y.__annotate__, None)
self.assertEqual(get_annotations(X), {"a": str})
- self.assertEqual(get_annotate_function(X)(Format.VALUE), {"a": str})
+ self.assertEqual(X.__annotate__(Format.VALUE), {"a": str})
def test_ordering(self):
# Based on a sample by David Ellis
@@ -996,7 +1164,7 @@ class MetaclassTests(unittest.TestCase):
for c in classes:
with self.subTest(c=c):
self.assertEqual(get_annotations(c), c.expected_annotations)
- annotate_func = get_annotate_function(c)
+ annotate_func = getattr(c, "__annotate__", None)
if c.expected_annotations:
self.assertEqual(
annotate_func(Format.VALUE), c.expected_annotations
@@ -1005,25 +1173,39 @@ class MetaclassTests(unittest.TestCase):
self.assertIs(annotate_func, None)
-class TestGetAnnotateFunction(unittest.TestCase):
- def test_static_class(self):
- self.assertIsNone(get_annotate_function(object))
- self.assertIsNone(get_annotate_function(int))
-
- def test_unannotated_class(self):
- class C:
- pass
+class TestGetAnnotateFromClassNamespace(unittest.TestCase):
+ def test_with_metaclass(self):
+ class Meta(type):
+ def __new__(mcls, name, bases, ns):
+ annotate = annotationlib.get_annotate_from_class_namespace(ns)
+ expected = ns["expected_annotate"]
+ with self.subTest(name=name):
+ if expected:
+ self.assertIsNotNone(annotate)
+ else:
+ self.assertIsNone(annotate)
+ return super().__new__(mcls, name, bases, ns)
+
+ class HasAnnotations(metaclass=Meta):
+ expected_annotate = True
+ a: int
- self.assertIsNone(get_annotate_function(C))
+ class NoAnnotations(metaclass=Meta):
+ expected_annotate = False
- D = type("D", (), {})
- self.assertIsNone(get_annotate_function(D))
+ class CustomAnnotate(metaclass=Meta):
+ expected_annotate = True
+ def __annotate__(format):
+ return {}
- def test_annotated_class(self):
- class C:
- a: int
+ code = """
+ from __future__ import annotations
- self.assertEqual(get_annotate_function(C)(Format.VALUE), {"a": int})
+ class HasFutureAnnotations(metaclass=Meta):
+ expected_annotate = False
+ a: int
+ """
+ exec(textwrap.dedent(code), {"Meta": Meta})
class TestTypeRepr(unittest.TestCase):
@@ -1240,6 +1422,38 @@ class TestForwardRefClass(unittest.TestCase):
with self.assertRaises(TypeError):
pickle.dumps(fr, proto)
+ def test_evaluate_string_format(self):
+ fr = ForwardRef("set[Any]")
+ self.assertEqual(fr.evaluate(format=Format.STRING), "set[Any]")
+
+ def test_evaluate_forwardref_format(self):
+ fr = ForwardRef("undef")
+ evaluated = fr.evaluate(format=Format.FORWARDREF)
+ self.assertIs(fr, evaluated)
+
+ fr = ForwardRef("set[undefined]")
+ evaluated = fr.evaluate(format=Format.FORWARDREF)
+ self.assertEqual(
+ evaluated,
+ set[support.EqualToForwardRef("undefined")],
+ )
+
+ fr = ForwardRef("a + b")
+ self.assertEqual(
+ fr.evaluate(format=Format.FORWARDREF),
+ support.EqualToForwardRef("a + b"),
+ )
+ self.assertEqual(
+ fr.evaluate(format=Format.FORWARDREF, locals={"a": 1, "b": 2}),
+ 3,
+ )
+
+ fr = ForwardRef('"a" + 1')
+ self.assertEqual(
+ fr.evaluate(format=Format.FORWARDREF),
+ support.EqualToForwardRef('"a" + 1'),
+ )
+
def test_evaluate_with_type_params(self):
class Gen[T]:
alias = int
diff --git a/Lib/test/test_ast/test_ast.py b/Lib/test/test_ast/test_ast.py
index ae9db093d2e..6a9b7812ef6 100644
--- a/Lib/test/test_ast/test_ast.py
+++ b/Lib/test/test_ast/test_ast.py
@@ -1,16 +1,20 @@
import _ast_unparse
import ast
import builtins
+import contextlib
import copy
import dis
import enum
+import itertools
import os
import re
import sys
+import tempfile
import textwrap
import types
import unittest
import weakref
+from io import StringIO
from pathlib import Path
from textwrap import dedent
try:
@@ -19,7 +23,7 @@ except ImportError:
_testinternalcapi = None
from test import support
-from test.support import os_helper, script_helper
+from test.support import os_helper
from test.support import skip_emscripten_stack_overflow, skip_wasi_stack_overflow
from test.support.ast_helper import ASTTestMixin
from test.test_ast.utils import to_tuple
@@ -3232,23 +3236,169 @@ class ModuleStateTests(unittest.TestCase):
self.assertEqual(res, 0)
-class ASTMainTests(unittest.TestCase):
- # Tests `ast.main()` function.
+class CommandLineTests(unittest.TestCase):
+ def setUp(self):
+ self.filename = tempfile.mktemp()
+ self.addCleanup(os_helper.unlink, self.filename)
- def test_cli_file_input(self):
- code = "print(1, 2, 3)"
- expected = ast.dump(ast.parse(code), indent=3)
-
- with os_helper.temp_dir() as tmp_dir:
- filename = os.path.join(tmp_dir, "test_module.py")
- with open(filename, 'w', encoding='utf-8') as f:
- f.write(code)
- res, _ = script_helper.run_python_until_end("-m", "ast", filename)
+ @staticmethod
+ def text_normalize(string):
+ return textwrap.dedent(string).strip()
+
+ def set_source(self, content):
+ Path(self.filename).write_text(self.text_normalize(content))
+
+ def invoke_ast(self, *flags):
+ stderr = StringIO()
+ stdout = StringIO()
+ with (
+ contextlib.redirect_stdout(stdout),
+ contextlib.redirect_stderr(stderr),
+ ):
+ ast.main(args=[*flags, self.filename])
+ self.assertEqual(stderr.getvalue(), '')
+ return stdout.getvalue().strip()
+
+ def check_output(self, source, expect, *flags):
+ self.set_source(source)
+ res = self.invoke_ast(*flags)
+ expect = self.text_normalize(expect)
+ self.assertEqual(res, expect)
+
+ def test_invocation(self):
+ # test various combinations of parameters
+ base_flags = (
+ ('-m=exec', '--mode=exec'),
+ ('--no-type-comments', '--no-type-comments'),
+ ('-a', '--include-attributes'),
+ ('-i=4', '--indent=4'),
+ )
+ self.set_source('''
+ print(1, 2, 3)
+ def f(x: int) -> int:
+ x -= 1
+ return x
+ ''')
- self.assertEqual(res.err, b"")
- self.assertEqual(expected.splitlines(),
- res.out.decode("utf8").splitlines())
- self.assertEqual(res.rc, 0)
+ for r in range(1, len(base_flags) + 1):
+ for choices in itertools.combinations(base_flags, r=r):
+ for args in itertools.product(*choices):
+ with self.subTest(flags=args):
+ self.invoke_ast(*args)
+
+ def test_help_message(self):
+ for flag in ('-h', '--help', '--unknown'):
+ with self.subTest(flag=flag):
+ output = StringIO()
+ with self.assertRaises(SystemExit):
+ with contextlib.redirect_stderr(output):
+ ast.main(args=flag)
+ self.assertStartsWith(output.getvalue(), 'usage: ')
+
+ def test_exec_mode_flag(self):
+ # test 'python -m ast -m/--mode exec'
+ source = 'x: bool = 1 # type: ignore[assignment]'
+ expect = '''
+ Module(
+ body=[
+ AnnAssign(
+ target=Name(id='x', ctx=Store()),
+ annotation=Name(id='bool', ctx=Load()),
+ value=Constant(value=1),
+ simple=1)],
+ type_ignores=[
+ TypeIgnore(lineno=1, tag='[assignment]')])
+ '''
+ for flag in ('-m=exec', '--mode=exec'):
+ with self.subTest(flag=flag):
+ self.check_output(source, expect, flag)
+
+ def test_single_mode_flag(self):
+ # test 'python -m ast -m/--mode single'
+ source = 'pass'
+ expect = '''
+ Interactive(
+ body=[
+ Pass()])
+ '''
+ for flag in ('-m=single', '--mode=single'):
+ with self.subTest(flag=flag):
+ self.check_output(source, expect, flag)
+
+ def test_eval_mode_flag(self):
+ # test 'python -m ast -m/--mode eval'
+ source = 'print(1, 2, 3)'
+ expect = '''
+ Expression(
+ body=Call(
+ func=Name(id='print', ctx=Load()),
+ args=[
+ Constant(value=1),
+ Constant(value=2),
+ Constant(value=3)]))
+ '''
+ for flag in ('-m=eval', '--mode=eval'):
+ with self.subTest(flag=flag):
+ self.check_output(source, expect, flag)
+
+ def test_func_type_mode_flag(self):
+ # test 'python -m ast -m/--mode func_type'
+ source = '(int, str) -> list[int]'
+ expect = '''
+ FunctionType(
+ argtypes=[
+ Name(id='int', ctx=Load()),
+ Name(id='str', ctx=Load())],
+ returns=Subscript(
+ value=Name(id='list', ctx=Load()),
+ slice=Name(id='int', ctx=Load()),
+ ctx=Load()))
+ '''
+ for flag in ('-m=func_type', '--mode=func_type'):
+ with self.subTest(flag=flag):
+ self.check_output(source, expect, flag)
+
+ def test_no_type_comments_flag(self):
+ # test 'python -m ast --no-type-comments'
+ source = 'x: bool = 1 # type: ignore[assignment]'
+ expect = '''
+ Module(
+ body=[
+ AnnAssign(
+ target=Name(id='x', ctx=Store()),
+ annotation=Name(id='bool', ctx=Load()),
+ value=Constant(value=1),
+ simple=1)])
+ '''
+ self.check_output(source, expect, '--no-type-comments')
+
+ def test_include_attributes_flag(self):
+ # test 'python -m ast -a/--include-attributes'
+ source = 'pass'
+ expect = '''
+ Module(
+ body=[
+ Pass(
+ lineno=1,
+ col_offset=0,
+ end_lineno=1,
+ end_col_offset=4)])
+ '''
+ for flag in ('-a', '--include-attributes'):
+ with self.subTest(flag=flag):
+ self.check_output(source, expect, flag)
+
+ def test_indent_flag(self):
+ # test 'python -m ast -i/--indent'
+ source = 'pass'
+ expect = '''
+ Module(
+ body=[
+ Pass()])
+ '''
+ for flag in ('-i=0', '--indent=0'):
+ with self.subTest(flag=flag):
+ self.check_output(source, expect, flag)
class ASTOptimiziationTests(unittest.TestCase):
diff --git a/Lib/test/test_asyncio/test_tools.py b/Lib/test/test_asyncio/test_tools.py
new file mode 100644
index 00000000000..2caf56172c9
--- /dev/null
+++ b/Lib/test/test_asyncio/test_tools.py
@@ -0,0 +1,839 @@
+import unittest
+
+from asyncio import tools
+
+
+# mock output of get_all_awaited_by function.
+TEST_INPUTS_TREE = [
+ [
+ # test case containing a task called timer being awaited in two
+ # different subtasks part of a TaskGroup (root1 and root2) which call
+ # awaiter functions.
+ (
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ (
+ 3,
+ "timer",
+ [
+ [["awaiter3", "awaiter2", "awaiter"], 4],
+ [["awaiter1_3", "awaiter1_2", "awaiter1"], 5],
+ [["awaiter1_3", "awaiter1_2", "awaiter1"], 6],
+ [["awaiter3", "awaiter2", "awaiter"], 7],
+ ],
+ ),
+ (
+ 8,
+ "root1",
+ [[["_aexit", "__aexit__", "main"], 2]],
+ ),
+ (
+ 9,
+ "root2",
+ [[["_aexit", "__aexit__", "main"], 2]],
+ ),
+ (
+ 4,
+ "child1_1",
+ [
+ [
+ ["_aexit", "__aexit__", "blocho_caller", "bloch"],
+ 8,
+ ]
+ ],
+ ),
+ (
+ 6,
+ "child2_1",
+ [
+ [
+ ["_aexit", "__aexit__", "blocho_caller", "bloch"],
+ 8,
+ ]
+ ],
+ ),
+ (
+ 7,
+ "child1_2",
+ [
+ [
+ ["_aexit", "__aexit__", "blocho_caller", "bloch"],
+ 9,
+ ]
+ ],
+ ),
+ (
+ 5,
+ "child2_2",
+ [
+ [
+ ["_aexit", "__aexit__", "blocho_caller", "bloch"],
+ 9,
+ ]
+ ],
+ ),
+ ],
+ ),
+ (0, []),
+ ),
+ (
+ [
+ [
+ "└── (T) Task-1",
+ " └── main",
+ " └── __aexit__",
+ " └── _aexit",
+ " ├── (T) root1",
+ " │ └── bloch",
+ " │ └── blocho_caller",
+ " │ └── __aexit__",
+ " │ └── _aexit",
+ " │ ├── (T) child1_1",
+ " │ │ └── awaiter",
+ " │ │ └── awaiter2",
+ " │ │ └── awaiter3",
+ " │ │ └── (T) timer",
+ " │ └── (T) child2_1",
+ " │ └── awaiter1",
+ " │ └── awaiter1_2",
+ " │ └── awaiter1_3",
+ " │ └── (T) timer",
+ " └── (T) root2",
+ " └── bloch",
+ " └── blocho_caller",
+ " └── __aexit__",
+ " └── _aexit",
+ " ├── (T) child1_2",
+ " │ └── awaiter",
+ " │ └── awaiter2",
+ " │ └── awaiter3",
+ " │ └── (T) timer",
+ " └── (T) child2_2",
+ " └── awaiter1",
+ " └── awaiter1_2",
+ " └── awaiter1_3",
+ " └── (T) timer",
+ ]
+ ]
+ ),
+ ],
+ [
+ # test case containing two roots
+ (
+ (
+ 9,
+ [
+ (5, "Task-5", []),
+ (6, "Task-6", [[["main2"], 5]]),
+ (7, "Task-7", [[["main2"], 5]]),
+ (8, "Task-8", [[["main2"], 5]]),
+ ],
+ ),
+ (
+ 10,
+ [
+ (1, "Task-1", []),
+ (2, "Task-2", [[["main"], 1]]),
+ (3, "Task-3", [[["main"], 1]]),
+ (4, "Task-4", [[["main"], 1]]),
+ ],
+ ),
+ (11, []),
+ (0, []),
+ ),
+ (
+ [
+ [
+ "└── (T) Task-5",
+ " └── main2",
+ " ├── (T) Task-6",
+ " ├── (T) Task-7",
+ " └── (T) Task-8",
+ ],
+ [
+ "└── (T) Task-1",
+ " └── main",
+ " ├── (T) Task-2",
+ " ├── (T) Task-3",
+ " └── (T) Task-4",
+ ],
+ ]
+ ),
+ ],
+ [
+ # test case containing two roots, one of them without subtasks
+ (
+ [
+ (1, [(2, "Task-5", [])]),
+ (
+ 3,
+ [
+ (4, "Task-1", []),
+ (5, "Task-2", [[["main"], 4]]),
+ (6, "Task-3", [[["main"], 4]]),
+ (7, "Task-4", [[["main"], 4]]),
+ ],
+ ),
+ (8, []),
+ (0, []),
+ ]
+ ),
+ (
+ [
+ ["└── (T) Task-5"],
+ [
+ "└── (T) Task-1",
+ " └── main",
+ " ├── (T) Task-2",
+ " ├── (T) Task-3",
+ " └── (T) Task-4",
+ ],
+ ]
+ ),
+ ],
+]
+
+TEST_INPUTS_CYCLES_TREE = [
+ [
+ # this test case contains a cycle: two tasks awaiting each other.
+ (
+ [
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ (
+ 3,
+ "a",
+ [[["awaiter2"], 4], [["main"], 2]],
+ ),
+ (4, "b", [[["awaiter"], 3]]),
+ ],
+ ),
+ (0, []),
+ ]
+ ),
+ ([[4, 3, 4]]),
+ ],
+ [
+ # this test case contains two cycles
+ (
+ [
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ (
+ 3,
+ "A",
+ [[["nested", "nested", "task_b"], 4]],
+ ),
+ (
+ 4,
+ "B",
+ [
+ [["nested", "nested", "task_c"], 5],
+ [["nested", "nested", "task_a"], 3],
+ ],
+ ),
+ (5, "C", [[["nested", "nested"], 6]]),
+ (
+ 6,
+ "Task-2",
+ [[["nested", "nested", "task_b"], 4]],
+ ),
+ ],
+ ),
+ (0, []),
+ ]
+ ),
+ ([[4, 3, 4], [4, 6, 5, 4]]),
+ ],
+]
+
+TEST_INPUTS_TABLE = [
+ [
+ # test case containing a task called timer being awaited in two
+ # different subtasks part of a TaskGroup (root1 and root2) which call
+ # awaiter functions.
+ (
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ (
+ 3,
+ "timer",
+ [
+ [["awaiter3", "awaiter2", "awaiter"], 4],
+ [["awaiter1_3", "awaiter1_2", "awaiter1"], 5],
+ [["awaiter1_3", "awaiter1_2", "awaiter1"], 6],
+ [["awaiter3", "awaiter2", "awaiter"], 7],
+ ],
+ ),
+ (
+ 8,
+ "root1",
+ [[["_aexit", "__aexit__", "main"], 2]],
+ ),
+ (
+ 9,
+ "root2",
+ [[["_aexit", "__aexit__", "main"], 2]],
+ ),
+ (
+ 4,
+ "child1_1",
+ [
+ [
+ ["_aexit", "__aexit__", "blocho_caller", "bloch"],
+ 8,
+ ]
+ ],
+ ),
+ (
+ 6,
+ "child2_1",
+ [
+ [
+ ["_aexit", "__aexit__", "blocho_caller", "bloch"],
+ 8,
+ ]
+ ],
+ ),
+ (
+ 7,
+ "child1_2",
+ [
+ [
+ ["_aexit", "__aexit__", "blocho_caller", "bloch"],
+ 9,
+ ]
+ ],
+ ),
+ (
+ 5,
+ "child2_2",
+ [
+ [
+ ["_aexit", "__aexit__", "blocho_caller", "bloch"],
+ 9,
+ ]
+ ],
+ ),
+ ],
+ ),
+ (0, []),
+ ),
+ (
+ [
+ [1, "0x2", "Task-1", "", "", "0x0"],
+ [
+ 1,
+ "0x3",
+ "timer",
+ "awaiter3 -> awaiter2 -> awaiter",
+ "child1_1",
+ "0x4",
+ ],
+ [
+ 1,
+ "0x3",
+ "timer",
+ "awaiter1_3 -> awaiter1_2 -> awaiter1",
+ "child2_2",
+ "0x5",
+ ],
+ [
+ 1,
+ "0x3",
+ "timer",
+ "awaiter1_3 -> awaiter1_2 -> awaiter1",
+ "child2_1",
+ "0x6",
+ ],
+ [
+ 1,
+ "0x3",
+ "timer",
+ "awaiter3 -> awaiter2 -> awaiter",
+ "child1_2",
+ "0x7",
+ ],
+ [
+ 1,
+ "0x8",
+ "root1",
+ "_aexit -> __aexit__ -> main",
+ "Task-1",
+ "0x2",
+ ],
+ [
+ 1,
+ "0x9",
+ "root2",
+ "_aexit -> __aexit__ -> main",
+ "Task-1",
+ "0x2",
+ ],
+ [
+ 1,
+ "0x4",
+ "child1_1",
+ "_aexit -> __aexit__ -> blocho_caller -> bloch",
+ "root1",
+ "0x8",
+ ],
+ [
+ 1,
+ "0x6",
+ "child2_1",
+ "_aexit -> __aexit__ -> blocho_caller -> bloch",
+ "root1",
+ "0x8",
+ ],
+ [
+ 1,
+ "0x7",
+ "child1_2",
+ "_aexit -> __aexit__ -> blocho_caller -> bloch",
+ "root2",
+ "0x9",
+ ],
+ [
+ 1,
+ "0x5",
+ "child2_2",
+ "_aexit -> __aexit__ -> blocho_caller -> bloch",
+ "root2",
+ "0x9",
+ ],
+ ]
+ ),
+ ],
+ [
+ # test case containing two roots
+ (
+ (
+ 9,
+ [
+ (5, "Task-5", []),
+ (6, "Task-6", [[["main2"], 5]]),
+ (7, "Task-7", [[["main2"], 5]]),
+ (8, "Task-8", [[["main2"], 5]]),
+ ],
+ ),
+ (
+ 10,
+ [
+ (1, "Task-1", []),
+ (2, "Task-2", [[["main"], 1]]),
+ (3, "Task-3", [[["main"], 1]]),
+ (4, "Task-4", [[["main"], 1]]),
+ ],
+ ),
+ (11, []),
+ (0, []),
+ ),
+ (
+ [
+ [9, "0x5", "Task-5", "", "", "0x0"],
+ [9, "0x6", "Task-6", "main2", "Task-5", "0x5"],
+ [9, "0x7", "Task-7", "main2", "Task-5", "0x5"],
+ [9, "0x8", "Task-8", "main2", "Task-5", "0x5"],
+ [10, "0x1", "Task-1", "", "", "0x0"],
+ [10, "0x2", "Task-2", "main", "Task-1", "0x1"],
+ [10, "0x3", "Task-3", "main", "Task-1", "0x1"],
+ [10, "0x4", "Task-4", "main", "Task-1", "0x1"],
+ ]
+ ),
+ ],
+ [
+ # test case containing two roots, one of them without subtasks
+ (
+ [
+ (1, [(2, "Task-5", [])]),
+ (
+ 3,
+ [
+ (4, "Task-1", []),
+ (5, "Task-2", [[["main"], 4]]),
+ (6, "Task-3", [[["main"], 4]]),
+ (7, "Task-4", [[["main"], 4]]),
+ ],
+ ),
+ (8, []),
+ (0, []),
+ ]
+ ),
+ (
+ [
+ [1, "0x2", "Task-5", "", "", "0x0"],
+ [3, "0x4", "Task-1", "", "", "0x0"],
+ [3, "0x5", "Task-2", "main", "Task-1", "0x4"],
+ [3, "0x6", "Task-3", "main", "Task-1", "0x4"],
+ [3, "0x7", "Task-4", "main", "Task-1", "0x4"],
+ ]
+ ),
+ ],
+ # CASES WITH CYCLES
+ [
+ # this test case contains a cycle: two tasks awaiting each other.
+ (
+ [
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ (
+ 3,
+ "a",
+ [[["awaiter2"], 4], [["main"], 2]],
+ ),
+ (4, "b", [[["awaiter"], 3]]),
+ ],
+ ),
+ (0, []),
+ ]
+ ),
+ (
+ [
+ [1, "0x2", "Task-1", "", "", "0x0"],
+ [1, "0x3", "a", "awaiter2", "b", "0x4"],
+ [1, "0x3", "a", "main", "Task-1", "0x2"],
+ [1, "0x4", "b", "awaiter", "a", "0x3"],
+ ]
+ ),
+ ],
+ [
+ # this test case contains two cycles
+ (
+ [
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ (
+ 3,
+ "A",
+ [[["nested", "nested", "task_b"], 4]],
+ ),
+ (
+ 4,
+ "B",
+ [
+ [["nested", "nested", "task_c"], 5],
+ [["nested", "nested", "task_a"], 3],
+ ],
+ ),
+ (5, "C", [[["nested", "nested"], 6]]),
+ (
+ 6,
+ "Task-2",
+ [[["nested", "nested", "task_b"], 4]],
+ ),
+ ],
+ ),
+ (0, []),
+ ]
+ ),
+ (
+ [
+ [1, "0x2", "Task-1", "", "", "0x0"],
+ [
+ 1,
+ "0x3",
+ "A",
+ "nested -> nested -> task_b",
+ "B",
+ "0x4",
+ ],
+ [
+ 1,
+ "0x4",
+ "B",
+ "nested -> nested -> task_c",
+ "C",
+ "0x5",
+ ],
+ [
+ 1,
+ "0x4",
+ "B",
+ "nested -> nested -> task_a",
+ "A",
+ "0x3",
+ ],
+ [
+ 1,
+ "0x5",
+ "C",
+ "nested -> nested",
+ "Task-2",
+ "0x6",
+ ],
+ [
+ 1,
+ "0x6",
+ "Task-2",
+ "nested -> nested -> task_b",
+ "B",
+ "0x4",
+ ],
+ ]
+ ),
+ ],
+]
+
+
+class TestAsyncioToolsTree(unittest.TestCase):
+
+ def test_asyncio_utils(self):
+ for input_, tree in TEST_INPUTS_TREE:
+ with self.subTest(input_):
+ self.assertEqual(tools.build_async_tree(input_), tree)
+
+ def test_asyncio_utils_cycles(self):
+ for input_, cycles in TEST_INPUTS_CYCLES_TREE:
+ with self.subTest(input_):
+ try:
+ tools.build_async_tree(input_)
+ except tools.CycleFoundException as e:
+ self.assertEqual(e.cycles, cycles)
+
+
+class TestAsyncioToolsTable(unittest.TestCase):
+ def test_asyncio_utils(self):
+ for input_, table in TEST_INPUTS_TABLE:
+ with self.subTest(input_):
+ self.assertEqual(tools.build_task_table(input_), table)
+
+
+class TestAsyncioToolsBasic(unittest.TestCase):
+ def test_empty_input_tree(self):
+ """Test build_async_tree with empty input."""
+ result = []
+ expected_output = []
+ self.assertEqual(tools.build_async_tree(result), expected_output)
+
+ def test_empty_input_table(self):
+ """Test build_task_table with empty input."""
+ result = []
+ expected_output = []
+ self.assertEqual(tools.build_task_table(result), expected_output)
+
+ def test_only_independent_tasks_tree(self):
+ input_ = [(1, [(10, "taskA", []), (11, "taskB", [])])]
+ expected = [["└── (T) taskA"], ["└── (T) taskB"]]
+ result = tools.build_async_tree(input_)
+ self.assertEqual(sorted(result), sorted(expected))
+
+ def test_only_independent_tasks_table(self):
+ input_ = [(1, [(10, "taskA", []), (11, "taskB", [])])]
+ self.assertEqual(
+ tools.build_task_table(input_),
+ [[1, "0xa", "taskA", "", "", "0x0"], [1, "0xb", "taskB", "", "", "0x0"]],
+ )
+
+ def test_single_task_tree(self):
+ """Test build_async_tree with a single task and no awaits."""
+ result = [
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ ],
+ )
+ ]
+ expected_output = [
+ [
+ "└── (T) Task-1",
+ ]
+ ]
+ self.assertEqual(tools.build_async_tree(result), expected_output)
+
+ def test_single_task_table(self):
+ """Test build_task_table with a single task and no awaits."""
+ result = [
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ ],
+ )
+ ]
+ expected_output = [[1, "0x2", "Task-1", "", "", "0x0"]]
+ self.assertEqual(tools.build_task_table(result), expected_output)
+
+ def test_cycle_detection(self):
+ """Test build_async_tree raises CycleFoundException for cyclic input."""
+ result = [
+ (
+ 1,
+ [
+ (2, "Task-1", [[["main"], 3]]),
+ (3, "Task-2", [[["main"], 2]]),
+ ],
+ )
+ ]
+ with self.assertRaises(tools.CycleFoundException) as context:
+ tools.build_async_tree(result)
+ self.assertEqual(context.exception.cycles, [[3, 2, 3]])
+
+ def test_complex_tree(self):
+ """Test build_async_tree with a more complex tree structure."""
+ result = [
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ (3, "Task-2", [[["main"], 2]]),
+ (4, "Task-3", [[["main"], 3]]),
+ ],
+ )
+ ]
+ expected_output = [
+ [
+ "└── (T) Task-1",
+ " └── main",
+ " └── (T) Task-2",
+ " └── main",
+ " └── (T) Task-3",
+ ]
+ ]
+ self.assertEqual(tools.build_async_tree(result), expected_output)
+
+ def test_complex_table(self):
+ """Test build_task_table with a more complex tree structure."""
+ result = [
+ (
+ 1,
+ [
+ (2, "Task-1", []),
+ (3, "Task-2", [[["main"], 2]]),
+ (4, "Task-3", [[["main"], 3]]),
+ ],
+ )
+ ]
+ expected_output = [
+ [1, "0x2", "Task-1", "", "", "0x0"],
+ [1, "0x3", "Task-2", "main", "Task-1", "0x2"],
+ [1, "0x4", "Task-3", "main", "Task-2", "0x3"],
+ ]
+ self.assertEqual(tools.build_task_table(result), expected_output)
+
+ def test_deep_coroutine_chain(self):
+ input_ = [
+ (
+ 1,
+ [
+ (10, "leaf", [[["c1", "c2", "c3", "c4", "c5"], 11]]),
+ (11, "root", []),
+ ],
+ )
+ ]
+ expected = [
+ [
+ "└── (T) root",
+ " └── c5",
+ " └── c4",
+ " └── c3",
+ " └── c2",
+ " └── c1",
+ " └── (T) leaf",
+ ]
+ ]
+ result = tools.build_async_tree(input_)
+ self.assertEqual(result, expected)
+
+ def test_multiple_cycles_same_node(self):
+ input_ = [
+ (
+ 1,
+ [
+ (1, "Task-A", [[["call1"], 2]]),
+ (2, "Task-B", [[["call2"], 3]]),
+ (3, "Task-C", [[["call3"], 1], [["call4"], 2]]),
+ ],
+ )
+ ]
+ with self.assertRaises(tools.CycleFoundException) as ctx:
+ tools.build_async_tree(input_)
+ cycles = ctx.exception.cycles
+ self.assertTrue(any(set(c) == {1, 2, 3} for c in cycles))
+
+ def test_table_output_format(self):
+ input_ = [(1, [(1, "Task-A", [[["foo"], 2]]), (2, "Task-B", [])])]
+ table = tools.build_task_table(input_)
+ for row in table:
+ self.assertEqual(len(row), 6)
+ self.assertIsInstance(row[0], int) # thread ID
+ self.assertTrue(
+ isinstance(row[1], str) and row[1].startswith("0x")
+ ) # hex task ID
+ self.assertIsInstance(row[2], str) # task name
+ self.assertIsInstance(row[3], str) # coroutine chain
+ self.assertIsInstance(row[4], str) # awaiter name
+ self.assertTrue(
+ isinstance(row[5], str) and row[5].startswith("0x")
+ ) # hex awaiter ID
+
+
+class TestAsyncioToolsEdgeCases(unittest.TestCase):
+
+ def test_task_awaits_self(self):
+ """A task directly awaits itself – should raise a cycle."""
+ input_ = [(1, [(1, "Self-Awaiter", [[["loopback"], 1]])])]
+ with self.assertRaises(tools.CycleFoundException) as ctx:
+ tools.build_async_tree(input_)
+ self.assertIn([1, 1], ctx.exception.cycles)
+
+ def test_task_with_missing_awaiter_id(self):
+ """Awaiter ID not in task list – should not crash, just show 'Unknown'."""
+ input_ = [(1, [(1, "Task-A", [[["coro"], 999]])])] # 999 not defined
+ table = tools.build_task_table(input_)
+ self.assertEqual(len(table), 1)
+ self.assertEqual(table[0][4], "Unknown")
+
+ def test_duplicate_coroutine_frames(self):
+ """Same coroutine frame repeated under a parent – should deduplicate."""
+ input_ = [
+ (
+ 1,
+ [
+ (1, "Task-1", [[["frameA"], 2], [["frameA"], 3]]),
+ (2, "Task-2", []),
+ (3, "Task-3", []),
+ ],
+ )
+ ]
+ tree = tools.build_async_tree(input_)
+ # Both children should be under the same coroutine node
+ flat = "\n".join(tree[0])
+ self.assertIn("frameA", flat)
+ self.assertIn("Task-2", flat)
+ self.assertIn("Task-1", flat)
+
+ flat = "\n".join(tree[1])
+ self.assertIn("frameA", flat)
+ self.assertIn("Task-3", flat)
+ self.assertIn("Task-1", flat)
+
+ def test_task_with_no_name(self):
+ """Task with no name in id2name – should still render with fallback."""
+ input_ = [(1, [(1, "root", [[["f1"], 2]]), (2, None, [])])]
+ # If name is None, fallback to string should not crash
+ tree = tools.build_async_tree(input_)
+ self.assertIn("(T) None", "\n".join(tree[0]))
+
+ def test_tree_rendering_with_custom_emojis(self):
+ """Pass custom emojis to the tree renderer."""
+ input_ = [(1, [(1, "MainTask", [[["f1", "f2"], 2]]), (2, "SubTask", [])])]
+ tree = tools.build_async_tree(input_, task_emoji="🧵", cor_emoji="🔁")
+ flat = "\n".join(tree[0])
+ self.assertIn("🧵 MainTask", flat)
+ self.assertIn("🔁 f1", flat)
+ self.assertIn("🔁 f2", flat)
+ self.assertIn("🧵 SubTask", flat)
diff --git a/Lib/test/test_cmd.py b/Lib/test/test_cmd.py
index 46ec82b7049..0ae44f3987d 100644
--- a/Lib/test/test_cmd.py
+++ b/Lib/test/test_cmd.py
@@ -289,6 +289,30 @@ class CmdTestReadline(unittest.TestCase):
self.assertIn(b'ab_completion_test', output)
self.assertIn(b'tab completion success', output)
+ def test_bang_completion_without_do_shell(self):
+ script = textwrap.dedent("""
+ import cmd
+ class simplecmd(cmd.Cmd):
+ def completedefault(self, text, line, begidx, endidx):
+ return ["hello"]
+
+ def default(self, line):
+ if line.replace(" ", "") == "!hello":
+ print('tab completion success')
+ else:
+ print('tab completion failure')
+ return True
+
+ simplecmd().cmdloop()
+ """)
+
+ # '! h' or '!h' and complete 'ello' to 'hello'
+ for input in [b"! h\t\n", b"!h\t\n"]:
+ with self.subTest(input=input):
+ output = run_pty(script, input)
+ self.assertIn(b'hello', output)
+ self.assertIn(b'tab completion success', output)
+
def load_tests(loader, tests, pattern):
tests.addTest(doctest.DocTestSuite())
return tests
diff --git a/Lib/test/test_curses.py b/Lib/test/test_curses.py
index 6fe0e7fd4b7..c307258e565 100644
--- a/Lib/test/test_curses.py
+++ b/Lib/test/test_curses.py
@@ -8,7 +8,8 @@ import unittest
from unittest.mock import MagicMock
from test.support import (requires, verbose, SaveSignals, cpython_only,
- check_disallow_instantiation, MISSING_C_DOCSTRINGS)
+ check_disallow_instantiation, MISSING_C_DOCSTRINGS,
+ gc_collect)
from test.support.import_helper import import_module
# Optionally test curses module. This currently requires that the
@@ -51,12 +52,6 @@ def requires_colors(test):
term = os.environ.get('TERM')
SHORT_MAX = 0x7fff
-DEFAULT_PAIR_CONTENTS = [
- (curses.COLOR_WHITE, curses.COLOR_BLACK),
- (0, 0),
- (-1, -1),
- (15, 0), # for xterm-256color (15 is for BRIGHT WHITE)
-]
# If newterm was supported we could use it instead of initscr and not exit
@unittest.skipIf(not term or term == 'unknown',
@@ -187,6 +182,14 @@ class TestCurses(unittest.TestCase):
self.assertEqual(win3.getparyx(), (2, 1))
self.assertEqual(win3.getmaxyx(), (6, 11))
+ def test_subwindows_references(self):
+ win = curses.newwin(5, 10)
+ win2 = win.subwin(3, 7)
+ del win
+ gc_collect()
+ del win2
+ gc_collect()
+
def test_move_cursor(self):
stdscr = self.stdscr
win = stdscr.subwin(10, 15, 2, 5)
@@ -948,8 +951,6 @@ class TestCurses(unittest.TestCase):
@requires_colors
def test_pair_content(self):
- if not hasattr(curses, 'use_default_colors'):
- self.assertIn(curses.pair_content(0), DEFAULT_PAIR_CONTENTS)
curses.pair_content(0)
maxpair = self.get_pair_limit() - 1
if maxpair > 0:
@@ -994,13 +995,27 @@ class TestCurses(unittest.TestCase):
@requires_curses_func('use_default_colors')
@requires_colors
def test_use_default_colors(self):
- old = curses.pair_content(0)
try:
curses.use_default_colors()
except curses.error:
self.skipTest('cannot change color (use_default_colors() failed)')
self.assertEqual(curses.pair_content(0), (-1, -1))
- self.assertIn(old, DEFAULT_PAIR_CONTENTS)
+
+ @requires_curses_func('assume_default_colors')
+ @requires_colors
+ def test_assume_default_colors(self):
+ try:
+ curses.assume_default_colors(-1, -1)
+ except curses.error:
+ self.skipTest('cannot change color (assume_default_colors() failed)')
+ self.assertEqual(curses.pair_content(0), (-1, -1))
+ curses.assume_default_colors(curses.COLOR_YELLOW, curses.COLOR_BLUE)
+ self.assertEqual(curses.pair_content(0), (curses.COLOR_YELLOW, curses.COLOR_BLUE))
+ curses.assume_default_colors(curses.COLOR_RED, -1)
+ self.assertEqual(curses.pair_content(0), (curses.COLOR_RED, -1))
+ curses.assume_default_colors(-1, curses.COLOR_GREEN)
+ self.assertEqual(curses.pair_content(0), (-1, curses.COLOR_GREEN))
+ curses.assume_default_colors(-1, -1)
def test_keyname(self):
# TODO: key_name()
diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py
index aa05db972f0..f787190b1ae 100644
--- a/Lib/test/test_external_inspection.py
+++ b/Lib/test/test_external_inspection.py
@@ -4,6 +4,8 @@ import textwrap
import importlib
import sys
import socket
+from asyncio import staggered, taskgroups
+from unittest.mock import ANY
from test.support import os_helper, SHORT_TIMEOUT, busy_retry
from test.support.script_helper import make_script
from test.support.socket_helper import find_unused_port
@@ -13,32 +15,38 @@ import subprocess
PROCESS_VM_READV_SUPPORTED = False
try:
- from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
- from _testexternalinspection import get_stack_trace
- from _testexternalinspection import get_async_stack_trace
- from _testexternalinspection import get_all_awaited_by
+ from _remotedebugging import PROCESS_VM_READV_SUPPORTED
+ from _remotedebugging import get_stack_trace
+ from _remotedebugging import get_async_stack_trace
+ from _remotedebugging import get_all_awaited_by
except ImportError:
- raise unittest.SkipTest(
- "Test only runs when _testexternalinspection is available")
+ raise unittest.SkipTest("Test only runs when _remotedebuggingmodule is available")
+
def _make_test_script(script_dir, script_basename, source):
to_return = make_script(script_dir, script_basename, source)
importlib.invalidate_caches()
return to_return
-skip_if_not_supported = unittest.skipIf((sys.platform != "darwin"
- and sys.platform != "linux"
- and sys.platform != "win32"),
- "Test only runs on Linux, Windows and MacOS")
+
+skip_if_not_supported = unittest.skipIf(
+ (sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32"),
+ "Test only runs on Linux, Windows and MacOS",
+)
+
+
class TestGetStackTrace(unittest.TestCase):
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import time, sys, socket
# Connect to the test process
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -56,7 +64,8 @@ class TestGetStackTrace(unittest.TestCase):
time.sleep(1000)
bar()
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -65,11 +74,11 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
@@ -87,22 +96,24 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
-
expected_stack_trace = [
- 'foo',
- 'baz',
- 'bar',
- '<module>'
+ ("foo", script_name, 15),
+ ("baz", script_name, 11),
+ ("bar", script_name, 9),
+ ("<module>", script_name, 17),
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_async_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio
import time
import sys
@@ -142,7 +153,8 @@ class TestGetStackTrace(unittest.TestCase):
return loop
asyncio.run(main(), loop_factory={{TASK_FACTORY}})
- """)
+ """
+ )
stack_trace = None
for task_factory_variant in "asyncio.new_event_loop", "new_eager_loop":
with (
@@ -153,25 +165,24 @@ class TestGetStackTrace(unittest.TestCase):
os.mkdir(script_dir)
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
script_name = _make_test_script(
- script_dir, 'script',
- script.format(TASK_FACTORY=task_factory_variant))
+ script_dir,
+ "script",
+ script.format(TASK_FACTORY=task_factory_variant),
+ )
client_socket = None
try:
- p = subprocess.Popen(
- [sys.executable, script_name]
- )
+ p = subprocess.Popen([sys.executable, script_name])
client_socket, _ = server_socket.accept()
server_socket.close()
response = client_socket.recv(1024)
self.assertEqual(response, b"ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace")
+ self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket is not None:
client_socket.close()
@@ -184,23 +195,91 @@ class TestGetStackTrace(unittest.TestCase):
root_task = "Task-1"
expected_stack_trace = [
- ["c5", "c4", "c3", "c2"],
+ [
+ ("c5", script_name, 11),
+ ("c4", script_name, 15),
+ ("c3", script_name, 18),
+ ("c2", script_name, 21),
+ ],
"c2_root",
[
- [["main"], root_task, []],
- [["c1"], "sub_main_1", [[["main"], root_task, []]]],
- [["c1"], "sub_main_2", [[["main"], root_task, []]]],
+ [
+ [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("main", script_name, 27),
+ ],
+ "Task-1",
+ [],
+ ],
+ [
+ [("c1", script_name, 24)],
+ "sub_main_1",
+ [
+ [
+ [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("main", script_name, 27),
+ ],
+ "Task-1",
+ [],
+ ]
+ ],
+ ],
+ [
+ [("c1", script_name, 24)],
+ "sub_main_2",
+ [
+ [
+ [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("main", script_name, 27),
+ ],
+ "Task-1",
+ [],
+ ]
+ ],
+ ],
],
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_asyncgen_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio
import time
import sys
@@ -224,7 +303,8 @@ class TestGetStackTrace(unittest.TestCase):
pass
asyncio.run(main())
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -232,10 +312,10 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
@@ -257,17 +337,26 @@ class TestGetStackTrace(unittest.TestCase):
stack_trace[2].sort(key=lambda x: x[1])
expected_stack_trace = [
- ['gen_nested_call', 'gen', 'main'], 'Task-1', []
+ [
+ ("gen_nested_call", script_name, 11),
+ ("gen", script_name, 17),
+ ("main", script_name, 20),
+ ],
+ "Task-1",
+ [],
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_async_gather_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio
import time
import sys
@@ -292,7 +381,8 @@ class TestGetStackTrace(unittest.TestCase):
await asyncio.gather(c1(), c2())
asyncio.run(main())
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -300,10 +390,10 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
@@ -313,8 +403,7 @@ class TestGetStackTrace(unittest.TestCase):
self.assertEqual(response, b"ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace")
+ self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket is not None:
client_socket.close()
@@ -325,18 +414,23 @@ class TestGetStackTrace(unittest.TestCase):
# sets are unordered, so we want to sort "awaited_by"s
stack_trace[2].sort(key=lambda x: x[1])
- expected_stack_trace = [
- ['deep', 'c1'], 'Task-2', [[['main'], 'Task-1', []]]
+ expected_stack_trace = [
+ [("deep", script_name, ANY), ("c1", script_name, 16)],
+ "Task-2",
+ [[[("main", script_name, 22)], "Task-1", []]],
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_async_staggered_race_remote_stack_trace(self):
# Spawn a process with some realistic Python code
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio.staggered
import time
import sys
@@ -364,7 +458,8 @@ class TestGetStackTrace(unittest.TestCase):
)
asyncio.run(main())
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -372,10 +467,10 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
@@ -385,8 +480,7 @@ class TestGetStackTrace(unittest.TestCase):
self.assertEqual(response, b"ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace")
+ self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket is not None:
client_socket.close()
@@ -396,18 +490,35 @@ class TestGetStackTrace(unittest.TestCase):
# sets are unordered, so we want to sort "awaited_by"s
stack_trace[2].sort(key=lambda x: x[1])
-
- expected_stack_trace = [
- ['deep', 'c1', 'run_one_coro'], 'Task-2', [[['main'], 'Task-1', []]]
+ expected_stack_trace = [
+ [
+ ("deep", script_name, ANY),
+ ("c1", script_name, 16),
+ ("staggered_race.<locals>.run_one_coro", staggered.__file__, ANY),
+ ],
+ "Task-2",
+ [
+ [
+ [
+ ("staggered_race", staggered.__file__, ANY),
+ ("main", script_name, 22),
+ ],
+ "Task-1",
+ [],
+ ]
+ ],
]
self.assertEqual(stack_trace, expected_stack_trace)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_async_global_awaited_by(self):
port = find_unused_port()
- script = textwrap.dedent(f"""\
+ script = textwrap.dedent(
+ f"""\
import asyncio
import os
import random
@@ -443,6 +554,8 @@ class TestGetStackTrace(unittest.TestCase):
assert message == data.decode()
writer.close()
await writer.wait_closed()
+ # Signal we are ready to sleep
+ sock.sendall(b"ready")
await asyncio.sleep(SHORT_TIMEOUT)
async def echo_client_spam(server):
@@ -452,8 +565,10 @@ class TestGetStackTrace(unittest.TestCase):
random.shuffle(msg)
tg.create_task(echo_client("".join(msg)))
await asyncio.sleep(0)
- # at least a 1000 tasks created
- sock.sendall(b"ready")
+ # at least a 1000 tasks created. Each task will signal
+ # when is ready to avoid the race caused by the fact that
+ # tasks are waited on tg.__exit__ and we cannot signal when
+ # that happens otherwise
# at this point all client tasks completed without assertion errors
# let's wrap up the test
server.close()
@@ -468,7 +583,8 @@ class TestGetStackTrace(unittest.TestCase):
tg.create_task(echo_client_spam(server), name="echo client spam")
asyncio.run(main())
- """)
+ """
+ )
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
@@ -476,17 +592,19 @@ class TestGetStackTrace(unittest.TestCase):
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind(('localhost', port))
+ server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
- script_name = _make_test_script(script_dir, 'script', script)
+ script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
client_socket, _ = server_socket.accept()
server_socket.close()
- response = client_socket.recv(1024)
- self.assertEqual(response, b"ready")
+ for _ in range(1000):
+ expected_response = b"ready"
+ response = client_socket.recv(len(expected_response))
+ self.assertEqual(response, expected_response)
for _ in busy_retry(SHORT_TIMEOUT):
try:
all_awaited_by = get_all_awaited_by(p.pid)
@@ -497,7 +615,9 @@ class TestGetStackTrace(unittest.TestCase):
msg = str(re)
if msg.startswith("Task list appears corrupted"):
continue
- elif msg.startswith("Invalid linked list structure reading remote memory"):
+ elif msg.startswith(
+ "Invalid linked list structure reading remote memory"
+ ):
continue
elif msg.startswith("Unknown error reading memory"):
continue
@@ -516,22 +636,62 @@ class TestGetStackTrace(unittest.TestCase):
# expected: at least 1000 pending tasks
self.assertGreaterEqual(len(entries), 1000)
# the first three tasks stem from the code structure
- self.assertIn(('Task-1', []), entries)
- self.assertIn(('server task', [[['main'], 'Task-1', []]]), entries)
- self.assertIn(('echo client spam', [[['main'], 'Task-1', []]]), entries)
+ self.assertIn((ANY, "Task-1", []), entries)
+ main_stack = [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("main", script_name, 60),
+ ]
+ self.assertIn(
+ (ANY, "server task", [[main_stack, ANY]]),
+ entries,
+ )
+ self.assertIn(
+ (ANY, "echo client spam", [[main_stack, ANY]]),
+ entries,
+ )
- expected_stack = [[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]]
- tasks_with_stack = [task for task in entries if task[1] == expected_stack]
+ expected_stack = [
+ [
+ [
+ (
+ "TaskGroup._aexit",
+ taskgroups.__file__,
+ ANY,
+ ),
+ (
+ "TaskGroup.__aexit__",
+ taskgroups.__file__,
+ ANY,
+ ),
+ ("echo_client_spam", script_name, 41),
+ ],
+ ANY,
+ ]
+ ]
+ tasks_with_stack = [
+ task for task in entries if task[2] == expected_stack
+ ]
self.assertGreaterEqual(len(tasks_with_stack), 1000)
# the final task will have some random number, but it should for
# sure be one of the echo client spam horde (In windows this is not true
# for some reason)
if sys.platform != "win32":
- self.assertEqual([[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]], entries[-1][1])
+ self.assertEqual(
+ expected_stack,
+ entries[-1][2],
+ )
except PermissionError:
- self.skipTest(
- "Insufficient permissions to read the stack trace")
+ self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket is not None:
client_socket.close()
@@ -540,12 +700,21 @@ class TestGetStackTrace(unittest.TestCase):
p.wait(timeout=SHORT_TIMEOUT)
@skip_if_not_supported
- @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
- "Test only runs on Linux with process_vm_readv support")
+ @unittest.skipIf(
+ sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+ "Test only runs on Linux with process_vm_readv support",
+ )
def test_self_trace(self):
stack_trace = get_stack_trace(os.getpid())
- print(stack_trace)
- self.assertEqual(stack_trace[0], "test_self_trace")
+ self.assertEqual(
+ stack_trace[0],
+ (
+ "TestGetStackTrace.test_self_trace",
+ __file__,
+ self.test_self_trace.__code__.co_firstlineno + 6,
+ ),
+ )
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_genericalias.py b/Lib/test/test_genericalias.py
index 5c13897b8d9..8d21ded4501 100644
--- a/Lib/test/test_genericalias.py
+++ b/Lib/test/test_genericalias.py
@@ -137,7 +137,9 @@ class BaseTest(unittest.TestCase):
Future, _WorkItem,
Morsel,
DictReader, DictWriter,
- array]
+ array,
+ staticmethod,
+ classmethod]
if ctypes is not None:
generic_types.extend((ctypes.Array, ctypes.LibraryLoader, ctypes.py_object))
if ValueProxy is not None:
diff --git a/Lib/test/test_inspect/test_inspect.py b/Lib/test/test_inspect/test_inspect.py
index 4950af42cfe..c9b37fcd8f6 100644
--- a/Lib/test/test_inspect/test_inspect.py
+++ b/Lib/test/test_inspect/test_inspect.py
@@ -5844,7 +5844,7 @@ class TestSignatureDefinitions(unittest.TestCase):
self._test_module_has_signatures(operator)
def test_os_module_has_signatures(self):
- unsupported_signature = {'chmod', 'utime'}
+ unsupported_signature = {'chmod', 'link', 'utime'}
unsupported_signature |= {name for name in
['get_terminal_size', 'posix_spawn', 'posix_spawnp',
'register_at_fork', 'startfile']
diff --git a/Lib/test/test_interpreters/test_stress.py b/Lib/test/test_interpreters/test_stress.py
index 56bfc172199..fae2f38cb55 100644
--- a/Lib/test/test_interpreters/test_stress.py
+++ b/Lib/test/test_interpreters/test_stress.py
@@ -21,21 +21,29 @@ class StressTests(TestBase):
for _ in range(100):
interp = interpreters.create()
alive.append(interp)
+ del alive
+ support.gc_collect()
- @support.requires_resource('cpu')
- @threading_helper.requires_working_threading()
- def test_create_many_threaded(self):
+ @support.bigmemtest(size=200, memuse=32*2**20, dry_run=False)
+ def test_create_many_threaded(self, size):
alive = []
+ start = threading.Event()
def task():
+ # try to create all interpreters simultaneously
+ if not start.wait(support.SHORT_TIMEOUT):
+ raise TimeoutError
interp = interpreters.create()
alive.append(interp)
- threads = (threading.Thread(target=task) for _ in range(200))
+ threads = [threading.Thread(target=task) for _ in range(size)]
with threading_helper.start_threads(threads):
- pass
+ start.set()
+ del alive
+ support.gc_collect()
- @support.requires_resource('cpu')
@threading_helper.requires_working_threading()
- def test_many_threads_running_interp_in_other_interp(self):
+ @support.bigmemtest(size=200, memuse=34*2**20, dry_run=False)
+ def test_many_threads_running_interp_in_other_interp(self, size):
+ start = threading.Event()
interp = interpreters.create()
script = f"""if True:
@@ -47,6 +55,9 @@ class StressTests(TestBase):
interp = interpreters.create()
alreadyrunning = (f'{interpreters.InterpreterError}: '
'interpreter already running')
+ # try to run all interpreters simultaneously
+ if not start.wait(support.SHORT_TIMEOUT):
+ raise TimeoutError
success = False
while not success:
try:
@@ -58,9 +69,10 @@ class StressTests(TestBase):
else:
success = True
- threads = (threading.Thread(target=run) for _ in range(200))
+ threads = [threading.Thread(target=run) for _ in range(size)]
with threading_helper.start_threads(threads):
- pass
+ start.set()
+ support.gc_collect()
if __name__ == '__main__':
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 545643aa455..5a8f1949baa 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -1373,6 +1373,28 @@ class CommonBufferedTests:
with self.assertRaises(AttributeError):
buf.raw = x
+ def test_pickling_subclass(self):
+ global MyBufferedIO
+ class MyBufferedIO(self.tp):
+ def __init__(self, raw, tag):
+ super().__init__(raw)
+ self.tag = tag
+ def __getstate__(self):
+ return self.tag, self.raw.getvalue()
+ def __setstate__(slf, state):
+ tag, value = state
+ slf.__init__(self.BytesIO(value), tag)
+
+ raw = self.BytesIO(b'data')
+ buf = MyBufferedIO(raw, tag='ham')
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(protocol=proto):
+ pickled = pickle.dumps(buf, proto)
+ newbuf = pickle.loads(pickled)
+ self.assertEqual(newbuf.raw.getvalue(), b'data')
+ self.assertEqual(newbuf.tag, 'ham')
+ del MyBufferedIO
+
class SizeofTest:
@@ -3950,6 +3972,28 @@ class TextIOWrapperTest(unittest.TestCase):
f.write(res)
self.assertEqual(res + f.readline(), 'foo\nbar\n')
+ def test_pickling_subclass(self):
+ global MyTextIO
+ class MyTextIO(self.TextIOWrapper):
+ def __init__(self, raw, tag):
+ super().__init__(raw)
+ self.tag = tag
+ def __getstate__(self):
+ return self.tag, self.buffer.getvalue()
+ def __setstate__(slf, state):
+ tag, value = state
+ slf.__init__(self.BytesIO(value), tag)
+
+ raw = self.BytesIO(b'data')
+ txt = MyTextIO(raw, 'ham')
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(protocol=proto):
+ pickled = pickle.dumps(txt, proto)
+ newtxt = pickle.loads(pickled)
+ self.assertEqual(newtxt.buffer.getvalue(), b'data')
+ self.assertEqual(newtxt.tag, 'ham')
+ del MyTextIO
+
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_read_non_blocking(self):
import os
diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py
index c9cbe1541e7..b6a07f214fa 100644
--- a/Lib/test/test_posix.py
+++ b/Lib/test/test_posix.py
@@ -1521,6 +1521,51 @@ class PosixTester(unittest.TestCase):
self.assertEqual(cm.exception.errno, errno.EINVAL)
os.close(os.pidfd_open(os.getpid(), 0))
+ @unittest.skipUnless(hasattr(os, "link"), "test needs os.link()")
+ @support.skip_android_selinux('hard links to symbolic links')
+ def test_link_follow_symlinks(self):
+ default_follow = sys.platform.startswith(
+ ('darwin', 'freebsd', 'netbsd', 'openbsd', 'dragonfly', 'sunos5'))
+ default_no_follow = sys.platform.startswith(('win32', 'linux'))
+ orig = os_helper.TESTFN
+ symlink = orig + 'symlink'
+ posix.symlink(orig, symlink)
+ self.addCleanup(os_helper.unlink, symlink)
+
+ with self.subTest('no follow_symlinks'):
+ # no follow_symlinks -> platform depending
+ link = orig + 'link'
+ posix.link(symlink, link)
+ self.addCleanup(os_helper.unlink, link)
+ if os.link in os.supports_follow_symlinks or default_follow:
+ self.assertEqual(posix.lstat(link), posix.lstat(orig))
+ elif default_no_follow:
+ self.assertEqual(posix.lstat(link), posix.lstat(symlink))
+
+ with self.subTest('follow_symlinks=False'):
+ # follow_symlinks=False -> duplicate the symlink itself
+ link = orig + 'link_nofollow'
+ try:
+ posix.link(symlink, link, follow_symlinks=False)
+ except NotImplementedError:
+ if os.link in os.supports_follow_symlinks or default_no_follow:
+ raise
+ else:
+ self.addCleanup(os_helper.unlink, link)
+ self.assertEqual(posix.lstat(link), posix.lstat(symlink))
+
+ with self.subTest('follow_symlinks=True'):
+ # follow_symlinks=True -> duplicate the target file
+ link = orig + 'link_following'
+ try:
+ posix.link(symlink, link, follow_symlinks=True)
+ except NotImplementedError:
+ if os.link in os.supports_follow_symlinks or default_follow:
+ raise
+ else:
+ self.addCleanup(os_helper.unlink, link)
+ self.assertEqual(posix.lstat(link), posix.lstat(orig))
+
# tests for the posix *at functions follow
class TestPosixDirFd(unittest.TestCase):
diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py
index e4c44c78d4a..9fbe94fcdd6 100644
--- a/Lib/test/test_remote_pdb.py
+++ b/Lib/test/test_remote_pdb.py
@@ -531,6 +531,44 @@ class PdbClientTestCase(unittest.TestCase):
expected_state={"state": "pdb"},
)
+ def test_multiline_completion_in_pdb_state(self):
+ """Test requesting tab completions at a (Pdb) continuation prompt."""
+ # GIVEN
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "if True:"}),
+ (
+ "user",
+ {
+ "prompt": "... ",
+ "completion_request": {
+ "line": " b",
+ "begidx": 4,
+ "endidx": 5,
+ },
+ "input": " bool()",
+ },
+ ),
+ ("server", {"completions": ["bin", "bool", "bytes"]}),
+ ("user", {"prompt": "... ", "input": ""}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "b",
+ "line": "! b",
+ "begidx": 2,
+ "endidx": 3,
+ }
+ },
+ {"reply": "if True:\n bool()\n"},
+ ],
+ expected_completions=["bin", "bool", "bytes"],
+ expected_state={"state": "pdb"},
+ )
+
def test_completion_in_interact_state(self):
"""Test requesting tab completions at a >>> prompt."""
incoming = [
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index 56413d00823..10c3e0e9a1d 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -1960,7 +1960,7 @@ def _supports_remote_attaching():
PROCESS_VM_READV_SUPPORTED = False
try:
- from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
+ from _remotedebuggingmodule import PROCESS_VM_READV_SUPPORTED
except ImportError:
pass
diff --git a/Lib/typing.py b/Lib/typing.py
index f70dcd0b5b7..e019c597580 100644
--- a/Lib/typing.py
+++ b/Lib/typing.py
@@ -2906,7 +2906,7 @@ class NamedTupleMeta(type):
types = ns["__annotations__"]
field_names = list(types)
annotate = _make_eager_annotate(types)
- elif (original_annotate := _lazy_annotationlib.get_annotate_function(ns)) is not None:
+ elif (original_annotate := _lazy_annotationlib.get_annotate_from_class_namespace(ns)) is not None:
types = _lazy_annotationlib.call_annotate_function(
original_annotate, _lazy_annotationlib.Format.FORWARDREF)
field_names = list(types)
@@ -3092,7 +3092,7 @@ class _TypedDictMeta(type):
if "__annotations__" in ns:
own_annotate = None
own_annotations = ns["__annotations__"]
- elif (own_annotate := _lazy_annotationlib.get_annotate_function(ns)) is not None:
+ elif (own_annotate := _lazy_annotationlib.get_annotate_from_class_namespace(ns)) is not None:
own_annotations = _lazy_annotationlib.call_annotate_function(
own_annotate, _lazy_annotationlib.Format.FORWARDREF, owner=tp_dict
)