aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/_code_definitions.py20
-rw-r--r--Lib/test/libregrtest/main.py7
-rw-r--r--Lib/test/libregrtest/setup.py9
-rw-r--r--Lib/test/subprocessdata/fd_status.py4
-rw-r--r--Lib/test/support/__init__.py29
-rw-r--r--Lib/test/support/interpreters/__init__.py31
-rw-r--r--Lib/test/test_ast/test_ast.py95
-rw-r--r--Lib/test/test_asyncgen.py9
-rw-r--r--Lib/test/test_capi/test_opt.py11
-rw-r--r--Lib/test/test_capi/test_unicode.py21
-rw-r--r--Lib/test/test_class.py1
-rw-r--r--Lib/test/test_code.py48
-rw-r--r--Lib/test/test_csv.py48
-rw-r--r--Lib/test/test_ctypes/test_incomplete.py10
-rw-r--r--Lib/test/test_dbm.py61
-rw-r--r--Lib/test/test_dbm_gnu.py27
-rw-r--r--Lib/test/test_decimal.py2
-rw-r--r--Lib/test/test_dis.py2
-rw-r--r--Lib/test/test_doctest/test_doctest.py444
-rw-r--r--Lib/test/test_email/test__header_value_parser.py45
-rw-r--r--Lib/test/test_external_inspection.py76
-rw-r--r--Lib/test/test_fractions.py8
-rw-r--r--Lib/test/test_free_threading/test_itertools.py (renamed from Lib/test/test_free_threading/test_itertools_batched.py)32
-rw-r--r--Lib/test/test_grammar.py14
-rw-r--r--Lib/test/test_hashlib.py95
-rw-r--r--Lib/test/test_http_cookiejar.py187
-rw-r--r--Lib/test/test_interpreters/test_api.py541
-rw-r--r--Lib/test/test_io.py31
-rw-r--r--Lib/test/test_json/test_dump.py8
-rw-r--r--Lib/test/test_math.py22
-rw-r--r--Lib/test/test_monitoring.py15
-rw-r--r--Lib/test/test_ntpath.py198
-rw-r--r--Lib/test/test_peepholer.py92
-rw-r--r--Lib/test/test_posixpath.py236
-rw-r--r--Lib/test/test_pyrepl/test_pyrepl.py11
-rw-r--r--Lib/test/test_queue.py20
-rw-r--r--Lib/test/test_random.py28
-rw-r--r--Lib/test/test_regrtest.py49
-rw-r--r--Lib/test/test_sqlite3/test_cli.py98
-rw-r--r--Lib/test/test_ssl.py43
-rw-r--r--Lib/test/test_syntax.py28
-rw-r--r--Lib/test/test_sys.py26
-rw-r--r--Lib/test/test_tarfile.py310
-rw-r--r--Lib/test/test_traceback.py29
-rw-r--r--Lib/test/test_urlparse.py398
-rw-r--r--Lib/test/test_zstd.py207
46 files changed, 2742 insertions, 984 deletions
diff --git a/Lib/test/_code_definitions.py b/Lib/test/_code_definitions.py
index 733a15b25f6..274beb65a6d 100644
--- a/Lib/test/_code_definitions.py
+++ b/Lib/test/_code_definitions.py
@@ -57,6 +57,15 @@ def spam_with_globals_and_builtins():
print(res)
+def spam_full_args(a, b, /, c, d, *args, e, f, **kwargs):
+ return (a, b, c, d, e, f, args, kwargs)
+
+
+def spam_full_args_with_defaults(a=-1, b=-2, /, c=-3, d=-4, *args,
+ e=-5, f=-6, **kwargs):
+ return (a, b, c, d, e, f, args, kwargs)
+
+
def spam_args_attrs_and_builtins(a, b, /, c, d, *args, e, f, **kwargs):
if args.__len__() > 2:
return None
@@ -67,6 +76,10 @@ def spam_returns_arg(x):
return x
+def spam_raises():
+ raise Exception('spam!')
+
+
def spam_with_inner_not_closure():
def eggs():
pass
@@ -177,8 +190,11 @@ TOP_FUNCTIONS = [
spam_minimal,
spam_with_builtins,
spam_with_globals_and_builtins,
+ spam_full_args,
+ spam_full_args_with_defaults,
spam_args_attrs_and_builtins,
spam_returns_arg,
+ spam_raises,
spam_with_inner_not_closure,
spam_with_inner_closure,
spam_annotated,
@@ -219,8 +235,10 @@ STATELESS_FUNCTIONS = [
spam,
spam_minimal,
spam_with_builtins,
+ spam_full_args,
spam_args_attrs_and_builtins,
spam_returns_arg,
+ spam_raises,
spam_annotated,
spam_with_inner_not_closure,
spam_with_inner_closure,
@@ -238,6 +256,7 @@ STATELESS_FUNCTIONS = [
STATELESS_CODE = [
*STATELESS_FUNCTIONS,
script_with_globals,
+ spam_full_args_with_defaults,
spam_with_globals_and_builtins,
spam_full,
]
@@ -248,6 +267,7 @@ PURE_SCRIPT_FUNCTIONS = [
script_with_explicit_empty_return,
spam_minimal,
spam_with_builtins,
+ spam_raises,
spam_with_inner_not_closure,
spam_with_inner_closure,
]
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index 713cbedb299..0d9c059a938 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -543,8 +543,6 @@ class Regrtest:
self.first_runtests = runtests
self.logger.set_tests(runtests)
- setup_process()
-
if (runtests.hunt_refleak is not None) and (not self.num_workers):
# gh-109739: WindowsLoadTracker thread interferes with refleak check
use_load_tracker = False
@@ -721,10 +719,7 @@ class Regrtest:
self._execute_python(cmd, environ)
def _init(self):
- # Set sys.stdout encoder error handler to backslashreplace,
- # similar to sys.stderr error handler, to avoid UnicodeEncodeError
- # when printing a traceback or any other non-encodable character.
- sys.stdout.reconfigure(errors="backslashreplace")
+ setup_process()
if self.junit_filename and not os.path.isabs(self.junit_filename):
self.junit_filename = os.path.abspath(self.junit_filename)
diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py
index c3d1f60a400..9bfc414cd61 100644
--- a/Lib/test/libregrtest/setup.py
+++ b/Lib/test/libregrtest/setup.py
@@ -1,5 +1,6 @@
import faulthandler
import gc
+import io
import os
import random
import signal
@@ -52,6 +53,14 @@ def setup_process() -> None:
support.record_original_stdout(sys.stdout)
+ # Set sys.stdout encoder error handler to backslashreplace,
+ # similar to sys.stderr error handler, to avoid UnicodeEncodeError
+ # when printing a traceback or any other non-encodable character.
+ #
+ # Use an assertion to fix mypy error.
+ assert isinstance(sys.stdout, io.TextIOWrapper)
+ sys.stdout.reconfigure(errors="backslashreplace")
+
# Some times __path__ and __file__ are not absolute (e.g. while running from
# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
# imports might fail. This affects only the modules imported before os.chdir().
diff --git a/Lib/test/subprocessdata/fd_status.py b/Lib/test/subprocessdata/fd_status.py
index d12bd95abee..90e785981ae 100644
--- a/Lib/test/subprocessdata/fd_status.py
+++ b/Lib/test/subprocessdata/fd_status.py
@@ -2,7 +2,7 @@
file descriptors on stdout.
Usage:
-fd_stats.py: check all file descriptors
+fd_status.py: check all file descriptors (up to 255)
fd_status.py fd1 fd2 ...: check only specified file descriptors
"""
@@ -18,7 +18,7 @@ if __name__ == "__main__":
_MAXFD = os.sysconf("SC_OPEN_MAX")
except:
_MAXFD = 256
- test_fds = range(0, _MAXFD)
+ test_fds = range(0, min(_MAXFD, 256))
else:
test_fds = map(int, sys.argv[1:])
for fd in test_fds:
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 351d832a26d..48e74adcce3 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -945,6 +945,31 @@ def check_sizeof(test, o, size):
% (type(o), result, size)
test.assertEqual(result, size, msg)
+def subTests(arg_names, arg_values, /, *, _do_cleanups=False):
+ """Run multiple subtests with different parameters.
+ """
+ single_param = False
+ if isinstance(arg_names, str):
+ arg_names = arg_names.replace(',',' ').split()
+ if len(arg_names) == 1:
+ single_param = True
+ arg_values = tuple(arg_values)
+ def decorator(func):
+ if isinstance(func, type):
+ raise TypeError('subTests() can only decorate methods, not classes')
+ @functools.wraps(func)
+ def wrapper(self, /, *args, **kwargs):
+ for values in arg_values:
+ if single_param:
+ values = (values,)
+ subtest_kwargs = dict(zip(arg_names, values))
+ with self.subTest(**subtest_kwargs):
+ func(self, *args, **kwargs, **subtest_kwargs)
+ if _do_cleanups:
+ self.doCleanups()
+ return wrapper
+ return decorator
+
#=======================================================================
# Decorator/context manager for running a code in a different locale,
# correctly resetting it afterwards.
@@ -1084,7 +1109,7 @@ def set_memlimit(limit: str) -> None:
global real_max_memuse
memlimit = _parse_memlimit(limit)
if memlimit < _2G - 1:
- raise ValueError('Memory limit {limit!r} too low to be useful')
+ raise ValueError(f'Memory limit {limit!r} too low to be useful')
real_max_memuse = memlimit
memlimit = min(memlimit, MAX_Py_ssize_t)
@@ -2358,7 +2383,7 @@ def infinite_recursion(max_depth=None):
# very deep recursion.
max_depth = 20_000
elif max_depth < 3:
- raise ValueError("max_depth must be at least 3, got {max_depth}")
+ raise ValueError(f"max_depth must be at least 3, got {max_depth}")
depth = get_recursion_depth()
depth = max(depth - 1, 1) # Ignore infinite_recursion() frame.
limit = depth + max_depth
diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py
index e067f259364..6d1b0690805 100644
--- a/Lib/test/support/interpreters/__init__.py
+++ b/Lib/test/support/interpreters/__init__.py
@@ -226,33 +226,32 @@ class Interpreter:
if excinfo is not None:
raise ExecutionFailed(excinfo)
- def call(self, callable, /):
- """Call the object in the interpreter with given args/kwargs.
+ def _call(self, callable, args, kwargs):
+ res, excinfo = _interpreters.call(self._id, callable, args, kwargs, restrict=True)
+ if excinfo is not None:
+ raise ExecutionFailed(excinfo)
+ return res
- Only functions that take no arguments and have no closure
- are supported.
+ def call(self, callable, /, *args, **kwargs):
+ """Call the object in the interpreter with given args/kwargs.
- The return value is discarded.
+ Nearly all callables, args, kwargs, and return values are
+ supported. All "shareable" objects are supported, as are
+ "stateless" functions (meaning non-closures that do not use
+ any globals). This method will fall back to pickle.
If the callable raises an exception then the error display
- (including full traceback) is send back between the interpreters
+ (including full traceback) is sent back between the interpreters
and an ExecutionFailed exception is raised, much like what
happens with Interpreter.exec().
"""
- # XXX Support args and kwargs.
- # XXX Support arbitrary callables.
- # XXX Support returning the return value (e.g. via pickle).
- excinfo = _interpreters.call(self._id, callable, restrict=True)
- if excinfo is not None:
- raise ExecutionFailed(excinfo)
+ return self._call(callable, args, kwargs)
- def call_in_thread(self, callable, /):
+ def call_in_thread(self, callable, /, *args, **kwargs):
"""Return a new thread that calls the object in the interpreter.
The return value and any raised exception are discarded.
"""
- def task():
- self.call(callable)
- t = threading.Thread(target=task)
+ t = threading.Thread(target=self._call, args=(callable, args, kwargs))
t.start()
return t
diff --git a/Lib/test/test_ast/test_ast.py b/Lib/test/test_ast/test_ast.py
index 46745cfa8f8..cc46529c0ef 100644
--- a/Lib/test/test_ast/test_ast.py
+++ b/Lib/test/test_ast/test_ast.py
@@ -1372,17 +1372,17 @@ class ASTHelpers_Test(unittest.TestCase):
def test_dump(self):
node = ast.parse('spam(eggs, "and cheese")')
self.assertEqual(ast.dump(node),
- "Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), "
- "args=[Name(id='eggs', ctx=Load()), Constant(value='and cheese')]))])"
+ "Module(body=[Expr(value=Call(func=Name(id='spam'), "
+ "args=[Name(id='eggs'), Constant(value='and cheese')]))])"
)
self.assertEqual(ast.dump(node, annotate_fields=False),
- "Module([Expr(Call(Name('spam', Load()), [Name('eggs', Load()), "
+ "Module([Expr(Call(Name('spam'), [Name('eggs'), "
"Constant('and cheese')]))])"
)
self.assertEqual(ast.dump(node, include_attributes=True),
- "Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load(), "
+ "Module(body=[Expr(value=Call(func=Name(id='spam', "
"lineno=1, col_offset=0, end_lineno=1, end_col_offset=4), "
- "args=[Name(id='eggs', ctx=Load(), lineno=1, col_offset=5, "
+ "args=[Name(id='eggs', lineno=1, col_offset=5, "
"end_lineno=1, end_col_offset=9), Constant(value='and cheese', "
"lineno=1, col_offset=11, end_lineno=1, end_col_offset=23)], "
"lineno=1, col_offset=0, end_lineno=1, end_col_offset=24), "
@@ -1396,18 +1396,18 @@ Module(
body=[
Expr(
value=Call(
- func=Name(id='spam', ctx=Load()),
+ func=Name(id='spam'),
args=[
- Name(id='eggs', ctx=Load()),
+ Name(id='eggs'),
Constant(value='and cheese')]))])""")
self.assertEqual(ast.dump(node, annotate_fields=False, indent='\t'), """\
Module(
\t[
\t\tExpr(
\t\t\tCall(
-\t\t\t\tName('spam', Load()),
+\t\t\t\tName('spam'),
\t\t\t\t[
-\t\t\t\t\tName('eggs', Load()),
+\t\t\t\t\tName('eggs'),
\t\t\t\t\tConstant('and cheese')]))])""")
self.assertEqual(ast.dump(node, include_attributes=True, indent=3), """\
Module(
@@ -1416,7 +1416,6 @@ Module(
value=Call(
func=Name(
id='spam',
- ctx=Load(),
lineno=1,
col_offset=0,
end_lineno=1,
@@ -1424,7 +1423,6 @@ Module(
args=[
Name(
id='eggs',
- ctx=Load(),
lineno=1,
col_offset=5,
end_lineno=1,
@@ -1454,23 +1452,23 @@ Module(
)
node = ast.Raise(exc=ast.Name(id='e', ctx=ast.Load()), lineno=3, col_offset=4)
self.assertEqual(ast.dump(node),
- "Raise(exc=Name(id='e', ctx=Load()))"
+ "Raise(exc=Name(id='e'))"
)
self.assertEqual(ast.dump(node, annotate_fields=False),
- "Raise(Name('e', Load()))"
+ "Raise(Name('e'))"
)
self.assertEqual(ast.dump(node, include_attributes=True),
- "Raise(exc=Name(id='e', ctx=Load()), lineno=3, col_offset=4)"
+ "Raise(exc=Name(id='e'), lineno=3, col_offset=4)"
)
self.assertEqual(ast.dump(node, annotate_fields=False, include_attributes=True),
- "Raise(Name('e', Load()), lineno=3, col_offset=4)"
+ "Raise(Name('e'), lineno=3, col_offset=4)"
)
node = ast.Raise(cause=ast.Name(id='e', ctx=ast.Load()))
self.assertEqual(ast.dump(node),
- "Raise(cause=Name(id='e', ctx=Load()))"
+ "Raise(cause=Name(id='e'))"
)
self.assertEqual(ast.dump(node, annotate_fields=False),
- "Raise(cause=Name('e', Load()))"
+ "Raise(cause=Name('e'))"
)
# Arguments:
node = ast.arguments(args=[ast.arg("x")])
@@ -1502,10 +1500,10 @@ Module(
[ast.Name('dataclass', ctx=ast.Load())],
)
self.assertEqual(ast.dump(node),
- "ClassDef(name='T', keywords=[keyword(arg='a', value=Constant(value=None))], decorator_list=[Name(id='dataclass', ctx=Load())])",
+ "ClassDef(name='T', keywords=[keyword(arg='a', value=Constant(value=None))], decorator_list=[Name(id='dataclass')])",
)
self.assertEqual(ast.dump(node, annotate_fields=False),
- "ClassDef('T', [], [keyword('a', Constant(None))], [], [Name('dataclass', Load())])",
+ "ClassDef('T', [], [keyword('a', Constant(None))], [], [Name('dataclass')])",
)
def test_dump_show_empty(self):
@@ -1533,7 +1531,7 @@ Module(
check_node(
# Corner case: there are no real `Name` instances with `id=''`:
ast.Name(id='', ctx=ast.Load()),
- empty="Name(id='', ctx=Load())",
+ empty="Name(id='')",
full="Name(id='', ctx=Load())",
)
@@ -1544,39 +1542,63 @@ Module(
)
check_node(
+ ast.MatchSingleton(value=[]),
+ empty="MatchSingleton(value=[])",
+ full="MatchSingleton(value=[])",
+ )
+
+ check_node(
ast.Constant(value=None),
empty="Constant(value=None)",
full="Constant(value=None)",
)
check_node(
+ ast.Constant(value=[]),
+ empty="Constant(value=[])",
+ full="Constant(value=[])",
+ )
+
+ check_node(
ast.Constant(value=''),
empty="Constant(value='')",
full="Constant(value='')",
)
+ check_node(
+ ast.Interpolation(value=ast.Constant(42), str=None, conversion=-1),
+ empty="Interpolation(value=Constant(value=42), str=None, conversion=-1)",
+ full="Interpolation(value=Constant(value=42), str=None, conversion=-1)",
+ )
+
+ check_node(
+ ast.Interpolation(value=ast.Constant(42), str=[], conversion=-1),
+ empty="Interpolation(value=Constant(value=42), str=[], conversion=-1)",
+ full="Interpolation(value=Constant(value=42), str=[], conversion=-1)",
+ )
+
check_text(
"def a(b: int = 0, *, c): ...",
- empty="Module(body=[FunctionDef(name='a', args=arguments(args=[arg(arg='b', annotation=Name(id='int', ctx=Load()))], kwonlyargs=[arg(arg='c')], kw_defaults=[None], defaults=[Constant(value=0)]), body=[Expr(value=Constant(value=Ellipsis))])])",
+ empty="Module(body=[FunctionDef(name='a', args=arguments(args=[arg(arg='b', annotation=Name(id='int'))], kwonlyargs=[arg(arg='c')], kw_defaults=[None], defaults=[Constant(value=0)]), body=[Expr(value=Constant(value=Ellipsis))])])",
full="Module(body=[FunctionDef(name='a', args=arguments(posonlyargs=[], args=[arg(arg='b', annotation=Name(id='int', ctx=Load()))], kwonlyargs=[arg(arg='c')], kw_defaults=[None], defaults=[Constant(value=0)]), body=[Expr(value=Constant(value=Ellipsis))], decorator_list=[], type_params=[])], type_ignores=[])",
)
check_text(
"def a(b: int = 0, *, c): ...",
- empty="Module(body=[FunctionDef(name='a', args=arguments(args=[arg(arg='b', annotation=Name(id='int', ctx=Load(), lineno=1, col_offset=9, end_lineno=1, end_col_offset=12), lineno=1, col_offset=6, end_lineno=1, end_col_offset=12)], kwonlyargs=[arg(arg='c', lineno=1, col_offset=21, end_lineno=1, end_col_offset=22)], kw_defaults=[None], defaults=[Constant(value=0, lineno=1, col_offset=15, end_lineno=1, end_col_offset=16)]), body=[Expr(value=Constant(value=Ellipsis, lineno=1, col_offset=25, end_lineno=1, end_col_offset=28), lineno=1, col_offset=25, end_lineno=1, end_col_offset=28)], lineno=1, col_offset=0, end_lineno=1, end_col_offset=28)])",
+ empty="Module(body=[FunctionDef(name='a', args=arguments(args=[arg(arg='b', annotation=Name(id='int', lineno=1, col_offset=9, end_lineno=1, end_col_offset=12), lineno=1, col_offset=6, end_lineno=1, end_col_offset=12)], kwonlyargs=[arg(arg='c', lineno=1, col_offset=21, end_lineno=1, end_col_offset=22)], kw_defaults=[None], defaults=[Constant(value=0, lineno=1, col_offset=15, end_lineno=1, end_col_offset=16)]), body=[Expr(value=Constant(value=Ellipsis, lineno=1, col_offset=25, end_lineno=1, end_col_offset=28), lineno=1, col_offset=25, end_lineno=1, end_col_offset=28)], lineno=1, col_offset=0, end_lineno=1, end_col_offset=28)])",
full="Module(body=[FunctionDef(name='a', args=arguments(posonlyargs=[], args=[arg(arg='b', annotation=Name(id='int', ctx=Load(), lineno=1, col_offset=9, end_lineno=1, end_col_offset=12), lineno=1, col_offset=6, end_lineno=1, end_col_offset=12)], kwonlyargs=[arg(arg='c', lineno=1, col_offset=21, end_lineno=1, end_col_offset=22)], kw_defaults=[None], defaults=[Constant(value=0, lineno=1, col_offset=15, end_lineno=1, end_col_offset=16)]), body=[Expr(value=Constant(value=Ellipsis, lineno=1, col_offset=25, end_lineno=1, end_col_offset=28), lineno=1, col_offset=25, end_lineno=1, end_col_offset=28)], decorator_list=[], type_params=[], lineno=1, col_offset=0, end_lineno=1, end_col_offset=28)], type_ignores=[])",
include_attributes=True,
)
check_text(
'spam(eggs, "and cheese")',
- empty="Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), args=[Name(id='eggs', ctx=Load()), Constant(value='and cheese')]))])",
+ empty="Module(body=[Expr(value=Call(func=Name(id='spam'), args=[Name(id='eggs'), Constant(value='and cheese')]))])",
full="Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), args=[Name(id='eggs', ctx=Load()), Constant(value='and cheese')], keywords=[]))], type_ignores=[])",
)
check_text(
'spam(eggs, text="and cheese")',
- empty="Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), args=[Name(id='eggs', ctx=Load())], keywords=[keyword(arg='text', value=Constant(value='and cheese'))]))])",
+ empty="Module(body=[Expr(value=Call(func=Name(id='spam'), args=[Name(id='eggs')], keywords=[keyword(arg='text', value=Constant(value='and cheese'))]))])",
full="Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), args=[Name(id='eggs', ctx=Load())], keywords=[keyword(arg='text', value=Constant(value='and cheese'))]))], type_ignores=[])",
)
@@ -1610,12 +1632,12 @@ Module(
self.assertEqual(src, ast.fix_missing_locations(src))
self.maxDiff = None
self.assertEqual(ast.dump(src, include_attributes=True),
- "Module(body=[Expr(value=Call(func=Name(id='write', ctx=Load(), "
+ "Module(body=[Expr(value=Call(func=Name(id='write', "
"lineno=1, col_offset=0, end_lineno=1, end_col_offset=5), "
"args=[Constant(value='spam', lineno=1, col_offset=6, end_lineno=1, "
"end_col_offset=12)], lineno=1, col_offset=0, end_lineno=1, "
"end_col_offset=13), lineno=1, col_offset=0, end_lineno=1, "
- "end_col_offset=13), Expr(value=Call(func=Name(id='spam', ctx=Load(), "
+ "end_col_offset=13), Expr(value=Call(func=Name(id='spam', "
"lineno=1, col_offset=0, end_lineno=1, end_col_offset=0), "
"args=[Constant(value='eggs', lineno=1, col_offset=0, end_lineno=1, "
"end_col_offset=0)], lineno=1, col_offset=0, end_lineno=1, "
@@ -3335,7 +3357,7 @@ class CommandLineTests(unittest.TestCase):
body=[
AnnAssign(
target=Name(id='x', ctx=Store()),
- annotation=Name(id='bool', ctx=Load()),
+ annotation=Name(id='bool'),
value=Constant(value=1),
simple=1)],
type_ignores=[
@@ -3363,7 +3385,7 @@ class CommandLineTests(unittest.TestCase):
expect = '''
Expression(
body=Call(
- func=Name(id='print', ctx=Load()),
+ func=Name(id='print'),
args=[
Constant(value=1),
Constant(value=2),
@@ -3379,12 +3401,11 @@ class CommandLineTests(unittest.TestCase):
expect = '''
FunctionType(
argtypes=[
- Name(id='int', ctx=Load()),
- Name(id='str', ctx=Load())],
+ Name(id='int'),
+ Name(id='str')],
returns=Subscript(
- value=Name(id='list', ctx=Load()),
- slice=Name(id='int', ctx=Load()),
- ctx=Load()))
+ value=Name(id='list'),
+ slice=Name(id='int')))
'''
for flag in ('-m=func_type', '--mode=func_type'):
with self.subTest(flag=flag):
@@ -3398,7 +3419,7 @@ class CommandLineTests(unittest.TestCase):
body=[
AnnAssign(
target=Name(id='x', ctx=Store()),
- annotation=Name(id='bool', ctx=Load()),
+ annotation=Name(id='bool'),
value=Constant(value=1),
simple=1)])
'''
@@ -3443,7 +3464,7 @@ class CommandLineTests(unittest.TestCase):
Module(
body=[
Match(
- subject=Name(id='x', ctx=Load()),
+ subject=Name(id='x'),
cases=[
match_case(
pattern=MatchValue(
@@ -3466,7 +3487,7 @@ class CommandLineTests(unittest.TestCase):
Module(
body=[
Match(
- subject=Name(id='a', ctx=Load()),
+ subject=Name(id='a'),
cases=[
match_case(
pattern=MatchValue(
@@ -3492,7 +3513,7 @@ class CommandLineTests(unittest.TestCase):
Module(
body=[
Match(
- subject=Name(id='a', ctx=Load()),
+ subject=Name(id='a'),
cases=[
match_case(
pattern=MatchValue(
diff --git a/Lib/test/test_asyncgen.py b/Lib/test/test_asyncgen.py
index 2c44647bf3e..636cb33dd98 100644
--- a/Lib/test/test_asyncgen.py
+++ b/Lib/test/test_asyncgen.py
@@ -2021,6 +2021,15 @@ class TestUnawaitedWarnings(unittest.TestCase):
g.athrow(RuntimeError)
gc_collect()
+ def test_athrow_throws_immediately(self):
+ async def gen():
+ yield 1
+
+ g = gen()
+ msg = "athrow expected at least 1 argument, got 0"
+ with self.assertRaisesRegex(TypeError, msg):
+ g.athrow()
+
def test_aclose(self):
async def gen():
yield 1
diff --git a/Lib/test/test_capi/test_opt.py b/Lib/test/test_capi/test_opt.py
index cb6eae48414..a292ebcc7f4 100644
--- a/Lib/test/test_capi/test_opt.py
+++ b/Lib/test/test_capi/test_opt.py
@@ -1183,6 +1183,17 @@ class TestUopsOptimization(unittest.TestCase):
self.assertIsNotNone(ex)
self.assertIn("_RETURN_GENERATOR", get_opnames(ex))
+ def test_for_iter(self):
+ def testfunc(n):
+ t = 0
+ for i in set(range(n)):
+ t += i
+ return t
+ res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD)
+ self.assertEqual(res, TIER2_THRESHOLD * (TIER2_THRESHOLD - 1) // 2)
+ self.assertIsNotNone(ex)
+ self.assertIn("_FOR_ITER_TIER_TWO", get_opnames(ex))
+
@unittest.skip("Tracing into generators currently isn't supported.")
def test_for_iter_gen(self):
def gen(n):
diff --git a/Lib/test/test_capi/test_unicode.py b/Lib/test/test_capi/test_unicode.py
index 3408c10f426..6a9c60f3a6d 100644
--- a/Lib/test/test_capi/test_unicode.py
+++ b/Lib/test/test_capi/test_unicode.py
@@ -1739,6 +1739,20 @@ class CAPITest(unittest.TestCase):
# Check that the second call returns the same result
self.assertEqual(getargs_s_hash(s), chr(k).encode() * (i + 1))
+ @support.cpython_only
+ @unittest.skipIf(_testcapi is None, 'need _testcapi module')
+ def test_GET_CACHED_HASH(self):
+ from _testcapi import unicode_GET_CACHED_HASH
+ content_bytes = b'some new string'
+ # avoid parser interning & constant folding
+ obj = str(content_bytes, 'ascii')
+ # impl detail: fresh strings do not have cached hash
+ self.assertEqual(unicode_GET_CACHED_HASH(obj), -1)
+ # impl detail: adding string to a dict caches its hash
+ {obj: obj}
+ # impl detail: ASCII string hashes are equal to bytes ones
+ self.assertEqual(unicode_GET_CACHED_HASH(obj), hash(content_bytes))
+
class PyUnicodeWriterTest(unittest.TestCase):
def create_writer(self, size):
@@ -1776,6 +1790,13 @@ class PyUnicodeWriterTest(unittest.TestCase):
self.assertEqual(writer.finish(),
"ascii-latin1=\xE9-euro=\u20AC.")
+ def test_ascii(self):
+ writer = self.create_writer(0)
+ writer.write_ascii(b"Hello ", -1)
+ writer.write_ascii(b"", 0)
+ writer.write_ascii(b"Python! <truncated>", 6)
+ self.assertEqual(writer.finish(), "Hello Python")
+
def test_invalid_utf8(self):
writer = self.create_writer(0)
with self.assertRaises(UnicodeDecodeError):
diff --git a/Lib/test/test_class.py b/Lib/test/test_class.py
index 4c12d43556f..8c7a62a74ba 100644
--- a/Lib/test/test_class.py
+++ b/Lib/test/test_class.py
@@ -652,6 +652,7 @@ class ClassTests(unittest.TestCase):
a = A(hash(A.f)^(-1))
hash(a.f)
+ @cpython_only
def testSetattrWrapperNameIntern(self):
# Issue #25794: __setattr__ should intern the attribute name
class A:
diff --git a/Lib/test/test_code.py b/Lib/test/test_code.py
index 32cf8aacaf6..9fc2b047bef 100644
--- a/Lib/test/test_code.py
+++ b/Lib/test/test_code.py
@@ -701,6 +701,26 @@ class CodeTest(unittest.TestCase):
'checks': CO_FAST_LOCAL,
'res': CO_FAST_LOCAL,
},
+ defs.spam_full_args: {
+ 'a': POSONLY,
+ 'b': POSONLY,
+ 'c': POSORKW,
+ 'd': POSORKW,
+ 'e': KWONLY,
+ 'f': KWONLY,
+ 'args': VARARGS,
+ 'kwargs': VARKWARGS,
+ },
+ defs.spam_full_args_with_defaults: {
+ 'a': POSONLY,
+ 'b': POSONLY,
+ 'c': POSORKW,
+ 'd': POSORKW,
+ 'e': KWONLY,
+ 'f': KWONLY,
+ 'args': VARARGS,
+ 'kwargs': VARKWARGS,
+ },
defs.spam_args_attrs_and_builtins: {
'a': POSONLY,
'b': POSONLY,
@@ -714,6 +734,7 @@ class CodeTest(unittest.TestCase):
defs.spam_returns_arg: {
'x': POSORKW,
},
+ defs.spam_raises: {},
defs.spam_with_inner_not_closure: {
'eggs': CO_FAST_LOCAL,
},
@@ -934,6 +955,20 @@ class CodeTest(unittest.TestCase):
purelocals=5,
globalvars=6,
),
+ defs.spam_full_args: new_var_counts(
+ posonly=2,
+ posorkw=2,
+ kwonly=2,
+ varargs=1,
+ varkwargs=1,
+ ),
+ defs.spam_full_args_with_defaults: new_var_counts(
+ posonly=2,
+ posorkw=2,
+ kwonly=2,
+ varargs=1,
+ varkwargs=1,
+ ),
defs.spam_args_attrs_and_builtins: new_var_counts(
posonly=2,
posorkw=2,
@@ -945,6 +980,9 @@ class CodeTest(unittest.TestCase):
defs.spam_returns_arg: new_var_counts(
posorkw=1,
),
+ defs.spam_raises: new_var_counts(
+ globalvars=1,
+ ),
defs.spam_with_inner_not_closure: new_var_counts(
purelocals=1,
),
@@ -1097,10 +1135,16 @@ class CodeTest(unittest.TestCase):
def test_stateless(self):
self.maxDiff = None
+ STATELESS_FUNCTIONS = [
+ *defs.STATELESS_FUNCTIONS,
+ # stateless with defaults
+ defs.spam_full_args_with_defaults,
+ ]
+
for func in defs.STATELESS_CODE:
with self.subTest((func, '(code)')):
_testinternalcapi.verify_stateless_code(func.__code__)
- for func in defs.STATELESS_FUNCTIONS:
+ for func in STATELESS_FUNCTIONS:
with self.subTest((func, '(func)')):
_testinternalcapi.verify_stateless_code(func)
@@ -1110,7 +1154,7 @@ class CodeTest(unittest.TestCase):
with self.assertRaises(Exception):
_testinternalcapi.verify_stateless_code(func.__code__)
- if func not in defs.STATELESS_FUNCTIONS:
+ if func not in STATELESS_FUNCTIONS:
with self.subTest((func, '(func)')):
with self.assertRaises(Exception):
_testinternalcapi.verify_stateless_code(func)
diff --git a/Lib/test/test_csv.py b/Lib/test/test_csv.py
index 9aace57633b..60feab225a1 100644
--- a/Lib/test/test_csv.py
+++ b/Lib/test/test_csv.py
@@ -1122,19 +1122,22 @@ class TestDialectValidity(unittest.TestCase):
with self.assertRaises(csv.Error) as cm:
mydialect()
self.assertEqual(str(cm.exception),
- '"quotechar" must be a 1-character string')
+ '"quotechar" must be a unicode character or None, '
+ 'not a string of length 0')
mydialect.quotechar = "''"
with self.assertRaises(csv.Error) as cm:
mydialect()
self.assertEqual(str(cm.exception),
- '"quotechar" must be a 1-character string')
+ '"quotechar" must be a unicode character or None, '
+ 'not a string of length 2')
mydialect.quotechar = 4
with self.assertRaises(csv.Error) as cm:
mydialect()
self.assertEqual(str(cm.exception),
- '"quotechar" must be string or None, not int')
+ '"quotechar" must be a unicode character or None, '
+ 'not int')
def test_delimiter(self):
class mydialect(csv.Dialect):
@@ -1151,31 +1154,32 @@ class TestDialectValidity(unittest.TestCase):
with self.assertRaises(csv.Error) as cm:
mydialect()
self.assertEqual(str(cm.exception),
- '"delimiter" must be a 1-character string')
+ '"delimiter" must be a unicode character, '
+ 'not a string of length 3')
mydialect.delimiter = ""
with self.assertRaises(csv.Error) as cm:
mydialect()
self.assertEqual(str(cm.exception),
- '"delimiter" must be a 1-character string')
+ '"delimiter" must be a unicode character, not a string of length 0')
mydialect.delimiter = b","
with self.assertRaises(csv.Error) as cm:
mydialect()
self.assertEqual(str(cm.exception),
- '"delimiter" must be string, not bytes')
+ '"delimiter" must be a unicode character, not bytes')
mydialect.delimiter = 4
with self.assertRaises(csv.Error) as cm:
mydialect()
self.assertEqual(str(cm.exception),
- '"delimiter" must be string, not int')
+ '"delimiter" must be a unicode character, not int')
mydialect.delimiter = None
with self.assertRaises(csv.Error) as cm:
mydialect()
self.assertEqual(str(cm.exception),
- '"delimiter" must be string, not NoneType')
+ '"delimiter" must be a unicode character, not NoneType')
def test_escapechar(self):
class mydialect(csv.Dialect):
@@ -1189,20 +1193,32 @@ class TestDialectValidity(unittest.TestCase):
self.assertEqual(d.escapechar, "\\")
mydialect.escapechar = ""
- with self.assertRaisesRegex(csv.Error, '"escapechar" must be a 1-character string'):
+ with self.assertRaises(csv.Error) as cm:
mydialect()
+ self.assertEqual(str(cm.exception),
+ '"escapechar" must be a unicode character or None, '
+ 'not a string of length 0')
mydialect.escapechar = "**"
- with self.assertRaisesRegex(csv.Error, '"escapechar" must be a 1-character string'):
+ with self.assertRaises(csv.Error) as cm:
mydialect()
+ self.assertEqual(str(cm.exception),
+ '"escapechar" must be a unicode character or None, '
+ 'not a string of length 2')
mydialect.escapechar = b"*"
- with self.assertRaisesRegex(csv.Error, '"escapechar" must be string or None, not bytes'):
+ with self.assertRaises(csv.Error) as cm:
mydialect()
+ self.assertEqual(str(cm.exception),
+ '"escapechar" must be a unicode character or None, '
+ 'not bytes')
mydialect.escapechar = 4
- with self.assertRaisesRegex(csv.Error, '"escapechar" must be string or None, not int'):
+ with self.assertRaises(csv.Error) as cm:
mydialect()
+ self.assertEqual(str(cm.exception),
+ '"escapechar" must be a unicode character or None, '
+ 'not int')
def test_lineterminator(self):
class mydialect(csv.Dialect):
@@ -1223,7 +1239,13 @@ class TestDialectValidity(unittest.TestCase):
with self.assertRaises(csv.Error) as cm:
mydialect()
self.assertEqual(str(cm.exception),
- '"lineterminator" must be a string')
+ '"lineterminator" must be a string, not int')
+
+ mydialect.lineterminator = None
+ with self.assertRaises(csv.Error) as cm:
+ mydialect()
+ self.assertEqual(str(cm.exception),
+ '"lineterminator" must be a string, not NoneType')
def test_invalid_chars(self):
def create_invalid(field_name, value, **kwargs):
diff --git a/Lib/test/test_ctypes/test_incomplete.py b/Lib/test/test_ctypes/test_incomplete.py
index fefdfe9102e..3189fcd1bd1 100644
--- a/Lib/test/test_ctypes/test_incomplete.py
+++ b/Lib/test/test_ctypes/test_incomplete.py
@@ -1,6 +1,5 @@
import ctypes
import unittest
-import warnings
from ctypes import Structure, POINTER, pointer, c_char_p
# String-based "incomplete pointers" were implemented in ctypes 0.6.3 (2003, when
@@ -21,9 +20,7 @@ class TestSetPointerType(unittest.TestCase):
_fields_ = [("name", c_char_p),
("next", lpcell)]
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- ctypes.SetPointerType(lpcell, cell)
+ lpcell.set_type(cell)
self.assertIs(POINTER(cell), lpcell)
@@ -50,10 +47,9 @@ class TestSetPointerType(unittest.TestCase):
_fields_ = [("name", c_char_p),
("next", lpcell)]
- with self.assertWarns(DeprecationWarning):
- ctypes.SetPointerType(lpcell, cell)
-
+ lpcell.set_type(cell)
self.assertIs(POINTER(cell), lpcell)
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_dbm.py b/Lib/test/test_dbm.py
index a10922a403e..7e8d78b8940 100644
--- a/Lib/test/test_dbm.py
+++ b/Lib/test/test_dbm.py
@@ -135,6 +135,67 @@ class AnyDBMTestCase:
assert(f[key] == b"Python:")
f.close()
+ def test_anydbm_readonly_reorganize(self):
+ self.init_db()
+ with dbm.open(_fname, 'r') as d:
+ # Early stopping.
+ if not hasattr(d, 'reorganize'):
+ self.skipTest("method reorganize not available this dbm submodule")
+
+ self.assertRaises(dbm.error, lambda: d.reorganize())
+
+ def test_anydbm_reorganize_not_changed_content(self):
+ self.init_db()
+ with dbm.open(_fname, 'c') as d:
+ # Early stopping.
+ if not hasattr(d, 'reorganize'):
+ self.skipTest("method reorganize not available this dbm submodule")
+
+ keys_before = sorted(d.keys())
+ values_before = [d[k] for k in keys_before]
+ d.reorganize()
+ keys_after = sorted(d.keys())
+ values_after = [d[k] for k in keys_before]
+ self.assertEqual(keys_before, keys_after)
+ self.assertEqual(values_before, values_after)
+
+ def test_anydbm_reorganize_decreased_size(self):
+
+ def _calculate_db_size(db_path):
+ if os.path.isfile(db_path):
+ return os.path.getsize(db_path)
+ total_size = 0
+ for root, _, filenames in os.walk(db_path):
+ for filename in filenames:
+ file_path = os.path.join(root, filename)
+ total_size += os.path.getsize(file_path)
+ return total_size
+
+ # This test requires relatively large databases to reliably show difference in size before and after reorganizing.
+ with dbm.open(_fname, 'n') as f:
+ # Early stopping.
+ if not hasattr(f, 'reorganize'):
+ self.skipTest("method reorganize not available this dbm submodule")
+
+ for k in self._dict:
+ f[k.encode('ascii')] = self._dict[k] * 100000
+ db_keys = list(f.keys())
+
+ # Make sure to calculate size of database only after file is closed to ensure file content are flushed to disk.
+ size_before = _calculate_db_size(os.path.dirname(_fname))
+
+ # Delete some elements from the start of the database.
+ keys_to_delete = db_keys[:len(db_keys) // 2]
+ with dbm.open(_fname, 'c') as f:
+ for k in keys_to_delete:
+ del f[k]
+ f.reorganize()
+
+ # Make sure to calculate size of database only after file is closed to ensure file content are flushed to disk.
+ size_after = _calculate_db_size(os.path.dirname(_fname))
+
+ self.assertLess(size_after, size_before)
+
def test_open_with_bytes(self):
dbm.open(os.fsencode(_fname), "c").close()
diff --git a/Lib/test/test_dbm_gnu.py b/Lib/test/test_dbm_gnu.py
index 66268c42a30..e0b988b7b95 100644
--- a/Lib/test/test_dbm_gnu.py
+++ b/Lib/test/test_dbm_gnu.py
@@ -74,12 +74,12 @@ class TestGdbm(unittest.TestCase):
# Test the flag parameter open() by trying all supported flag modes.
all = set(gdbm.open_flags)
# Test standard flags (presumably "crwn").
- modes = all - set('fsu')
+ modes = all - set('fsum')
for mode in sorted(modes): # put "c" mode first
self.g = gdbm.open(filename, mode)
self.g.close()
- # Test additional flags (presumably "fsu").
+ # Test additional flags (presumably "fsum").
flags = all - set('crwn')
for mode in modes:
for flag in flags:
@@ -217,6 +217,29 @@ class TestGdbm(unittest.TestCase):
create_empty_file(os.path.join(d, 'test'))
self.assertRaises(gdbm.error, gdbm.open, filename, 'r')
+ @unittest.skipUnless('m' in gdbm.open_flags, "requires 'm' in open_flags")
+ def test_nommap_no_crash(self):
+ self.g = g = gdbm.open(filename, 'nm')
+ os.truncate(filename, 0)
+
+ g.get(b'a', b'c')
+ g.keys()
+ g.firstkey()
+ g.nextkey(b'a')
+ with self.assertRaises(KeyError):
+ g[b'a']
+ with self.assertRaises(gdbm.error):
+ len(g)
+
+ with self.assertRaises(gdbm.error):
+ g[b'a'] = b'c'
+ with self.assertRaises(gdbm.error):
+ del g[b'a']
+ with self.assertRaises(gdbm.error):
+ g.setdefault(b'a', b'c')
+ with self.assertRaises(gdbm.error):
+ g.reorganize()
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_decimal.py b/Lib/test/test_decimal.py
index c0a1e378583..ef64b878805 100644
--- a/Lib/test/test_decimal.py
+++ b/Lib/test/test_decimal.py
@@ -981,6 +981,7 @@ class FormatTest:
('.0f', '0e-2', '0'),
('.0f', '3.14159265', '3'),
('.1f', '3.14159265', '3.1'),
+ ('.01f', '3.14159265', '3.1'), # leading zero in precision
('.4f', '3.14159265', '3.1416'),
('.6f', '3.14159265', '3.141593'),
('.7f', '3.14159265', '3.1415926'), # round-half-even!
@@ -1066,6 +1067,7 @@ class FormatTest:
('8,', '123456', ' 123,456'),
('08,', '123456', '0,123,456'), # special case: extra 0 needed
('+08,', '123456', '+123,456'), # but not if there's a sign
+ ('008,', '123456', '0,123,456'), # leading zero in width
(' 08,', '123456', ' 123,456'),
('08,', '-123456', '-123,456'),
('+09,', '123456', '+0,123,456'),
diff --git a/Lib/test/test_dis.py b/Lib/test/test_dis.py
index ec930a728aa..355990ed58e 100644
--- a/Lib/test/test_dis.py
+++ b/Lib/test/test_dis.py
@@ -606,7 +606,7 @@ dis_asyncwith = """\
POP_TOP
L1: RESUME 0
-%4d LOAD_FAST_BORROW 0 (c)
+%4d LOAD_FAST 0 (c)
COPY 1
LOAD_SPECIAL 3 (__aexit__)
SWAP 2
diff --git a/Lib/test/test_doctest/test_doctest.py b/Lib/test/test_doctest/test_doctest.py
index c5b247797c3..72763d4a013 100644
--- a/Lib/test/test_doctest/test_doctest.py
+++ b/Lib/test/test_doctest/test_doctest.py
@@ -2269,20 +2269,22 @@ def test_DocTestSuite():
>>> suite = doctest.DocTestSuite(test.test_doctest.sample_doctest)
>>> result = suite.run(unittest.TestResult())
>>> result
- <unittest.result.TestResult run=9 errors=0 failures=4>
+ <unittest.result.TestResult run=9 errors=2 failures=2>
>>> for tst, _ in result.failures:
... print(tst)
- bad (test.test_doctest.sample_doctest.__test__)
- foo (test.test_doctest.sample_doctest)
- test_silly_setup (test.test_doctest.sample_doctest)
- y_is_one (test.test_doctest.sample_doctest)
+ bad (test.test_doctest.sample_doctest.__test__) [0]
+ foo (test.test_doctest.sample_doctest) [0]
+ >>> for tst, _ in result.errors:
+ ... print(tst)
+ test_silly_setup (test.test_doctest.sample_doctest) [1]
+ y_is_one (test.test_doctest.sample_doctest) [0]
We can also supply the module by name:
>>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest')
>>> result = suite.run(unittest.TestResult())
>>> result
- <unittest.result.TestResult run=9 errors=0 failures=4>
+ <unittest.result.TestResult run=9 errors=2 failures=2>
The module need not contain any doctest examples:
@@ -2304,21 +2306,26 @@ def test_DocTestSuite():
>>> result
<unittest.result.TestResult run=6 errors=0 failures=2>
>>> len(result.skipped)
- 2
+ 7
>>> for tst, _ in result.skipped:
... print(tst)
+ double_skip (test.test_doctest.sample_doctest_skip) [0]
+ double_skip (test.test_doctest.sample_doctest_skip) [1]
double_skip (test.test_doctest.sample_doctest_skip)
+ partial_skip_fail (test.test_doctest.sample_doctest_skip) [0]
+ partial_skip_pass (test.test_doctest.sample_doctest_skip) [0]
+ single_skip (test.test_doctest.sample_doctest_skip) [0]
single_skip (test.test_doctest.sample_doctest_skip)
>>> for tst, _ in result.failures:
... print(tst)
- no_skip_fail (test.test_doctest.sample_doctest_skip)
- partial_skip_fail (test.test_doctest.sample_doctest_skip)
+ no_skip_fail (test.test_doctest.sample_doctest_skip) [0]
+ partial_skip_fail (test.test_doctest.sample_doctest_skip) [1]
We can use the current module:
>>> suite = test.test_doctest.sample_doctest.test_suite()
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=9 errors=0 failures=4>
+ <unittest.result.TestResult run=9 errors=2 failures=2>
We can also provide a DocTestFinder:
@@ -2326,7 +2333,7 @@ def test_DocTestSuite():
>>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest',
... test_finder=finder)
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=9 errors=0 failures=4>
+ <unittest.result.TestResult run=9 errors=2 failures=2>
The DocTestFinder need not return any tests:
@@ -2342,7 +2349,7 @@ def test_DocTestSuite():
>>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest', globs={})
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=9 errors=0 failures=5>
+ <unittest.result.TestResult run=9 errors=3 failures=2>
Alternatively, we can provide extra globals. Here we'll make an
error go away by providing an extra global variable:
@@ -2350,7 +2357,7 @@ def test_DocTestSuite():
>>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest',
... extraglobs={'y': 1})
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=9 errors=0 failures=3>
+ <unittest.result.TestResult run=9 errors=1 failures=2>
You can pass option flags. Here we'll cause an extra error
by disabling the blank-line feature:
@@ -2358,7 +2365,7 @@ def test_DocTestSuite():
>>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest',
... optionflags=doctest.DONT_ACCEPT_BLANKLINE)
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=9 errors=0 failures=5>
+ <unittest.result.TestResult run=9 errors=2 failures=3>
You can supply setUp and tearDown functions:
@@ -2375,7 +2382,7 @@ def test_DocTestSuite():
>>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest',
... setUp=setUp, tearDown=tearDown)
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=9 errors=0 failures=3>
+ <unittest.result.TestResult run=9 errors=1 failures=2>
But the tearDown restores sanity:
@@ -2393,7 +2400,7 @@ def test_DocTestSuite():
>>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest', setUp=setUp)
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=9 errors=0 failures=3>
+ <unittest.result.TestResult run=9 errors=1 failures=2>
Here, we didn't need to use a tearDown function because we
modified the test globals, which are a copy of the
@@ -2409,162 +2416,97 @@ def test_DocTestSuite_errors():
>>> suite = doctest.DocTestSuite(mod)
>>> result = suite.run(unittest.TestResult())
>>> result
- <unittest.result.TestResult run=4 errors=0 failures=4>
+ <unittest.result.TestResult run=4 errors=6 failures=3>
>>> print(result.failures[0][1]) # doctest: +ELLIPSIS
Traceback (most recent call last):
- File ...
- raise self.failureException(self.format_failure(new.getvalue()))
- AssertionError: Failed doctest test for test.test_doctest.sample_doctest_errors
- File "...sample_doctest_errors.py", line 0, in sample_doctest_errors
- <BLANKLINE>
- ----------------------------------------------------------------------
- File "...sample_doctest_errors.py", line 5, in test.test_doctest.sample_doctest_errors
- Failed example:
+ File "...sample_doctest_errors.py", line 5, in test.test_doctest.sample_doctest_errors
+ >...>> 2 + 2
+ AssertionError: Failed example:
2 + 2
Expected:
5
Got:
4
- ----------------------------------------------------------------------
- File "...sample_doctest_errors.py", line 7, in test.test_doctest.sample_doctest_errors
- Failed example:
- 1/0
- Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- File "<doctest test.test_doctest.sample_doctest_errors[1]>", line 1, in <module>
- 1/0
- ~^~
- ZeroDivisionError: division by zero
- <BLANKLINE>
<BLANKLINE>
>>> print(result.failures[1][1]) # doctest: +ELLIPSIS
Traceback (most recent call last):
- File ...
- raise self.failureException(self.format_failure(new.getvalue()))
- AssertionError: Failed doctest test for test.test_doctest.sample_doctest_errors.__test__.bad
- File "...sample_doctest_errors.py", line unknown line number, in bad
- <BLANKLINE>
- ----------------------------------------------------------------------
- File "...sample_doctest_errors.py", line ?, in test.test_doctest.sample_doctest_errors.__test__.bad
- Failed example:
+ File "...sample_doctest_errors.py", line None, in test.test_doctest.sample_doctest_errors.__test__.bad
+ AssertionError: Failed example:
2 + 2
Expected:
5
Got:
4
- ----------------------------------------------------------------------
- File "...sample_doctest_errors.py", line ?, in test.test_doctest.sample_doctest_errors.__test__.bad
- Failed example:
- 1/0
- Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- File "<doctest test.test_doctest.sample_doctest_errors.__test__.bad[1]>", line 1, in <module>
- 1/0
- ~^~
- ZeroDivisionError: division by zero
- <BLANKLINE>
<BLANKLINE>
>>> print(result.failures[2][1]) # doctest: +ELLIPSIS
Traceback (most recent call last):
- File ...
- raise self.failureException(self.format_failure(new.getvalue()))
- AssertionError: Failed doctest test for test.test_doctest.sample_doctest_errors.errors
- File "...sample_doctest_errors.py", line 14, in errors
- <BLANKLINE>
- ----------------------------------------------------------------------
- File "...sample_doctest_errors.py", line 16, in test.test_doctest.sample_doctest_errors.errors
- Failed example:
+ File "...sample_doctest_errors.py", line 16, in test.test_doctest.sample_doctest_errors.errors
+ >...>> 2 + 2
+ AssertionError: Failed example:
2 + 2
Expected:
5
Got:
4
- ----------------------------------------------------------------------
- File "...sample_doctest_errors.py", line 18, in test.test_doctest.sample_doctest_errors.errors
- Failed example:
+ <BLANKLINE>
+ >>> print(result.errors[0][1]) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ File "...sample_doctest_errors.py", line 7, in test.test_doctest.sample_doctest_errors
+ >...>> 1/0
+ File "<doctest test.test_doctest.sample_doctest_errors[1]>", line 1, in <module>
1/0
- Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- File "<doctest test.test_doctest.sample_doctest_errors.errors[1]>", line 1, in <module>
- 1/0
- ~^~
- ZeroDivisionError: division by zero
- ----------------------------------------------------------------------
- File "...sample_doctest_errors.py", line 23, in test.test_doctest.sample_doctest_errors.errors
- Failed example:
- f()
- Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- File "<doctest test.test_doctest.sample_doctest_errors.errors[3]>", line 1, in <module>
- f()
- ~^^
- File "<doctest test.test_doctest.sample_doctest_errors.errors[2]>", line 2, in f
- 2 + '2'
- ~~^~~~~
- TypeError: ...
- ----------------------------------------------------------------------
- File "...sample_doctest_errors.py", line 25, in test.test_doctest.sample_doctest_errors.errors
- Failed example:
- g()
- Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- File "<doctest test.test_doctest.sample_doctest_errors.errors[4]>", line 1, in <module>
- g()
- ~^^
- File "...sample_doctest_errors.py", line 12, in g
- [][0] # line 12
- ~~^^^
- IndexError: list index out of range
+ ~^~
+ ZeroDivisionError: division by zero
<BLANKLINE>
+ >>> print(result.errors[1][1]) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ File "...sample_doctest_errors.py", line None, in test.test_doctest.sample_doctest_errors.__test__.bad
+ File "<doctest test.test_doctest.sample_doctest_errors.__test__.bad[1]>", line 1, in <module>
+ 1/0
+ ~^~
+ ZeroDivisionError: division by zero
<BLANKLINE>
- >>> print(result.failures[3][1]) # doctest: +ELLIPSIS
+ >>> print(result.errors[2][1]) # doctest: +ELLIPSIS
Traceback (most recent call last):
- File ...
- raise self.failureException(self.format_failure(new.getvalue()))
- AssertionError: Failed doctest test for test.test_doctest.sample_doctest_errors.syntax_error
- File "...sample_doctest_errors.py", line 29, in syntax_error
+ File "...sample_doctest_errors.py", line 18, in test.test_doctest.sample_doctest_errors.errors
+ >...>> 1/0
+ File "<doctest test.test_doctest.sample_doctest_errors.errors[1]>", line 1, in <module>
+ 1/0
+ ~^~
+ ZeroDivisionError: division by zero
<BLANKLINE>
- ----------------------------------------------------------------------
- File "...sample_doctest_errors.py", line 31, in test.test_doctest.sample_doctest_errors.syntax_error
- Failed example:
- 2+*3
- Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^
- File "<doctest test.test_doctest.sample_doctest_errors.syntax_error[0]>", line 1
- 2+*3
- ^
- SyntaxError: invalid syntax
+ >>> print(result.errors[3][1]) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ File "...sample_doctest_errors.py", line 23, in test.test_doctest.sample_doctest_errors.errors
+ >...>> f()
+ File "<doctest test.test_doctest.sample_doctest_errors.errors[3]>", line 1, in <module>
+ f()
+ ~^^
+ File "<doctest test.test_doctest.sample_doctest_errors.errors[2]>", line 2, in f
+ 2 + '2'
+ ~~^~~~~
+ TypeError: ...
<BLANKLINE>
+ >>> print(result.errors[4][1]) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ File "...sample_doctest_errors.py", line 25, in test.test_doctest.sample_doctest_errors.errors
+ >...>> g()
+ File "<doctest test.test_doctest.sample_doctest_errors.errors[4]>", line 1, in <module>
+ g()
+ ~^^
+ File "...sample_doctest_errors.py", line 12, in g
+ [][0] # line 12
+ ~~^^^
+ IndexError: list index out of range
+ <BLANKLINE>
+ >>> print(result.errors[5][1]) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ File "...sample_doctest_errors.py", line 31, in test.test_doctest.sample_doctest_errors.syntax_error
+ >...>> 2+*3
+ File "<doctest test.test_doctest.sample_doctest_errors.syntax_error[0]>", line 1
+ 2+*3
+ ^
+ SyntaxError: invalid syntax
<BLANKLINE>
"""
@@ -2579,7 +2521,7 @@ def test_DocFileSuite():
... 'test_doctest2.txt',
... 'test_doctest4.txt')
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=3 errors=0 failures=2>
+ <unittest.result.TestResult run=3 errors=2 failures=0>
The test files are looked for in the directory containing the
calling module. A package keyword argument can be provided to
@@ -2591,14 +2533,14 @@ def test_DocFileSuite():
... 'test_doctest4.txt',
... package='test.test_doctest')
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=3 errors=0 failures=2>
+ <unittest.result.TestResult run=3 errors=2 failures=0>
'/' should be used as a path separator. It will be converted
to a native separator at run time:
>>> suite = doctest.DocFileSuite('../test_doctest/test_doctest.txt')
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=1 errors=0 failures=1>
+ <unittest.result.TestResult run=1 errors=1 failures=0>
If DocFileSuite is used from an interactive session, then files
are resolved relative to the directory of sys.argv[0]:
@@ -2624,7 +2566,7 @@ def test_DocFileSuite():
>>> suite = doctest.DocFileSuite(test_file, module_relative=False)
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=1 errors=0 failures=1>
+ <unittest.result.TestResult run=1 errors=1 failures=0>
It is an error to specify `package` when `module_relative=False`:
@@ -2642,12 +2584,15 @@ def test_DocFileSuite():
... 'test_doctest_skip2.txt')
>>> result = suite.run(unittest.TestResult())
>>> result
- <unittest.result.TestResult run=4 errors=0 failures=1>
+ <unittest.result.TestResult run=4 errors=1 failures=0>
>>> len(result.skipped)
- 1
+ 4
>>> for tst, _ in result.skipped: # doctest: +ELLIPSIS
... print('=', tst)
+ = ...test_doctest_skip.txt [0]
+ = ...test_doctest_skip.txt [1]
= ...test_doctest_skip.txt
+ = ...test_doctest_skip2.txt [0]
You can specify initial global variables:
@@ -2656,7 +2601,7 @@ def test_DocFileSuite():
... 'test_doctest4.txt',
... globs={'favorite_color': 'blue'})
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=3 errors=0 failures=1>
+ <unittest.result.TestResult run=3 errors=1 failures=0>
In this case, we supplied a missing favorite color. You can
provide doctest options:
@@ -2667,7 +2612,7 @@ def test_DocFileSuite():
... optionflags=doctest.DONT_ACCEPT_BLANKLINE,
... globs={'favorite_color': 'blue'})
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=3 errors=0 failures=2>
+ <unittest.result.TestResult run=3 errors=1 failures=1>
And, you can provide setUp and tearDown functions:
@@ -2686,7 +2631,7 @@ def test_DocFileSuite():
... 'test_doctest4.txt',
... setUp=setUp, tearDown=tearDown)
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=3 errors=0 failures=1>
+ <unittest.result.TestResult run=3 errors=1 failures=0>
But the tearDown restores sanity:
@@ -2728,7 +2673,7 @@ def test_DocFileSuite():
... 'test_doctest4.txt',
... encoding='utf-8')
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=3 errors=0 failures=2>
+ <unittest.result.TestResult run=3 errors=2 failures=0>
"""
def test_DocFileSuite_errors():
@@ -2738,72 +2683,49 @@ def test_DocFileSuite_errors():
>>> suite = doctest.DocFileSuite('test_doctest_errors.txt')
>>> result = suite.run(unittest.TestResult())
>>> result
- <unittest.result.TestResult run=1 errors=0 failures=1>
+ <unittest.result.TestResult run=1 errors=3 failures=1>
>>> print(result.failures[0][1]) # doctest: +ELLIPSIS
Traceback (most recent call last):
- File ...
- raise self.failureException(self.format_failure(new.getvalue()))
- AssertionError: Failed doctest test for test_doctest_errors.txt
- File "...test_doctest_errors.txt", line 0
- <BLANKLINE>
- ----------------------------------------------------------------------
- File "...test_doctest_errors.txt", line 4, in test_doctest_errors.txt
- Failed example:
+ File "...test_doctest_errors.txt", line 4, in test_doctest_errors.txt
+ >...>> 2 + 2
+ AssertionError: Failed example:
2 + 2
Expected:
5
Got:
4
- ----------------------------------------------------------------------
- File "...test_doctest_errors.txt", line 6, in test_doctest_errors.txt
- Failed example:
+ <BLANKLINE>
+ >>> print(result.errors[0][1]) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ File "...test_doctest_errors.txt", line 6, in test_doctest_errors.txt
+ >...>> 1/0
+ File "<doctest test_doctest_errors.txt[1]>", line 1, in <module>
1/0
- Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- File "<doctest test_doctest_errors.txt[1]>", line 1, in <module>
- 1/0
- ~^~
- ZeroDivisionError: division by zero
- ----------------------------------------------------------------------
- File "...test_doctest_errors.txt", line 11, in test_doctest_errors.txt
- Failed example:
+ ~^~
+ ZeroDivisionError: division by zero
+ <BLANKLINE>
+ >>> print(result.errors[1][1]) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ File "...test_doctest_errors.txt", line 11, in test_doctest_errors.txt
+ >...>> f()
+ File "<doctest test_doctest_errors.txt[3]>", line 1, in <module>
f()
- Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- File "<doctest test_doctest_errors.txt[3]>", line 1, in <module>
- f()
- ~^^
- File "<doctest test_doctest_errors.txt[2]>", line 2, in f
- 2 + '2'
- ~~^~~~~
- TypeError: ...
- ----------------------------------------------------------------------
- File "...test_doctest_errors.txt", line 13, in test_doctest_errors.txt
- Failed example:
- 2+*3
- Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^
- File "<doctest test_doctest_errors.txt[4]>", line 1
- 2+*3
- ^
- SyntaxError: invalid syntax
+ ~^^
+ File "<doctest test_doctest_errors.txt[2]>", line 2, in f
+ 2 + '2'
+ ~~^~~~~
+ TypeError: ...
<BLANKLINE>
+ >>> print(result.errors[2][1]) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ File "...test_doctest_errors.txt", line 13, in test_doctest_errors.txt
+ >...>> 2+*3
+ File "<doctest test_doctest_errors.txt[4]>", line 1
+ 2+*3
+ ^
+ SyntaxError: invalid syntax
<BLANKLINE>
+
"""
def test_trailing_space_in_test():
@@ -2874,15 +2796,25 @@ def test_unittest_reportflags():
>>> import unittest
>>> result = suite.run(unittest.TestResult())
>>> result
- <unittest.result.TestResult run=1 errors=0 failures=1>
+ <unittest.result.TestResult run=1 errors=1 failures=1>
>>> print(result.failures[0][1]) # doctest: +ELLIPSIS
- Traceback ...
- Failed example:
- favorite_color
- ...
- Failed example:
+ Traceback (most recent call last):
+ File ...
+ >...>> if 1:
+ AssertionError: Failed example:
if 1:
- ...
+ print('a')
+ print()
+ print('b')
+ Expected:
+ a
+ <BLANKLINE>
+ b
+ Got:
+ a
+ <BLANKLINE>
+ b
+ <BLANKLINE>
Note that we see both failures displayed.
@@ -2891,18 +2823,8 @@ def test_unittest_reportflags():
Now, when we run the test:
- >>> result = suite.run(unittest.TestResult())
- >>> result
- <unittest.result.TestResult run=1 errors=0 failures=1>
- >>> print(result.failures[0][1]) # doctest: +ELLIPSIS
- Traceback ...
- Failed example:
- favorite_color
- Exception raised:
- ...
- NameError: name 'favorite_color' is not defined
- <BLANKLINE>
- <BLANKLINE>
+ >>> suite.run(unittest.TestResult())
+ <unittest.result.TestResult run=1 errors=1 failures=0>
We get only the first failure.
@@ -2912,21 +2834,20 @@ def test_unittest_reportflags():
>>> suite = doctest.DocFileSuite('test_doctest.txt',
... optionflags=doctest.DONT_ACCEPT_BLANKLINE | doctest.REPORT_NDIFF)
- Then the default eporting options are ignored:
+ Then the default reporting options are ignored:
>>> result = suite.run(unittest.TestResult())
>>> result
- <unittest.result.TestResult run=1 errors=0 failures=1>
+ <unittest.result.TestResult run=1 errors=1 failures=1>
*NOTE*: These doctest are intentionally not placed in raw string to depict
the trailing whitespace using `\x20` in the diff below.
>>> print(result.failures[0][1]) # doctest: +ELLIPSIS
Traceback ...
- Failed example:
- favorite_color
- ...
- Failed example:
+ File ...
+ >...>> if 1:
+ AssertionError: Failed example:
if 1:
print('a')
print()
@@ -2937,7 +2858,6 @@ def test_unittest_reportflags():
+\x20
b
<BLANKLINE>
- <BLANKLINE>
Test runners can restore the formatting flags after they run:
@@ -3145,11 +3065,6 @@ Tests for error reporting in the testfile() function.
1/0
Exception raised:
Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<doctest test_doctest_errors.txt[1]>", line 1, in <module>
1/0
~^~
@@ -3160,11 +3075,6 @@ Tests for error reporting in the testfile() function.
f()
Exception raised:
Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<doctest test_doctest_errors.txt[3]>", line 1, in <module>
f()
~^^
@@ -3177,12 +3087,6 @@ Tests for error reporting in the testfile() function.
Failed example:
2+*3
Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^
File "<doctest test_doctest_errors.txt[4]>", line 1
2+*3
^
@@ -3343,11 +3247,6 @@ Tests for error reporting in the testmod() function.
1/0
Exception raised:
Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<doctest test.test_doctest.sample_doctest_errors[1]>", line 1, in <module>
1/0
~^~
@@ -3366,11 +3265,6 @@ Tests for error reporting in the testmod() function.
1/0
Exception raised:
Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<doctest test.test_doctest.sample_doctest_errors.__test__.bad[1]>", line 1, in <module>
1/0
~^~
@@ -3389,11 +3283,6 @@ Tests for error reporting in the testmod() function.
1/0
Exception raised:
Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<doctest test.test_doctest.sample_doctest_errors.errors[1]>", line 1, in <module>
1/0
~^~
@@ -3404,11 +3293,6 @@ Tests for error reporting in the testmod() function.
f()
Exception raised:
Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<doctest test.test_doctest.sample_doctest_errors.errors[3]>", line 1, in <module>
f()
~^^
@@ -3422,11 +3306,6 @@ Tests for error reporting in the testmod() function.
g()
Exception raised:
Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<doctest test.test_doctest.sample_doctest_errors.errors[4]>", line 1, in <module>
g()
~^^
@@ -3439,12 +3318,6 @@ Tests for error reporting in the testmod() function.
Failed example:
2+*3
Exception raised:
- Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^
File "<doctest test.test_doctest.sample_doctest_errors.syntax_error[0]>", line 1
2+*3
^
@@ -3490,11 +3363,6 @@ Check doctest with a non-ascii filename:
raise Exception('clé')
Exception raised:
Traceback (most recent call last):
- File ...
- exec(compile(example.source, filename, "single",
- ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- compileflags, True), test.globs)
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<doctest foo-bär@baz[0]>", line 1, in <module>
raise Exception('clé')
Exception: clé
@@ -3787,9 +3655,9 @@ def test_run_doctestsuite_multiple_times():
>>> import test.test_doctest.sample_doctest
>>> suite = doctest.DocTestSuite(test.test_doctest.sample_doctest)
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=9 errors=0 failures=4>
+ <unittest.result.TestResult run=9 errors=2 failures=2>
>>> suite.run(unittest.TestResult())
- <unittest.result.TestResult run=9 errors=0 failures=4>
+ <unittest.result.TestResult run=9 errors=2 failures=2>
"""
diff --git a/Lib/test/test_email/test__header_value_parser.py b/Lib/test/test_email/test__header_value_parser.py
index fd4ac2c404c..179e236ecdf 100644
--- a/Lib/test/test_email/test__header_value_parser.py
+++ b/Lib/test/test_email/test__header_value_parser.py
@@ -2491,6 +2491,38 @@ class TestParser(TestParserMixin, TestEmailBase):
self.assertEqual(address.all_mailboxes[0].domain, 'example.com')
self.assertEqual(address.all_mailboxes[0].addr_spec, '"example example"@example.com')
+ def test_get_address_with_invalid_domain(self):
+ address = self._test_get_x(parser.get_address,
+ '<T@[',
+ '<T@[]>',
+ '<T@[]>',
+ [errors.InvalidHeaderDefect, # missing trailing '>' on angle-addr
+ errors.InvalidHeaderDefect, # end of input inside domain-literal
+ ],
+ '')
+ self.assertEqual(address.token_type, 'address')
+ self.assertEqual(len(address.mailboxes), 0)
+ self.assertEqual(len(address.all_mailboxes), 1)
+ self.assertEqual(address.all_mailboxes[0].domain, '[]')
+ self.assertEqual(address.all_mailboxes[0].local_part, 'T')
+ self.assertEqual(address.all_mailboxes[0].token_type, 'invalid-mailbox')
+ self.assertEqual(address[0].token_type, 'invalid-mailbox')
+
+ address = self._test_get_x(parser.get_address,
+ '!an??:=m==fr2@[C',
+ '!an??:=m==fr2@[C];',
+ '!an??:=m==fr2@[C];',
+ [errors.InvalidHeaderDefect, # end of header in group
+ errors.InvalidHeaderDefect, # end of input inside domain-literal
+ ],
+ '')
+ self.assertEqual(address.token_type, 'address')
+ self.assertEqual(len(address.mailboxes), 0)
+ self.assertEqual(len(address.all_mailboxes), 1)
+ self.assertEqual(address.all_mailboxes[0].domain, '[C]')
+ self.assertEqual(address.all_mailboxes[0].local_part, '=m==fr2')
+ self.assertEqual(address.all_mailboxes[0].token_type, 'invalid-mailbox')
+ self.assertEqual(address[0].token_type, 'group')
# get_address_list
@@ -2765,6 +2797,19 @@ class TestParser(TestParserMixin, TestEmailBase):
)
self.assertEqual(message_id.token_type, 'message-id')
+ def test_parse_message_id_with_invalid_domain(self):
+ message_id = self._test_parse_x(
+ parser.parse_message_id,
+ "<T@[",
+ "<T@[]>",
+ "<T@[]>",
+ [errors.ObsoleteHeaderDefect] + [errors.InvalidHeaderDefect] * 2,
+ [],
+ )
+ self.assertEqual(message_id.token_type, 'message-id')
+ self.assertEqual(str(message_id.all_defects[-1]),
+ "end of input inside domain-literal")
+
def test_parse_message_id_with_remaining(self):
message_id = self._test_parse_x(
parser.parse_message_id,
diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py
index 2b4b63a030b..303af25fc7a 100644
--- a/Lib/test/test_external_inspection.py
+++ b/Lib/test/test_external_inspection.py
@@ -114,17 +114,17 @@ class TestGetStackTrace(unittest.TestCase):
p.wait(timeout=SHORT_TIMEOUT)
thread_expected_stack_trace = [
- ("foo", script_name, 15),
- ("baz", script_name, 12),
- ("bar", script_name, 9),
- ('Thread.run', threading.__file__, ANY)
+ (script_name, 15, "foo"),
+ (script_name, 12, "baz"),
+ (script_name, 9, "bar"),
+ (threading.__file__, ANY, 'Thread.run')
]
# Is possible that there are more threads, so we check that the
# expected stack traces are in the result (looking at you Windows!)
self.assertIn((ANY, thread_expected_stack_trace), stack_trace)
# Check that the main thread stack trace is in the result
- frame = ("<module>", script_name, 19)
+ frame = (script_name, 19, "<module>")
for _, stack in stack_trace:
if frame in stack:
break
@@ -222,47 +222,47 @@ class TestGetStackTrace(unittest.TestCase):
root_task = "Task-1"
expected_stack_trace = [
[
- ("c5", script_name, 10),
- ("c4", script_name, 14),
- ("c3", script_name, 17),
- ("c2", script_name, 20),
+ (script_name, 10, "c5"),
+ (script_name, 14, "c4"),
+ (script_name, 17, "c3"),
+ (script_name, 20, "c2"),
],
"c2_root",
[
[
[
(
- "TaskGroup._aexit",
taskgroups.__file__,
ANY,
+ "TaskGroup._aexit"
),
(
- "TaskGroup.__aexit__",
taskgroups.__file__,
ANY,
+ "TaskGroup.__aexit__"
),
- ("main", script_name, 26),
+ (script_name, 26, "main"),
],
"Task-1",
[],
],
[
- [("c1", script_name, 23)],
+ [(script_name, 23, "c1")],
"sub_main_1",
[
[
[
(
- "TaskGroup._aexit",
taskgroups.__file__,
ANY,
+ "TaskGroup._aexit"
),
(
- "TaskGroup.__aexit__",
taskgroups.__file__,
ANY,
+ "TaskGroup.__aexit__"
),
- ("main", script_name, 26),
+ (script_name, 26, "main"),
],
"Task-1",
[],
@@ -270,22 +270,22 @@ class TestGetStackTrace(unittest.TestCase):
],
],
[
- [("c1", script_name, 23)],
+ [(script_name, 23, "c1")],
"sub_main_2",
[
[
[
(
- "TaskGroup._aexit",
taskgroups.__file__,
ANY,
+ "TaskGroup._aexit"
),
(
- "TaskGroup.__aexit__",
taskgroups.__file__,
ANY,
+ "TaskGroup.__aexit__"
),
- ("main", script_name, 26),
+ (script_name, 26, "main"),
],
"Task-1",
[],
@@ -363,9 +363,9 @@ class TestGetStackTrace(unittest.TestCase):
expected_stack_trace = [
[
- ("gen_nested_call", script_name, 10),
- ("gen", script_name, 16),
- ("main", script_name, 19),
+ (script_name, 10, "gen_nested_call"),
+ (script_name, 16, "gen"),
+ (script_name, 19, "main"),
],
"Task-1",
[],
@@ -439,9 +439,9 @@ class TestGetStackTrace(unittest.TestCase):
stack_trace[2].sort(key=lambda x: x[1])
expected_stack_trace = [
- [("deep", script_name, 11), ("c1", script_name, 15)],
+ [(script_name, 11, "deep"), (script_name, 15, "c1")],
"Task-2",
- [[[("main", script_name, 21)], "Task-1", []]],
+ [[[(script_name, 21, "main")], "Task-1", []]],
]
self.assertEqual(stack_trace, expected_stack_trace)
@@ -515,16 +515,16 @@ class TestGetStackTrace(unittest.TestCase):
stack_trace[2].sort(key=lambda x: x[1])
expected_stack_trace = [
[
- ("deep", script_name, 11),
- ("c1", script_name, 15),
- ("staggered_race.<locals>.run_one_coro", staggered.__file__, ANY),
+ (script_name, 11, "deep"),
+ (script_name, 15, "c1"),
+ (staggered.__file__, ANY, "staggered_race.<locals>.run_one_coro"),
],
"Task-2",
[
[
[
- ("staggered_race", staggered.__file__, ANY),
- ("main", script_name, 21),
+ (staggered.__file__, ANY, "staggered_race"),
+ (script_name, 21, "main"),
],
"Task-1",
[],
@@ -662,16 +662,16 @@ class TestGetStackTrace(unittest.TestCase):
self.assertIn((ANY, "Task-1", []), entries)
main_stack = [
(
- "TaskGroup._aexit",
taskgroups.__file__,
ANY,
+ "TaskGroup._aexit",
),
(
- "TaskGroup.__aexit__",
taskgroups.__file__,
ANY,
+ "TaskGroup.__aexit__",
),
- ("main", script_name, 60),
+ (script_name, 60, "main"),
]
self.assertIn(
(ANY, "server task", [[main_stack, ANY]]),
@@ -686,16 +686,16 @@ class TestGetStackTrace(unittest.TestCase):
[
[
(
- "TaskGroup._aexit",
taskgroups.__file__,
ANY,
+ "TaskGroup._aexit",
),
(
- "TaskGroup.__aexit__",
taskgroups.__file__,
ANY,
+ "TaskGroup.__aexit__",
),
- ("echo_client_spam", script_name, 41),
+ (script_name, 41, "echo_client_spam"),
],
ANY,
]
@@ -741,14 +741,14 @@ class TestGetStackTrace(unittest.TestCase):
stack[:2],
[
(
- "get_stack_trace",
__file__,
get_stack_trace.__code__.co_firstlineno + 2,
+ "get_stack_trace",
),
(
- "TestGetStackTrace.test_self_trace",
__file__,
self.test_self_trace.__code__.co_firstlineno + 6,
+ "TestGetStackTrace.test_self_trace",
),
]
)
diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py
index 96b3f305194..d1d2739856c 100644
--- a/Lib/test/test_fractions.py
+++ b/Lib/test/test_fractions.py
@@ -1518,6 +1518,8 @@ class FractionTest(unittest.TestCase):
(F(51, 1000), '.1f', '0.1'),
(F(149, 1000), '.1f', '0.1'),
(F(151, 1000), '.1f', '0.2'),
+ (F(22, 7), '.02f', '3.14'), # issue gh-130662
+ (F(22, 7), '005.02f', '03.14'),
]
for fraction, spec, expected in testcases:
with self.subTest(fraction=fraction, spec=spec):
@@ -1616,12 +1618,6 @@ class FractionTest(unittest.TestCase):
'=010%',
'>00.2f',
'>00f',
- # Too many zeros - minimum width should not have leading zeros
- '006f',
- # Leading zeros in precision
- '.010f',
- '.02f',
- '.000f',
# Missing precision
'.e',
'.f',
diff --git a/Lib/test/test_free_threading/test_itertools_batched.py b/Lib/test/test_free_threading/test_itertools.py
index a754b4f9ea9..8360afbf78c 100644
--- a/Lib/test/test_free_threading/test_itertools_batched.py
+++ b/Lib/test/test_free_threading/test_itertools.py
@@ -1,15 +1,15 @@
import unittest
from threading import Thread, Barrier
-from itertools import batched
+from itertools import batched, cycle
from test.support import threading_helper
threading_helper.requires_working_threading(module=True)
-class EnumerateThreading(unittest.TestCase):
+class ItertoolsThreading(unittest.TestCase):
@threading_helper.reap_threads
- def test_threading(self):
+ def test_batched(self):
number_of_threads = 10
number_of_iterations = 20
barrier = Barrier(number_of_threads)
@@ -34,5 +34,31 @@ class EnumerateThreading(unittest.TestCase):
barrier.reset()
+ @threading_helper.reap_threads
+ def test_cycle(self):
+ number_of_threads = 6
+ number_of_iterations = 10
+ number_of_cycles = 400
+
+ barrier = Barrier(number_of_threads)
+ def work(it):
+ barrier.wait()
+ for _ in range(number_of_cycles):
+ _ = next(it)
+
+ data = (1, 2, 3, 4)
+ for it in range(number_of_iterations):
+ cycle_iterator = cycle(data)
+ worker_threads = []
+ for ii in range(number_of_threads):
+ worker_threads.append(
+ Thread(target=work, args=[cycle_iterator]))
+
+ with threading_helper.start_threads(worker_threads):
+ pass
+
+ barrier.reset()
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py
index c39565144bf..7f5d48b9c63 100644
--- a/Lib/test/test_grammar.py
+++ b/Lib/test/test_grammar.py
@@ -1,7 +1,7 @@
# Python test set -- part 1, grammar.
# This just tests whether the parser accepts them all.
-from test.support import check_syntax_error
+from test.support import check_syntax_error, skip_wasi_stack_overflow
from test.support import import_helper
import annotationlib
import inspect
@@ -249,6 +249,18 @@ the \'lazy\' dog.\n\
compile(s, "<test>", "exec")
self.assertIn("was never closed", str(cm.exception))
+ @skip_wasi_stack_overflow()
+ def test_max_level(self):
+ # Macro defined in Parser/lexer/state.h
+ MAXLEVEL = 200
+
+ result = eval("(" * MAXLEVEL + ")" * MAXLEVEL)
+ self.assertEqual(result, ())
+
+ with self.assertRaises(SyntaxError) as cm:
+ eval("(" * (MAXLEVEL + 1) + ")" * (MAXLEVEL + 1))
+ self.assertStartsWith(str(cm.exception), 'too many nested parentheses')
+
var_annot_global: int # a global annotated is necessary for test_var_annot
diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py
index 161c7652d7a..b83ae181718 100644
--- a/Lib/test/test_hashlib.py
+++ b/Lib/test/test_hashlib.py
@@ -12,6 +12,7 @@ import io
import itertools
import logging
import os
+import re
import sys
import sysconfig
import tempfile
@@ -97,6 +98,14 @@ def read_vectors(hash_name):
yield parts
+DEPRECATED_STRING_PARAMETER = re.escape(
+ "the 'string' keyword parameter is deprecated since "
+ "Python 3.15 and slated for removal in Python 3.19; "
+ "use the 'data' keyword parameter or pass the data "
+ "to hash as a positional argument instead"
+)
+
+
class HashLibTestCase(unittest.TestCase):
supported_hash_names = ( 'md5', 'MD5', 'sha1', 'SHA1',
'sha224', 'SHA224', 'sha256', 'SHA256',
@@ -140,11 +149,10 @@ class HashLibTestCase(unittest.TestCase):
# of hashlib.new given the algorithm name.
for algorithm, constructors in self.constructors_to_test.items():
constructors.add(getattr(hashlib, algorithm))
- def _test_algorithm_via_hashlib_new(data=None, _alg=algorithm, **kwargs):
- if data is None:
- return hashlib.new(_alg, **kwargs)
- return hashlib.new(_alg, data, **kwargs)
- constructors.add(_test_algorithm_via_hashlib_new)
+ def c(*args, __algorithm_name=algorithm, **kwargs):
+ return hashlib.new(__algorithm_name, *args, **kwargs)
+ c.__name__ = f'do_test_algorithm_via_hashlib_new_{algorithm}'
+ constructors.add(c)
_hashlib = self._conditional_import_module('_hashlib')
self._hashlib = _hashlib
@@ -249,6 +257,81 @@ class HashLibTestCase(unittest.TestCase):
self._hashlib.new("md5", usedforsecurity=False)
self._hashlib.openssl_md5(usedforsecurity=False)
+ @unittest.skipIf(get_fips_mode(), "skip in FIPS mode")
+ def test_clinic_signature(self):
+ for constructor in self.hash_constructors:
+ with self.subTest(constructor.__name__):
+ constructor(b'')
+ constructor(data=b'')
+ with self.assertWarnsRegex(DeprecationWarning,
+ DEPRECATED_STRING_PARAMETER):
+ constructor(string=b'')
+
+ digest_name = constructor(b'').name
+ with self.subTest(digest_name):
+ hashlib.new(digest_name, b'')
+ hashlib.new(digest_name, data=b'')
+ with self.assertWarnsRegex(DeprecationWarning,
+ DEPRECATED_STRING_PARAMETER):
+ hashlib.new(digest_name, string=b'')
+ if self._hashlib:
+ self._hashlib.new(digest_name, b'')
+ self._hashlib.new(digest_name, data=b'')
+ with self.assertWarnsRegex(DeprecationWarning,
+ DEPRECATED_STRING_PARAMETER):
+ self._hashlib.new(digest_name, string=b'')
+
+ @unittest.skipIf(get_fips_mode(), "skip in FIPS mode")
+ def test_clinic_signature_errors(self):
+ nomsg = b''
+ mymsg = b'msg'
+ conflicting_call = re.escape(
+ "'data' and 'string' are mutually exclusive "
+ "and support for 'string' keyword parameter "
+ "is slated for removal in a future version."
+ )
+ duplicated_param = re.escape("given by name ('data') and position")
+ unexpected_param = re.escape("got an unexpected keyword argument '_'")
+ for args, kwds, errmsg in [
+ # Reject duplicated arguments before unknown keyword arguments.
+ ((nomsg,), dict(data=nomsg, _=nomsg), duplicated_param),
+ ((mymsg,), dict(data=nomsg, _=nomsg), duplicated_param),
+ # Reject duplicated arguments before conflicting ones.
+ *itertools.product(
+ [[nomsg], [mymsg]],
+ [dict(data=nomsg), dict(data=nomsg, string=nomsg)],
+ [duplicated_param]
+ ),
+ # Reject unknown keyword arguments before conflicting ones.
+ *itertools.product(
+ [()],
+ [
+ dict(_=None),
+ dict(data=nomsg, _=None),
+ dict(string=nomsg, _=None),
+ dict(string=nomsg, data=nomsg, _=None),
+ ],
+ [unexpected_param]
+ ),
+ ((nomsg,), dict(_=None), unexpected_param),
+ ((mymsg,), dict(_=None), unexpected_param),
+ # Reject conflicting arguments.
+ [(nomsg,), dict(string=nomsg), conflicting_call],
+ [(mymsg,), dict(string=nomsg), conflicting_call],
+ [(), dict(data=nomsg, string=nomsg), conflicting_call],
+ ]:
+ for constructor in self.hash_constructors:
+ digest_name = constructor(b'').name
+ with self.subTest(constructor.__name__, args=args, kwds=kwds):
+ with self.assertRaisesRegex(TypeError, errmsg):
+ constructor(*args, **kwds)
+ with self.subTest(digest_name, args=args, kwds=kwds):
+ with self.assertRaisesRegex(TypeError, errmsg):
+ hashlib.new(digest_name, *args, **kwds)
+ if self._hashlib:
+ with self.assertRaisesRegex(TypeError, errmsg):
+ self._hashlib.new(digest_name, *args, **kwds)
+
def test_unknown_hash(self):
self.assertRaises(ValueError, hashlib.new, 'spam spam spam spam spam')
self.assertRaises(TypeError, hashlib.new, 1)
@@ -718,8 +801,6 @@ class HashLibTestCase(unittest.TestCase):
self.assertRaises(ValueError, constructor, node_offset=-1)
self.assertRaises(OverflowError, constructor, node_offset=max_offset+1)
- self.assertRaises(TypeError, constructor, data=b'')
- self.assertRaises(TypeError, constructor, string=b'')
self.assertRaises(TypeError, constructor, '')
constructor(
diff --git a/Lib/test/test_http_cookiejar.py b/Lib/test/test_http_cookiejar.py
index 6bc33b15ec3..04cb440cd4c 100644
--- a/Lib/test/test_http_cookiejar.py
+++ b/Lib/test/test_http_cookiejar.py
@@ -4,6 +4,7 @@ import os
import stat
import sys
import re
+from test import support
from test.support import os_helper
from test.support import warnings_helper
import time
@@ -105,8 +106,7 @@ class DateTimeTests(unittest.TestCase):
self.assertEqual(http2time(s.lower()), test_t, s.lower())
self.assertEqual(http2time(s.upper()), test_t, s.upper())
- def test_http2time_garbage(self):
- for test in [
+ @support.subTests('test', [
'',
'Garbage',
'Mandag 16. September 1996',
@@ -121,10 +121,9 @@ class DateTimeTests(unittest.TestCase):
'08-01-3697739',
'09 Feb 19942632 22:23:32 GMT',
'Wed, 09 Feb 1994834 22:23:32 GMT',
- ]:
- self.assertIsNone(http2time(test),
- "http2time(%s) is not None\n"
- "http2time(test) %s" % (test, http2time(test)))
+ ])
+ def test_http2time_garbage(self, test):
+ self.assertIsNone(http2time(test))
def test_http2time_redos_regression_actually_completes(self):
# LOOSE_HTTP_DATE_RE was vulnerable to malicious input which caused catastrophic backtracking (REDoS).
@@ -149,9 +148,7 @@ class DateTimeTests(unittest.TestCase):
self.assertEqual(parse_date("1994-02-03 19:45:29 +0530"),
(1994, 2, 3, 14, 15, 29))
- def test_iso2time_formats(self):
- # test iso2time for supported dates.
- tests = [
+ @support.subTests('s', [
'1994-02-03 00:00:00 -0000', # ISO 8601 format
'1994-02-03 00:00:00 +0000', # ISO 8601 format
'1994-02-03 00:00:00', # zone is optional
@@ -164,16 +161,15 @@ class DateTimeTests(unittest.TestCase):
# A few tests with extra space at various places
' 1994-02-03 ',
' 1994-02-03T00:00:00 ',
- ]
-
+ ])
+ def test_iso2time_formats(self, s):
+ # test iso2time for supported dates.
test_t = 760233600 # assume broken POSIX counting of seconds
- for s in tests:
- self.assertEqual(iso2time(s), test_t, s)
- self.assertEqual(iso2time(s.lower()), test_t, s.lower())
- self.assertEqual(iso2time(s.upper()), test_t, s.upper())
+ self.assertEqual(iso2time(s), test_t, s)
+ self.assertEqual(iso2time(s.lower()), test_t, s.lower())
+ self.assertEqual(iso2time(s.upper()), test_t, s.upper())
- def test_iso2time_garbage(self):
- for test in [
+ @support.subTests('test', [
'',
'Garbage',
'Thursday, 03-Feb-94 00:00:00 GMT',
@@ -186,9 +182,9 @@ class DateTimeTests(unittest.TestCase):
'01-01-1980 00:00:62',
'01-01-1980T00:00:62',
'19800101T250000Z',
- ]:
- self.assertIsNone(iso2time(test),
- "iso2time(%r)" % test)
+ ])
+ def test_iso2time_garbage(self, test):
+ self.assertIsNone(iso2time(test))
def test_iso2time_performance_regression(self):
# If ISO_DATE_RE regresses to quadratic complexity, this test will take a very long time to succeed.
@@ -199,24 +195,23 @@ class DateTimeTests(unittest.TestCase):
class HeaderTests(unittest.TestCase):
- def test_parse_ns_headers(self):
- # quotes should be stripped
- expected = [[('foo', 'bar'), ('expires', 2209069412), ('version', '0')]]
- for hdr in [
+ @support.subTests('hdr', [
'foo=bar; expires=01 Jan 2040 22:23:32 GMT',
'foo=bar; expires="01 Jan 2040 22:23:32 GMT"',
- ]:
- self.assertEqual(parse_ns_headers([hdr]), expected)
-
- def test_parse_ns_headers_version(self):
-
+ ])
+ def test_parse_ns_headers(self, hdr):
# quotes should be stripped
- expected = [[('foo', 'bar'), ('version', '1')]]
- for hdr in [
+ expected = [[('foo', 'bar'), ('expires', 2209069412), ('version', '0')]]
+ self.assertEqual(parse_ns_headers([hdr]), expected)
+
+ @support.subTests('hdr', [
'foo=bar; version="1"',
'foo=bar; Version="1"',
- ]:
- self.assertEqual(parse_ns_headers([hdr]), expected)
+ ])
+ def test_parse_ns_headers_version(self, hdr):
+ # quotes should be stripped
+ expected = [[('foo', 'bar'), ('version', '1')]]
+ self.assertEqual(parse_ns_headers([hdr]), expected)
def test_parse_ns_headers_special_names(self):
# names such as 'expires' are not special in first name=value pair
@@ -226,8 +221,7 @@ class HeaderTests(unittest.TestCase):
expected = [[("expires", "01 Jan 2040 22:23:32 GMT"), ("version", "0")]]
self.assertEqual(parse_ns_headers([hdr]), expected)
- def test_join_header_words(self):
- for src, expected in [
+ @support.subTests('src,expected', [
([[("foo", None), ("bar", "baz")]], "foo; bar=baz"),
(([]), ""),
(([[]]), ""),
@@ -237,12 +231,11 @@ class HeaderTests(unittest.TestCase):
'n; foo="foo;_", bar=foo_bar'),
([[("n", "m"), ("foo", None)], [("bar", "foo_bar")]],
'n=m; foo, bar=foo_bar'),
- ]:
- with self.subTest(src=src):
- self.assertEqual(join_header_words(src), expected)
+ ])
+ def test_join_header_words(self, src, expected):
+ self.assertEqual(join_header_words(src), expected)
- def test_split_header_words(self):
- tests = [
+ @support.subTests('arg,expect', [
("foo", [[("foo", None)]]),
("foo=bar", [[("foo", "bar")]]),
(" foo ", [[("foo", None)]]),
@@ -259,24 +252,22 @@ class HeaderTests(unittest.TestCase):
(r'foo; bar=baz, spam=, foo="\,\;\"", bar= ',
[[("foo", None), ("bar", "baz")],
[("spam", "")], [("foo", ',;"')], [("bar", "")]]),
- ]
-
- for arg, expect in tests:
- try:
- result = split_header_words([arg])
- except:
- import traceback, io
- f = io.StringIO()
- traceback.print_exc(None, f)
- result = "(error -- traceback follows)\n\n%s" % f.getvalue()
- self.assertEqual(result, expect, """
+ ])
+ def test_split_header_words(self, arg, expect):
+ try:
+ result = split_header_words([arg])
+ except:
+ import traceback, io
+ f = io.StringIO()
+ traceback.print_exc(None, f)
+ result = "(error -- traceback follows)\n\n%s" % f.getvalue()
+ self.assertEqual(result, expect, """
When parsing: '%s'
Expected: '%s'
Got: '%s'
""" % (arg, expect, result))
- def test_roundtrip(self):
- tests = [
+ @support.subTests('arg,expect', [
("foo", "foo"),
("foo=bar", "foo=bar"),
(" foo ", "foo"),
@@ -309,12 +300,11 @@ Got: '%s'
('n; foo="foo;_", bar="foo,_"',
'n; foo="foo;_", bar="foo,_"'),
- ]
-
- for arg, expect in tests:
- input = split_header_words([arg])
- res = join_header_words(input)
- self.assertEqual(res, expect, """
+ ])
+ def test_roundtrip(self, arg, expect):
+ input = split_header_words([arg])
+ res = join_header_words(input)
+ self.assertEqual(res, expect, """
When parsing: '%s'
Expected: '%s'
Got: '%s'
@@ -516,14 +506,7 @@ class CookieTests(unittest.TestCase):
## just the 7 special TLD's listed in their spec. And folks rely on
## that...
- def test_domain_return_ok(self):
- # test optimization: .domain_return_ok() should filter out most
- # domains in the CookieJar before we try to access them (because that
- # may require disk access -- in particular, with MSIECookieJar)
- # This is only a rough check for performance reasons, so it's not too
- # critical as long as it's sufficiently liberal.
- pol = DefaultCookiePolicy()
- for url, domain, ok in [
+ @support.subTests('url,domain,ok', [
("http://foo.bar.com/", "blah.com", False),
("http://foo.bar.com/", "rhubarb.blah.com", False),
("http://foo.bar.com/", "rhubarb.foo.bar.com", False),
@@ -543,11 +526,18 @@ class CookieTests(unittest.TestCase):
("http://foo/", ".local", True),
("http://barfoo.com", ".foo.com", False),
("http://barfoo.com", "foo.com", False),
- ]:
- request = urllib.request.Request(url)
- r = pol.domain_return_ok(domain, request)
- if ok: self.assertTrue(r)
- else: self.assertFalse(r)
+ ])
+ def test_domain_return_ok(self, url, domain, ok):
+ # test optimization: .domain_return_ok() should filter out most
+ # domains in the CookieJar before we try to access them (because that
+ # may require disk access -- in particular, with MSIECookieJar)
+ # This is only a rough check for performance reasons, so it's not too
+ # critical as long as it's sufficiently liberal.
+ pol = DefaultCookiePolicy()
+ request = urllib.request.Request(url)
+ r = pol.domain_return_ok(domain, request)
+ if ok: self.assertTrue(r)
+ else: self.assertFalse(r)
def test_missing_value(self):
# missing = sign in Cookie: header is regarded by Mozilla as a missing
@@ -581,10 +571,7 @@ class CookieTests(unittest.TestCase):
self.assertEqual(interact_netscape(c, "http://www.acme.com/foo/"),
'"spam"; eggs')
- def test_rfc2109_handling(self):
- # RFC 2109 cookies are handled as RFC 2965 or Netscape cookies,
- # dependent on policy settings
- for rfc2109_as_netscape, rfc2965, version in [
+ @support.subTests('rfc2109_as_netscape,rfc2965,version', [
# default according to rfc2965 if not explicitly specified
(None, False, 0),
(None, True, 1),
@@ -593,24 +580,27 @@ class CookieTests(unittest.TestCase):
(False, True, 1),
(True, False, 0),
(True, True, 0),
- ]:
- policy = DefaultCookiePolicy(
- rfc2109_as_netscape=rfc2109_as_netscape,
- rfc2965=rfc2965)
- c = CookieJar(policy)
- interact_netscape(c, "http://www.example.com/", "ni=ni; Version=1")
- try:
- cookie = c._cookies["www.example.com"]["/"]["ni"]
- except KeyError:
- self.assertIsNone(version) # didn't expect a stored cookie
- else:
- self.assertEqual(cookie.version, version)
- # 2965 cookies are unaffected
- interact_2965(c, "http://www.example.com/",
- "foo=bar; Version=1")
- if rfc2965:
- cookie2965 = c._cookies["www.example.com"]["/"]["foo"]
- self.assertEqual(cookie2965.version, 1)
+ ])
+ def test_rfc2109_handling(self, rfc2109_as_netscape, rfc2965, version):
+ # RFC 2109 cookies are handled as RFC 2965 or Netscape cookies,
+ # dependent on policy settings
+ policy = DefaultCookiePolicy(
+ rfc2109_as_netscape=rfc2109_as_netscape,
+ rfc2965=rfc2965)
+ c = CookieJar(policy)
+ interact_netscape(c, "http://www.example.com/", "ni=ni; Version=1")
+ try:
+ cookie = c._cookies["www.example.com"]["/"]["ni"]
+ except KeyError:
+ self.assertIsNone(version) # didn't expect a stored cookie
+ else:
+ self.assertEqual(cookie.version, version)
+ # 2965 cookies are unaffected
+ interact_2965(c, "http://www.example.com/",
+ "foo=bar; Version=1")
+ if rfc2965:
+ cookie2965 = c._cookies["www.example.com"]["/"]["foo"]
+ self.assertEqual(cookie2965.version, 1)
def test_ns_parser(self):
c = CookieJar()
@@ -778,8 +768,7 @@ class CookieTests(unittest.TestCase):
# Cookie is sent back to the same URI.
self.assertEqual(interact_netscape(cj, uri), value)
- def test_escape_path(self):
- cases = [
+ @support.subTests('arg,result', [
# quoted safe
("/foo%2f/bar", "/foo%2F/bar"),
("/foo%2F/bar", "/foo%2F/bar"),
@@ -799,9 +788,9 @@ class CookieTests(unittest.TestCase):
("/foo/bar\u00fc", "/foo/bar%C3%BC"), # UTF-8 encoded
# unicode
("/foo/bar\uabcd", "/foo/bar%EA%AF%8D"), # UTF-8 encoded
- ]
- for arg, result in cases:
- self.assertEqual(escape_path(arg), result)
+ ])
+ def test_escape_path(self, arg, result):
+ self.assertEqual(escape_path(arg), result)
def test_request_path(self):
# with parameters
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py
index 165949167ce..b3c9ef8efba 100644
--- a/Lib/test/test_interpreters/test_api.py
+++ b/Lib/test/test_interpreters/test_api.py
@@ -1,17 +1,22 @@
+import contextlib
import os
import pickle
+import sys
from textwrap import dedent
import threading
import types
import unittest
from test import support
+from test.support import os_helper
+from test.support import script_helper
from test.support import import_helper
# Raise SkipTest if subinterpreters not supported.
_interpreters = import_helper.import_module('_interpreters')
from test.support import Py_GIL_DISABLED
from test.support import interpreters
from test.support import force_not_colorized
+import test._crossinterp_definitions as defs
from test.support.interpreters import (
InterpreterError, InterpreterNotFoundError, ExecutionFailed,
)
@@ -29,6 +34,59 @@ WHENCE_STR_XI = 'cross-interpreter C-API'
WHENCE_STR_STDLIB = '_interpreters module'
+def is_pickleable(obj):
+ try:
+ pickle.dumps(obj)
+ except Exception:
+ return False
+ return True
+
+
+@contextlib.contextmanager
+def defined_in___main__(name, script, *, remove=False):
+ import __main__ as mainmod
+ mainns = vars(mainmod)
+ assert name not in mainns
+ exec(script, mainns, mainns)
+ if remove:
+ yield mainns.pop(name)
+ else:
+ try:
+ yield mainns[name]
+ finally:
+ mainns.pop(name, None)
+
+
+def build_excinfo(exctype, msg=None, formatted=None, errdisplay=None):
+ if isinstance(exctype, type):
+ assert issubclass(exctype, BaseException), exctype
+ exctype = types.SimpleNamespace(
+ __name__=exctype.__name__,
+ __qualname__=exctype.__qualname__,
+ __module__=exctype.__module__,
+ )
+ elif isinstance(exctype, str):
+ module, _, name = exctype.rpartition(exctype)
+ if not module and name in __builtins__:
+ module = 'builtins'
+ exctype = types.SimpleNamespace(
+ __name__=name,
+ __qualname__=exctype,
+ __module__=module or None,
+ )
+ else:
+ assert isinstance(exctype, types.SimpleNamespace)
+ assert msg is None or isinstance(msg, str), msg
+ assert formatted is None or isinstance(formatted, str), formatted
+ assert errdisplay is None or isinstance(errdisplay, str), errdisplay
+ return types.SimpleNamespace(
+ type=exctype,
+ msg=msg,
+ formatted=formatted,
+ errdisplay=errdisplay,
+ )
+
+
class ModuleTests(TestBase):
def test_queue_aliases(self):
@@ -890,24 +948,26 @@ class TestInterpreterExec(TestBase):
# Interpreter.exec() behavior.
-def call_func_noop():
- pass
+call_func_noop = defs.spam_minimal
+call_func_ident = defs.spam_returns_arg
+call_func_failure = defs.spam_raises
def call_func_return_shareable():
return (1, None)
-def call_func_return_not_shareable():
- return [1, 2, 3]
+def call_func_return_stateless_func():
+ return (lambda x: x)
-def call_func_failure():
- raise Exception('spam!')
+def call_func_return_pickleable():
+ return [1, 2, 3]
-def call_func_ident(value):
- return value
+def call_func_return_unpickleable():
+ x = 42
+ return (lambda: x)
def get_call_func_closure(value):
@@ -916,6 +976,11 @@ def get_call_func_closure(value):
return call_func_closure
+def call_func_exec_wrapper(script, ns):
+ res = exec(script, ns, ns)
+ return res, ns, id(ns)
+
+
class Spam:
@staticmethod
@@ -1012,86 +1077,375 @@ class TestInterpreterCall(TestBase):
# - preserves info (e.g. SyntaxError)
# - matching error display
- def test_call(self):
+ @contextlib.contextmanager
+ def assert_fails(self, expected):
+ with self.assertRaises(ExecutionFailed) as cm:
+ yield cm
+ uncaught = cm.exception.excinfo
+ self.assertEqual(uncaught.type.__name__, expected.__name__)
+
+ def assert_fails_not_shareable(self):
+ return self.assert_fails(interpreters.NotShareableError)
+
+ def assert_code_equal(self, code1, code2):
+ if code1 == code2:
+ return
+ self.assertEqual(code1.co_name, code2.co_name)
+ self.assertEqual(code1.co_flags, code2.co_flags)
+ self.assertEqual(code1.co_consts, code2.co_consts)
+ self.assertEqual(code1.co_varnames, code2.co_varnames)
+ self.assertEqual(code1.co_cellvars, code2.co_cellvars)
+ self.assertEqual(code1.co_freevars, code2.co_freevars)
+ self.assertEqual(code1.co_names, code2.co_names)
+ self.assertEqual(
+ _testinternalcapi.get_code_var_counts(code1),
+ _testinternalcapi.get_code_var_counts(code2),
+ )
+ self.assertEqual(code1.co_code, code2.co_code)
+
+ def assert_funcs_equal(self, func1, func2):
+ if func1 == func2:
+ return
+ self.assertIs(type(func1), type(func2))
+ self.assertEqual(func1.__name__, func2.__name__)
+ self.assertEqual(func1.__defaults__, func2.__defaults__)
+ self.assertEqual(func1.__kwdefaults__, func2.__kwdefaults__)
+ self.assertEqual(func1.__closure__, func2.__closure__)
+ self.assert_code_equal(func1.__code__, func2.__code__)
+ self.assertEqual(
+ _testinternalcapi.get_code_var_counts(func1),
+ _testinternalcapi.get_code_var_counts(func2),
+ )
+
+ def assert_exceptions_equal(self, exc1, exc2):
+ assert isinstance(exc1, Exception)
+ assert isinstance(exc2, Exception)
+ if exc1 == exc2:
+ return
+ self.assertIs(type(exc1), type(exc2))
+ self.assertEqual(exc1.args, exc2.args)
+
+ def test_stateless_funcs(self):
interp = interpreters.create()
- for i, (callable, args, kwargs) in enumerate([
- (call_func_noop, (), {}),
- (Spam.noop, (), {}),
+ func = call_func_noop
+ with self.subTest('no args, no return'):
+ res = interp.call(func)
+ self.assertIsNone(res)
+
+ func = call_func_return_shareable
+ with self.subTest('no args, returns shareable'):
+ res = interp.call(func)
+ self.assertEqual(res, (1, None))
+
+ func = call_func_return_stateless_func
+ expected = (lambda x: x)
+ with self.subTest('no args, returns stateless func'):
+ res = interp.call(func)
+ self.assert_funcs_equal(res, expected)
+
+ func = call_func_return_pickleable
+ with self.subTest('no args, returns pickleable'):
+ res = interp.call(func)
+ self.assertEqual(res, [1, 2, 3])
+
+ func = call_func_return_unpickleable
+ with self.subTest('no args, returns unpickleable'):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func)
+
+ def test_stateless_func_returns_arg(self):
+ interp = interpreters.create()
+
+ for arg in [
+ None,
+ 10,
+ 'spam!',
+ b'spam!',
+ (1, 2, 'spam!'),
+ memoryview(b'spam!'),
+ ]:
+ with self.subTest(f'shareable {arg!r}'):
+ assert _interpreters.is_shareable(arg)
+ res = interp.call(defs.spam_returns_arg, arg)
+ self.assertEqual(res, arg)
+
+ for arg in defs.STATELESS_FUNCTIONS:
+ with self.subTest(f'stateless func {arg!r}'):
+ res = interp.call(defs.spam_returns_arg, arg)
+ self.assert_funcs_equal(res, arg)
+
+ for arg in defs.TOP_FUNCTIONS:
+ if arg in defs.STATELESS_FUNCTIONS:
+ continue
+ with self.subTest(f'stateful func {arg!r}'):
+ res = interp.call(defs.spam_returns_arg, arg)
+ self.assert_funcs_equal(res, arg)
+ assert is_pickleable(arg)
+
+ for arg in [
+ Ellipsis,
+ NotImplemented,
+ object(),
+ 2**1000,
+ [1, 2, 3],
+ {'a': 1, 'b': 2},
+ types.SimpleNamespace(x=42),
+ # builtin types
+ object,
+ type,
+ Exception,
+ ModuleNotFoundError,
+ # builtin exceptions
+ Exception('uh-oh!'),
+ ModuleNotFoundError('mymodule'),
+ # builtin fnctions
+ len,
+ sys.exit,
+ # user classes
+ *defs.TOP_CLASSES,
+ *(c(*a) for c, a in defs.TOP_CLASSES.items()
+ if c not in defs.CLASSES_WITHOUT_EQUALITY),
+ ]:
+ with self.subTest(f'pickleable {arg!r}'):
+ res = interp.call(defs.spam_returns_arg, arg)
+ if type(arg) is object:
+ self.assertIs(type(res), object)
+ elif isinstance(arg, BaseException):
+ self.assert_exceptions_equal(res, arg)
+ else:
+ self.assertEqual(res, arg)
+ assert is_pickleable(arg)
+
+ for arg in [
+ types.MappingProxyType({}),
+ *(f for f in defs.NESTED_FUNCTIONS
+ if f not in defs.STATELESS_FUNCTIONS),
+ ]:
+ with self.subTest(f'unpickleable {arg!r}'):
+ assert not _interpreters.is_shareable(arg)
+ assert not is_pickleable(arg)
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(defs.spam_returns_arg, arg)
+
+ def test_full_args(self):
+ interp = interpreters.create()
+ expected = (1, 2, 3, 4, 5, 6, ('?',), {'g': 7, 'h': 8})
+ func = defs.spam_full_args
+ res = interp.call(func, 1, 2, 3, 4, '?', e=5, f=6, g=7, h=8)
+ self.assertEqual(res, expected)
+
+ def test_full_defaults(self):
+ # pickleable, but not stateless
+ interp = interpreters.create()
+ expected = (-1, -2, -3, -4, -5, -6, (), {'g': 8, 'h': 9})
+ res = interp.call(defs.spam_full_args_with_defaults, g=8, h=9)
+ self.assertEqual(res, expected)
+
+ def test_modified_arg(self):
+ interp = interpreters.create()
+ script = dedent("""
+ a = 7
+ b = 2
+ c = a ** b
+ """)
+ ns = {}
+ expected = {'a': 7, 'b': 2, 'c': 49}
+ res = interp.call(call_func_exec_wrapper, script, ns)
+ obj, resns, resid = res
+ del resns['__builtins__']
+ self.assertIsNone(obj)
+ self.assertEqual(ns, {})
+ self.assertEqual(resns, expected)
+ self.assertNotEqual(resid, id(ns))
+ self.assertNotEqual(resid, id(resns))
+
+ def test_func_in___main___valid(self):
+ # pickleable, already there'
+
+ with os_helper.temp_dir() as tempdir:
+ def new_mod(name, text):
+ script_helper.make_script(tempdir, name, dedent(text))
+
+ def run(text):
+ name = 'myscript'
+ text = dedent(f"""
+ import sys
+ sys.path.insert(0, {tempdir!r})
+
+ """) + dedent(text)
+ filename = script_helper.make_script(tempdir, name, text)
+ res = script_helper.assert_python_ok(filename)
+ return res.out.decode('utf-8').strip()
+
+ # no module indirection
+ with self.subTest('no indirection'):
+ text = run(f"""
+ from test.support import interpreters
+
+ def spam():
+ # This a global var...
+ return __name__
+
+ if __name__ == '__main__':
+ interp = interpreters.create()
+ res = interp.call(spam)
+ print(res)
+ """)
+ self.assertEqual(text, '<fake __main__>')
+
+ # indirect as func, direct interp
+ new_mod('mymod', f"""
+ def run(interp, func):
+ return interp.call(func)
+ """)
+ with self.subTest('indirect as func, direct interp'):
+ text = run(f"""
+ from test.support import interpreters
+ import mymod
+
+ def spam():
+ # This a global var...
+ return __name__
+
+ if __name__ == '__main__':
+ interp = interpreters.create()
+ res = mymod.run(interp, spam)
+ print(res)
+ """)
+ self.assertEqual(text, '<fake __main__>')
+
+ # indirect as func, indirect interp
+ new_mod('mymod', f"""
+ from test.support import interpreters
+ def run(func):
+ interp = interpreters.create()
+ return interp.call(func)
+ """)
+ with self.subTest('indirect as func, indirect interp'):
+ text = run(f"""
+ import mymod
+
+ def spam():
+ # This a global var...
+ return __name__
+
+ if __name__ == '__main__':
+ res = mymod.run(spam)
+ print(res)
+ """)
+ self.assertEqual(text, '<fake __main__>')
+
+ def test_func_in___main___invalid(self):
+ interp = interpreters.create()
+
+ funcname = f'{__name__.replace(".", "_")}_spam_okay'
+ script = dedent(f"""
+ def {funcname}():
+ # This a global var...
+ return __name__
+ """)
+
+ with self.subTest('pickleable, added dynamically'):
+ with defined_in___main__(funcname, script) as arg:
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(defs.spam_returns_arg, arg)
+
+ with self.subTest('lying about __main__'):
+ with defined_in___main__(funcname, script, remove=True) as arg:
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(defs.spam_returns_arg, arg)
+
+ def test_raises(self):
+ interp = interpreters.create()
+ with self.assertRaises(ExecutionFailed):
+ interp.call(call_func_failure)
+
+ with self.assert_fails(ValueError):
+ interp.call(call_func_complex, '???', exc=ValueError('spam'))
+
+ def test_call_valid(self):
+ interp = interpreters.create()
+
+ for i, (callable, args, kwargs, expected) in enumerate([
+ (call_func_noop, (), {}, None),
+ (call_func_ident, ('spamspamspam',), {}, 'spamspamspam'),
+ (call_func_return_shareable, (), {}, (1, None)),
+ (call_func_return_pickleable, (), {}, [1, 2, 3]),
+ (Spam.noop, (), {}, None),
+ (Spam.from_values, (), {}, Spam(())),
+ (Spam.from_values, (1, 2, 3), {}, Spam((1, 2, 3))),
+ (Spam, ('???',), {}, Spam('???')),
+ (Spam(101), (), {}, (101, (), {})),
+ (Spam(10101).run, (), {}, (10101, (), {})),
+ (call_func_complex, ('ident', 'spam'), {}, 'spam'),
+ (call_func_complex, ('full-ident', 'spam'), {}, ('spam', (), {})),
+ (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'},
+ ('spam', ('ham',), {'eggs': '!!!'})),
+ (call_func_complex, ('globals',), {}, __name__),
+ (call_func_complex, ('interpid',), {}, interp.id),
+ (call_func_complex, ('custom', 'spam!'), {}, Spam('spam!')),
]):
with self.subTest(f'success case #{i+1}'):
- res = interp.call(callable)
- self.assertIs(res, None)
+ res = interp.call(callable, *args, **kwargs)
+ self.assertEqual(res, expected)
+
+ def test_call_invalid(self):
+ interp = interpreters.create()
+
+ func = get_call_func_closure
+ with self.subTest(func):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func, 42)
+
+ func = get_call_func_closure(42)
+ with self.subTest(func):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func)
+
+ func = call_func_complex
+ op = 'closure'
+ with self.subTest(f'{func} ({op})'):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func, op, value='~~~')
+
+ op = 'custom-inner'
+ with self.subTest(f'{func} ({op})'):
+ with self.assertRaises(interpreters.NotShareableError):
+ interp.call(func, op, 'eggs!')
+
+ def test_call_in_thread(self):
+ interp = interpreters.create()
for i, (callable, args, kwargs) in enumerate([
- (call_func_ident, ('spamspamspam',), {}),
- (get_call_func_closure, (42,), {}),
- (get_call_func_closure(42), (), {}),
+ (call_func_noop, (), {}),
+ (call_func_return_shareable, (), {}),
+ (call_func_return_pickleable, (), {}),
(Spam.from_values, (), {}),
(Spam.from_values, (1, 2, 3), {}),
- (Spam, ('???'), {}),
(Spam(101), (), {}),
(Spam(10101).run, (), {}),
+ (Spam.noop, (), {}),
(call_func_complex, ('ident', 'spam'), {}),
(call_func_complex, ('full-ident', 'spam'), {}),
(call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
(call_func_complex, ('globals',), {}),
(call_func_complex, ('interpid',), {}),
- (call_func_complex, ('closure',), {'value': '~~~'}),
(call_func_complex, ('custom', 'spam!'), {}),
- (call_func_complex, ('custom-inner', 'eggs!'), {}),
- (call_func_complex, ('???',), {'exc': ValueError('spam')}),
- (call_func_return_shareable, (), {}),
- (call_func_return_not_shareable, (), {}),
- ]):
- with self.subTest(f'invalid case #{i+1}'):
- with self.assertRaises(Exception):
- if args or kwargs:
- raise Exception((args, kwargs))
- interp.call(callable)
-
- with self.assertRaises(ExecutionFailed):
- interp.call(call_func_failure)
-
- def test_call_in_thread(self):
- interp = interpreters.create()
-
- for i, (callable, args, kwargs) in enumerate([
- (call_func_noop, (), {}),
- (Spam.noop, (), {}),
]):
with self.subTest(f'success case #{i+1}'):
with self.captured_thread_exception() as ctx:
- t = interp.call_in_thread(callable)
+ t = interp.call_in_thread(callable, *args, **kwargs)
t.join()
self.assertIsNone(ctx.caught)
for i, (callable, args, kwargs) in enumerate([
- (call_func_ident, ('spamspamspam',), {}),
(get_call_func_closure, (42,), {}),
(get_call_func_closure(42), (), {}),
- (Spam.from_values, (), {}),
- (Spam.from_values, (1, 2, 3), {}),
- (Spam, ('???'), {}),
- (Spam(101), (), {}),
- (Spam(10101).run, (), {}),
- (call_func_complex, ('ident', 'spam'), {}),
- (call_func_complex, ('full-ident', 'spam'), {}),
- (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
- (call_func_complex, ('globals',), {}),
- (call_func_complex, ('interpid',), {}),
- (call_func_complex, ('closure',), {'value': '~~~'}),
- (call_func_complex, ('custom', 'spam!'), {}),
- (call_func_complex, ('custom-inner', 'eggs!'), {}),
- (call_func_complex, ('???',), {'exc': ValueError('spam')}),
- (call_func_return_shareable, (), {}),
- (call_func_return_not_shareable, (), {}),
]):
with self.subTest(f'invalid case #{i+1}'):
- if args or kwargs:
- continue
with self.captured_thread_exception() as ctx:
- t = interp.call_in_thread(callable)
+ t = interp.call_in_thread(callable, *args, **kwargs)
t.join()
self.assertIsNotNone(ctx.caught)
@@ -1600,18 +1954,14 @@ class LowLevelTests(TestBase):
with results:
exc = _interpreters.exec(interpid, script)
out = results.stdout()
- self.assertEqual(out, '')
- self.assert_ns_equal(exc, types.SimpleNamespace(
- type=types.SimpleNamespace(
- __name__='Exception',
- __qualname__='Exception',
- __module__='builtins',
- ),
- msg='uh-oh!',
+ expected = build_excinfo(
+ Exception, 'uh-oh!',
# We check these in other tests.
formatted=exc.formatted,
errdisplay=exc.errdisplay,
- ))
+ )
+ self.assertEqual(out, '')
+ self.assert_ns_equal(exc, expected)
with self.subTest('from C-API'):
with self.interpreter_from_capi() as interpid:
@@ -1623,25 +1973,50 @@ class LowLevelTests(TestBase):
self.assertEqual(exc.msg, 'it worked!')
def test_call(self):
- with self.subTest('no args'):
- interpid = _interpreters.create()
- with self.assertRaises(ValueError):
- _interpreters.call(interpid, call_func_return_shareable)
+ interpid = _interpreters.create()
+
+ # Here we focus on basic args and return values.
+ # See TestInterpreterCall for full operational coverage,
+ # including supported callables.
+
+ with self.subTest('no args, return None'):
+ func = defs.spam_minimal
+ res, exc = _interpreters.call(interpid, func)
+ self.assertIsNone(exc)
+ self.assertIsNone(res)
+
+ with self.subTest('empty args, return None'):
+ func = defs.spam_minimal
+ res, exc = _interpreters.call(interpid, func, (), {})
+ self.assertIsNone(exc)
+ self.assertIsNone(res)
+
+ with self.subTest('no args, return non-None'):
+ func = defs.script_with_return
+ res, exc = _interpreters.call(interpid, func)
+ self.assertIsNone(exc)
+ self.assertIs(res, True)
+
+ with self.subTest('full args, return non-None'):
+ expected = (1, 2, 3, 4, 5, 6, (7, 8), {'g': 9, 'h': 0})
+ func = defs.spam_full_args
+ args = (1, 2, 3, 4, 7, 8)
+ kwargs = dict(e=5, f=6, g=9, h=0)
+ res, exc = _interpreters.call(interpid, func, args, kwargs)
+ self.assertIsNone(exc)
+ self.assertEqual(res, expected)
with self.subTest('uncaught exception'):
- interpid = _interpreters.create()
- exc = _interpreters.call(interpid, call_func_failure)
- self.assertEqual(exc, types.SimpleNamespace(
- type=types.SimpleNamespace(
- __name__='Exception',
- __qualname__='Exception',
- __module__='builtins',
- ),
- msg='spam!',
+ func = defs.spam_raises
+ res, exc = _interpreters.call(interpid, func)
+ expected = build_excinfo(
+ Exception, 'spam!',
# We check these in other tests.
formatted=exc.formatted,
errdisplay=exc.errdisplay,
- ))
+ )
+ self.assertIsNone(res)
+ self.assertEqual(exc, expected)
@requires_test_modules
def test_set___main___attrs(self):
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 168e66c5a3f..0c921ffbc25 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -1062,6 +1062,37 @@ class IOTest(unittest.TestCase):
# Silence destructor error
R.flush = lambda self: None
+ @threading_helper.requires_working_threading()
+ def test_write_readline_races(self):
+ # gh-134908: Concurrent iteration over a file caused races
+ thread_count = 2
+ write_count = 100
+ read_count = 100
+
+ def writer(file, barrier):
+ barrier.wait()
+ for _ in range(write_count):
+ file.write("x")
+
+ def reader(file, barrier):
+ barrier.wait()
+ for _ in range(read_count):
+ for line in file:
+ self.assertEqual(line, "")
+
+ with self.open(os_helper.TESTFN, "w+") as f:
+ barrier = threading.Barrier(thread_count + 1)
+ reader = threading.Thread(target=reader, args=(f, barrier))
+ writers = [threading.Thread(target=writer, args=(f, barrier))
+ for _ in range(thread_count)]
+ with threading_helper.catch_threading_exception() as cm:
+ with threading_helper.start_threads(writers + [reader]):
+ pass
+ self.assertIsNone(cm.exc_type)
+
+ self.assertEqual(os.stat(os_helper.TESTFN).st_size,
+ write_count * thread_count)
+
class CIOTest(IOTest):
diff --git a/Lib/test/test_json/test_dump.py b/Lib/test/test_json/test_dump.py
index 13b40020781..39470754003 100644
--- a/Lib/test/test_json/test_dump.py
+++ b/Lib/test/test_json/test_dump.py
@@ -22,6 +22,14 @@ class TestDump:
self.assertIn('valid_key', o)
self.assertNotIn(b'invalid_key', o)
+ def test_dump_skipkeys_indent_empty(self):
+ v = {b'invalid_key': False}
+ self.assertEqual(self.json.dumps(v, skipkeys=True, indent=4), '{}')
+
+ def test_skipkeys_indent(self):
+ v = {b'invalid_key': False, 'valid_key': True}
+ self.assertEqual(self.json.dumps(v, skipkeys=True, indent=4), '{\n "valid_key": true\n}')
+
def test_encode_truefalse(self):
self.assertEqual(self.dumps(
{True: False, False: True}, sort_keys=True),
diff --git a/Lib/test/test_math.py b/Lib/test/test_math.py
index d14336f8bac..384ad5c828d 100644
--- a/Lib/test/test_math.py
+++ b/Lib/test/test_math.py
@@ -1973,6 +1973,28 @@ class MathTests(unittest.TestCase):
self.assertFalse(math.isfinite(float("inf")))
self.assertFalse(math.isfinite(float("-inf")))
+ def testIsnormal(self):
+ self.assertTrue(math.isnormal(1.25))
+ self.assertTrue(math.isnormal(-1.0))
+ self.assertFalse(math.isnormal(0.0))
+ self.assertFalse(math.isnormal(-0.0))
+ self.assertFalse(math.isnormal(INF))
+ self.assertFalse(math.isnormal(NINF))
+ self.assertFalse(math.isnormal(NAN))
+ self.assertFalse(math.isnormal(FLOAT_MIN/2))
+ self.assertFalse(math.isnormal(-FLOAT_MIN/2))
+
+ def testIssubnormal(self):
+ self.assertFalse(math.issubnormal(1.25))
+ self.assertFalse(math.issubnormal(-1.0))
+ self.assertFalse(math.issubnormal(0.0))
+ self.assertFalse(math.issubnormal(-0.0))
+ self.assertFalse(math.issubnormal(INF))
+ self.assertFalse(math.issubnormal(NINF))
+ self.assertFalse(math.issubnormal(NAN))
+ self.assertTrue(math.issubnormal(FLOAT_MIN/2))
+ self.assertTrue(math.issubnormal(-FLOAT_MIN/2))
+
def testIsnan(self):
self.assertTrue(math.isnan(float("nan")))
self.assertTrue(math.isnan(float("-nan")))
diff --git a/Lib/test/test_monitoring.py b/Lib/test/test_monitoring.py
index 263e4e6f394..a932ac80117 100644
--- a/Lib/test/test_monitoring.py
+++ b/Lib/test/test_monitoring.py
@@ -2157,6 +2157,21 @@ class TestRegressions(MonitoringTestBase, unittest.TestCase):
sys.monitoring.restart_events()
sys.monitoring.set_events(0, 0)
+ def test_134879(self):
+ # gh-134789
+ # Specialized FOR_ITER not incrementing index
+ def foo():
+ t = 0
+ for i in [1,2,3,4]:
+ t += i
+ self.assertEqual(t, 10)
+
+ sys.monitoring.use_tool_id(0, "test")
+ self.addCleanup(sys.monitoring.free_tool_id, 0)
+ sys.monitoring.set_local_events(0, foo.__code__, E.BRANCH_LEFT | E.BRANCH_RIGHT)
+ foo()
+ sys.monitoring.set_local_events(0, foo.__code__, 0)
+
class TestOptimizer(MonitoringTestBase, unittest.TestCase):
diff --git a/Lib/test/test_ntpath.py b/Lib/test/test_ntpath.py
index c3b0bdaebc2..22f6403d482 100644
--- a/Lib/test/test_ntpath.py
+++ b/Lib/test/test_ntpath.py
@@ -6,6 +6,8 @@ import subprocess
import sys
import unittest
import warnings
+from ntpath import ALLOW_MISSING
+from test import support
from test.support import TestFailed, cpython_only, os_helper
from test.support.os_helper import FakePath
from test import test_genericpath
@@ -76,6 +78,10 @@ def tester(fn, wantResult):
%(str(fn), str(wantResult), repr(gotResult)))
+def _parameterize(*parameters):
+ return support.subTests('kwargs', parameters, _do_cleanups=True)
+
+
class NtpathTestCase(unittest.TestCase):
def assertPathEqual(self, path1, path2):
if path1 == path2 or _norm(path1) == _norm(path2):
@@ -474,6 +480,27 @@ class TestNtpath(NtpathTestCase):
tester("ntpath.realpath('.\\.')", expected)
tester("ntpath.realpath('\\'.join(['.'] * 100))", expected)
+ def test_realpath_curdir_strict(self):
+ expected = ntpath.normpath(os.getcwd())
+ tester("ntpath.realpath('.', strict=True)", expected)
+ tester("ntpath.realpath('./.', strict=True)", expected)
+ tester("ntpath.realpath('/'.join(['.'] * 100), strict=True)", expected)
+ tester("ntpath.realpath('.\\.', strict=True)", expected)
+ tester("ntpath.realpath('\\'.join(['.'] * 100), strict=True)", expected)
+
+ def test_realpath_curdir_missing_ok(self):
+ expected = ntpath.normpath(os.getcwd())
+ tester("ntpath.realpath('.', strict=ALLOW_MISSING)",
+ expected)
+ tester("ntpath.realpath('./.', strict=ALLOW_MISSING)",
+ expected)
+ tester("ntpath.realpath('/'.join(['.'] * 100), strict=ALLOW_MISSING)",
+ expected)
+ tester("ntpath.realpath('.\\.', strict=ALLOW_MISSING)",
+ expected)
+ tester("ntpath.realpath('\\'.join(['.'] * 100), strict=ALLOW_MISSING)",
+ expected)
+
def test_realpath_pardir(self):
expected = ntpath.normpath(os.getcwd())
tester("ntpath.realpath('..')", ntpath.dirname(expected))
@@ -486,24 +513,59 @@ class TestNtpath(NtpathTestCase):
tester("ntpath.realpath('\\'.join(['..'] * 50))",
ntpath.splitdrive(expected)[0] + '\\')
+ def test_realpath_pardir_strict(self):
+ expected = ntpath.normpath(os.getcwd())
+ tester("ntpath.realpath('..', strict=True)", ntpath.dirname(expected))
+ tester("ntpath.realpath('../..', strict=True)",
+ ntpath.dirname(ntpath.dirname(expected)))
+ tester("ntpath.realpath('/'.join(['..'] * 50), strict=True)",
+ ntpath.splitdrive(expected)[0] + '\\')
+ tester("ntpath.realpath('..\\..', strict=True)",
+ ntpath.dirname(ntpath.dirname(expected)))
+ tester("ntpath.realpath('\\'.join(['..'] * 50), strict=True)",
+ ntpath.splitdrive(expected)[0] + '\\')
+
+ def test_realpath_pardir_missing_ok(self):
+ expected = ntpath.normpath(os.getcwd())
+ tester("ntpath.realpath('..', strict=ALLOW_MISSING)",
+ ntpath.dirname(expected))
+ tester("ntpath.realpath('../..', strict=ALLOW_MISSING)",
+ ntpath.dirname(ntpath.dirname(expected)))
+ tester("ntpath.realpath('/'.join(['..'] * 50), strict=ALLOW_MISSING)",
+ ntpath.splitdrive(expected)[0] + '\\')
+ tester("ntpath.realpath('..\\..', strict=ALLOW_MISSING)",
+ ntpath.dirname(ntpath.dirname(expected)))
+ tester("ntpath.realpath('\\'.join(['..'] * 50), strict=ALLOW_MISSING)",
+ ntpath.splitdrive(expected)[0] + '\\')
+
@os_helper.skip_unless_symlink
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
- def test_realpath_basic(self):
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_basic(self, kwargs):
ABSTFN = ntpath.abspath(os_helper.TESTFN)
open(ABSTFN, "wb").close()
self.addCleanup(os_helper.unlink, ABSTFN)
self.addCleanup(os_helper.unlink, ABSTFN + "1")
os.symlink(ABSTFN, ABSTFN + "1")
- self.assertPathEqual(ntpath.realpath(ABSTFN + "1"), ABSTFN)
- self.assertPathEqual(ntpath.realpath(os.fsencode(ABSTFN + "1")),
+ self.assertPathEqual(ntpath.realpath(ABSTFN + "1", **kwargs), ABSTFN)
+ self.assertPathEqual(ntpath.realpath(os.fsencode(ABSTFN + "1"), **kwargs),
os.fsencode(ABSTFN))
# gh-88013: call ntpath.realpath with binary drive name may raise a
# TypeError. The drive should not exist to reproduce the bug.
drives = {f"{c}:\\" for c in string.ascii_uppercase} - set(os.listdrives())
d = drives.pop().encode()
- self.assertEqual(ntpath.realpath(d), d)
+ self.assertEqual(ntpath.realpath(d, strict=False), d)
+
+ # gh-106242: Embedded nulls and non-strict fallback to abspath
+ if kwargs:
+ with self.assertRaises(OSError):
+ ntpath.realpath(os_helper.TESTFN + "\0spam",
+ **kwargs)
+ else:
+ self.assertEqual(ABSTFN + "\0spam",
+ ntpath.realpath(os_helper.TESTFN + "\0spam", **kwargs))
@os_helper.skip_unless_symlink
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
@@ -526,51 +588,66 @@ class TestNtpath(NtpathTestCase):
self.assertEqual(realpath(path, strict=False), path)
# gh-106242: Embedded nulls should raise OSError (not ValueError)
self.assertRaises(OSError, realpath, path, strict=True)
+ self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
path = ABSTFNb + b'\x00'
self.assertEqual(realpath(path, strict=False), path)
self.assertRaises(OSError, realpath, path, strict=True)
+ self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
path = ABSTFN + '\\nonexistent\\x\x00'
self.assertEqual(realpath(path, strict=False), path)
self.assertRaises(OSError, realpath, path, strict=True)
+ self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
path = ABSTFNb + b'\\nonexistent\\x\x00'
self.assertEqual(realpath(path, strict=False), path)
self.assertRaises(OSError, realpath, path, strict=True)
+ self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
path = ABSTFN + '\x00\\..'
self.assertEqual(realpath(path, strict=False), os.getcwd())
self.assertEqual(realpath(path, strict=True), os.getcwd())
+ self.assertEqual(realpath(path, strict=ALLOW_MISSING), os.getcwd())
path = ABSTFNb + b'\x00\\..'
self.assertEqual(realpath(path, strict=False), os.getcwdb())
self.assertEqual(realpath(path, strict=True), os.getcwdb())
+ self.assertEqual(realpath(path, strict=ALLOW_MISSING), os.getcwdb())
path = ABSTFN + '\\nonexistent\\x\x00\\..'
self.assertEqual(realpath(path, strict=False), ABSTFN + '\\nonexistent')
self.assertRaises(OSError, realpath, path, strict=True)
+ self.assertEqual(realpath(path, strict=ALLOW_MISSING), ABSTFN + '\\nonexistent')
path = ABSTFNb + b'\\nonexistent\\x\x00\\..'
self.assertEqual(realpath(path, strict=False), ABSTFNb + b'\\nonexistent')
self.assertRaises(OSError, realpath, path, strict=True)
+ self.assertEqual(realpath(path, strict=ALLOW_MISSING), ABSTFNb + b'\\nonexistent')
+ @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_invalid_unicode_paths(self, kwargs):
+ realpath = ntpath.realpath
+ ABSTFN = ntpath.abspath(os_helper.TESTFN)
+ ABSTFNb = os.fsencode(ABSTFN)
path = ABSTFNb + b'\xff'
- self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
- self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+ self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
+ self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
path = ABSTFNb + b'\\nonexistent\\\xff'
- self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
- self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+ self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
+ self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
path = ABSTFNb + b'\xff\\..'
- self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
- self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+ self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
+ self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
path = ABSTFNb + b'\\nonexistent\\\xff\\..'
- self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
- self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+ self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
+ self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs)
@os_helper.skip_unless_symlink
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
- def test_realpath_relative(self):
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_relative(self, kwargs):
ABSTFN = ntpath.abspath(os_helper.TESTFN)
open(ABSTFN, "wb").close()
self.addCleanup(os_helper.unlink, ABSTFN)
self.addCleanup(os_helper.unlink, ABSTFN + "1")
os.symlink(ABSTFN, ntpath.relpath(ABSTFN + "1"))
- self.assertPathEqual(ntpath.realpath(ABSTFN + "1"), ABSTFN)
+ self.assertPathEqual(ntpath.realpath(ABSTFN + "1", **kwargs), ABSTFN)
@os_helper.skip_unless_symlink
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
@@ -722,7 +799,62 @@ class TestNtpath(NtpathTestCase):
@os_helper.skip_unless_symlink
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
- def test_realpath_symlink_prefix(self):
+ def test_realpath_symlink_loops_raise(self):
+ # Symlink loops raise OSError in ALLOW_MISSING mode
+ ABSTFN = ntpath.abspath(os_helper.TESTFN)
+ self.addCleanup(os_helper.unlink, ABSTFN)
+ self.addCleanup(os_helper.unlink, ABSTFN + "1")
+ self.addCleanup(os_helper.unlink, ABSTFN + "2")
+ self.addCleanup(os_helper.unlink, ABSTFN + "y")
+ self.addCleanup(os_helper.unlink, ABSTFN + "c")
+ self.addCleanup(os_helper.unlink, ABSTFN + "a")
+ self.addCleanup(os_helper.unlink, ABSTFN + "x")
+
+ os.symlink(ABSTFN, ABSTFN)
+ self.assertRaises(OSError, ntpath.realpath, ABSTFN, strict=ALLOW_MISSING)
+
+ os.symlink(ABSTFN + "1", ABSTFN + "2")
+ os.symlink(ABSTFN + "2", ABSTFN + "1")
+ self.assertRaises(OSError, ntpath.realpath, ABSTFN + "1",
+ strict=ALLOW_MISSING)
+ self.assertRaises(OSError, ntpath.realpath, ABSTFN + "2",
+ strict=ALLOW_MISSING)
+ self.assertRaises(OSError, ntpath.realpath, ABSTFN + "1\\x",
+ strict=ALLOW_MISSING)
+
+ # Windows eliminates '..' components before resolving links;
+ # realpath is not expected to raise if this removes the loop.
+ self.assertPathEqual(ntpath.realpath(ABSTFN + "1\\.."),
+ ntpath.dirname(ABSTFN))
+ self.assertPathEqual(ntpath.realpath(ABSTFN + "1\\..\\x"),
+ ntpath.dirname(ABSTFN) + "\\x")
+
+ os.symlink(ABSTFN + "x", ABSTFN + "y")
+ self.assertPathEqual(ntpath.realpath(ABSTFN + "1\\..\\"
+ + ntpath.basename(ABSTFN) + "y"),
+ ABSTFN + "x")
+ self.assertRaises(
+ OSError, ntpath.realpath,
+ ABSTFN + "1\\..\\" + ntpath.basename(ABSTFN) + "1",
+ strict=ALLOW_MISSING)
+
+ os.symlink(ntpath.basename(ABSTFN) + "a\\b", ABSTFN + "a")
+ self.assertRaises(OSError, ntpath.realpath, ABSTFN + "a",
+ strict=ALLOW_MISSING)
+
+ os.symlink("..\\" + ntpath.basename(ntpath.dirname(ABSTFN))
+ + "\\" + ntpath.basename(ABSTFN) + "c", ABSTFN + "c")
+ self.assertRaises(OSError, ntpath.realpath, ABSTFN + "c",
+ strict=ALLOW_MISSING)
+
+ # Test using relative path as well.
+ self.assertRaises(OSError, ntpath.realpath, ntpath.basename(ABSTFN),
+ strict=ALLOW_MISSING)
+
+ @os_helper.skip_unless_symlink
+ @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_symlink_prefix(self, kwargs):
ABSTFN = ntpath.abspath(os_helper.TESTFN)
self.addCleanup(os_helper.unlink, ABSTFN + "3")
self.addCleanup(os_helper.unlink, "\\\\?\\" + ABSTFN + "3.")
@@ -737,9 +869,9 @@ class TestNtpath(NtpathTestCase):
f.write(b'1')
os.symlink("\\\\?\\" + ABSTFN + "3.", ABSTFN + "3.link")
- self.assertPathEqual(ntpath.realpath(ABSTFN + "3link"),
+ self.assertPathEqual(ntpath.realpath(ABSTFN + "3link", **kwargs),
ABSTFN + "3")
- self.assertPathEqual(ntpath.realpath(ABSTFN + "3.link"),
+ self.assertPathEqual(ntpath.realpath(ABSTFN + "3.link", **kwargs),
"\\\\?\\" + ABSTFN + "3.")
# Resolved paths should be usable to open target files
@@ -749,14 +881,17 @@ class TestNtpath(NtpathTestCase):
self.assertEqual(f.read(), b'1')
# When the prefix is included, it is not stripped
- self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3link"),
+ self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3link", **kwargs),
"\\\\?\\" + ABSTFN + "3")
- self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3.link"),
+ self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3.link", **kwargs),
"\\\\?\\" + ABSTFN + "3.")
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
def test_realpath_nul(self):
tester("ntpath.realpath('NUL')", r'\\.\NUL')
+ tester("ntpath.realpath('NUL', strict=False)", r'\\.\NUL')
+ tester("ntpath.realpath('NUL', strict=True)", r'\\.\NUL')
+ tester("ntpath.realpath('NUL', strict=ALLOW_MISSING)", r'\\.\NUL')
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
@unittest.skipUnless(HAVE_GETSHORTPATHNAME, 'need _getshortpathname')
@@ -780,12 +915,20 @@ class TestNtpath(NtpathTestCase):
self.assertPathEqual(test_file_long, ntpath.realpath(test_file_short))
- with os_helper.change_cwd(test_dir_long):
- self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
- with os_helper.change_cwd(test_dir_long.lower()):
- self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
- with os_helper.change_cwd(test_dir_short):
- self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
+ for kwargs in {}, {'strict': True}, {'strict': ALLOW_MISSING}:
+ with self.subTest(**kwargs):
+ with os_helper.change_cwd(test_dir_long):
+ self.assertPathEqual(
+ test_file_long,
+ ntpath.realpath("file.txt", **kwargs))
+ with os_helper.change_cwd(test_dir_long.lower()):
+ self.assertPathEqual(
+ test_file_long,
+ ntpath.realpath("file.txt", **kwargs))
+ with os_helper.change_cwd(test_dir_short):
+ self.assertPathEqual(
+ test_file_long,
+ ntpath.realpath("file.txt", **kwargs))
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
def test_realpath_permission(self):
@@ -806,12 +949,15 @@ class TestNtpath(NtpathTestCase):
# Automatic generation of short names may be disabled on
# NTFS volumes for the sake of performance.
# They're not supported at all on ReFS and exFAT.
- subprocess.run(
+ p = subprocess.run(
# Try to set the short name manually.
['fsutil.exe', 'file', 'setShortName', test_file, 'LONGFI~1.TXT'],
creationflags=subprocess.DETACHED_PROCESS
)
+ if p.returncode:
+ raise unittest.SkipTest('failed to set short name')
+
try:
self.assertPathEqual(test_file, ntpath.realpath(test_file_short))
except AssertionError:
diff --git a/Lib/test/test_peepholer.py b/Lib/test/test_peepholer.py
index f33de3d420c..ef596630b93 100644
--- a/Lib/test/test_peepholer.py
+++ b/Lib/test/test_peepholer.py
@@ -2614,6 +2614,90 @@ class OptimizeLoadFastTestCase(DirectCfgOptimizerTests):
]
self.cfg_optimization_test(insts, expected, consts=[None])
+ def test_format_simple(self):
+ # FORMAT_SIMPLE will leave its operand on the stack if it's a unicode
+ # object. We treat it conservatively and assume that it always leaves
+ # its operand on the stack.
+ insts = [
+ ("LOAD_FAST", 0, 1),
+ ("FORMAT_SIMPLE", None, 2),
+ ("STORE_FAST", 1, 3),
+ ]
+ self.check(insts, insts)
+
+ insts = [
+ ("LOAD_FAST", 0, 1),
+ ("FORMAT_SIMPLE", None, 2),
+ ("POP_TOP", None, 3),
+ ]
+ expected = [
+ ("LOAD_FAST_BORROW", 0, 1),
+ ("FORMAT_SIMPLE", None, 2),
+ ("POP_TOP", None, 3),
+ ]
+ self.check(insts, expected)
+
+ def test_set_function_attribute(self):
+ # SET_FUNCTION_ATTRIBUTE leaves the function on the stack
+ insts = [
+ ("LOAD_CONST", 0, 1),
+ ("LOAD_FAST", 0, 2),
+ ("SET_FUNCTION_ATTRIBUTE", 2, 3),
+ ("STORE_FAST", 1, 4),
+ ("LOAD_CONST", 0, 5),
+ ("RETURN_VALUE", None, 6)
+ ]
+ self.cfg_optimization_test(insts, insts, consts=[None])
+
+ insts = [
+ ("LOAD_CONST", 0, 1),
+ ("LOAD_FAST", 0, 2),
+ ("SET_FUNCTION_ATTRIBUTE", 2, 3),
+ ("RETURN_VALUE", None, 4)
+ ]
+ expected = [
+ ("LOAD_CONST", 0, 1),
+ ("LOAD_FAST_BORROW", 0, 2),
+ ("SET_FUNCTION_ATTRIBUTE", 2, 3),
+ ("RETURN_VALUE", None, 4)
+ ]
+ self.cfg_optimization_test(insts, expected, consts=[None])
+
+ def test_get_yield_from_iter(self):
+ # GET_YIELD_FROM_ITER may leave its operand on the stack
+ insts = [
+ ("LOAD_FAST", 0, 1),
+ ("GET_YIELD_FROM_ITER", None, 2),
+ ("LOAD_CONST", 0, 3),
+ send := self.Label(),
+ ("SEND", end := self.Label(), 5),
+ ("YIELD_VALUE", 1, 6),
+ ("RESUME", 2, 7),
+ ("JUMP", send, 8),
+ end,
+ ("END_SEND", None, 9),
+ ("LOAD_CONST", 0, 10),
+ ("RETURN_VALUE", None, 11),
+ ]
+ self.cfg_optimization_test(insts, insts, consts=[None])
+
+ def test_push_exc_info(self):
+ insts = [
+ ("LOAD_FAST", 0, 1),
+ ("PUSH_EXC_INFO", None, 2),
+ ]
+ self.check(insts, insts)
+
+ def test_load_special(self):
+ # LOAD_SPECIAL may leave self on the stack
+ insts = [
+ ("LOAD_FAST", 0, 1),
+ ("LOAD_SPECIAL", 0, 2),
+ ("STORE_FAST", 1, 3),
+ ]
+ self.check(insts, insts)
+
+
def test_del_in_finally(self):
# This loads `obj` onto the stack, executes `del obj`, then returns the
# `obj` from the stack. See gh-133371 for more details.
@@ -2630,6 +2714,14 @@ class OptimizeLoadFastTestCase(DirectCfgOptimizerTests):
gc.collect()
self.assertEqual(obj, [42])
+ def test_format_simple_unicode(self):
+ # Repro from gh-134889
+ def f():
+ var = f"{1}"
+ var = f"{var}"
+ return var
+ self.assertEqual(f(), "1")
+
if __name__ == "__main__":
diff --git a/Lib/test/test_posixpath.py b/Lib/test/test_posixpath.py
index f3f9895f529..21f06712548 100644
--- a/Lib/test/test_posixpath.py
+++ b/Lib/test/test_posixpath.py
@@ -4,7 +4,8 @@ import posixpath
import random
import sys
import unittest
-from posixpath import realpath, abspath, dirname, basename
+from functools import partial
+from posixpath import realpath, abspath, dirname, basename, ALLOW_MISSING
from test import support
from test import test_genericpath
from test.support import import_helper
@@ -33,6 +34,11 @@ def skip_if_ABSTFN_contains_backslash(test):
msg = "ABSTFN is not a posix path - tests fail"
return [test, unittest.skip(msg)(test)][found_backslash]
+
+def _parameterize(*parameters):
+ return support.subTests('kwargs', parameters)
+
+
class PosixPathTest(unittest.TestCase):
def setUp(self):
@@ -442,32 +448,35 @@ class PosixPathTest(unittest.TestCase):
self.assertEqual(result, expected)
@skip_if_ABSTFN_contains_backslash
- def test_realpath_curdir(self):
- self.assertEqual(realpath('.'), os.getcwd())
- self.assertEqual(realpath('./.'), os.getcwd())
- self.assertEqual(realpath('/'.join(['.'] * 100)), os.getcwd())
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_curdir(self, kwargs):
+ self.assertEqual(realpath('.', **kwargs), os.getcwd())
+ self.assertEqual(realpath('./.', **kwargs), os.getcwd())
+ self.assertEqual(realpath('/'.join(['.'] * 100), **kwargs), os.getcwd())
- self.assertEqual(realpath(b'.'), os.getcwdb())
- self.assertEqual(realpath(b'./.'), os.getcwdb())
- self.assertEqual(realpath(b'/'.join([b'.'] * 100)), os.getcwdb())
+ self.assertEqual(realpath(b'.', **kwargs), os.getcwdb())
+ self.assertEqual(realpath(b'./.', **kwargs), os.getcwdb())
+ self.assertEqual(realpath(b'/'.join([b'.'] * 100), **kwargs), os.getcwdb())
@skip_if_ABSTFN_contains_backslash
- def test_realpath_pardir(self):
- self.assertEqual(realpath('..'), dirname(os.getcwd()))
- self.assertEqual(realpath('../..'), dirname(dirname(os.getcwd())))
- self.assertEqual(realpath('/'.join(['..'] * 100)), '/')
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_pardir(self, kwargs):
+ self.assertEqual(realpath('..', **kwargs), dirname(os.getcwd()))
+ self.assertEqual(realpath('../..', **kwargs), dirname(dirname(os.getcwd())))
+ self.assertEqual(realpath('/'.join(['..'] * 100), **kwargs), '/')
- self.assertEqual(realpath(b'..'), dirname(os.getcwdb()))
- self.assertEqual(realpath(b'../..'), dirname(dirname(os.getcwdb())))
- self.assertEqual(realpath(b'/'.join([b'..'] * 100)), b'/')
+ self.assertEqual(realpath(b'..', **kwargs), dirname(os.getcwdb()))
+ self.assertEqual(realpath(b'../..', **kwargs), dirname(dirname(os.getcwdb())))
+ self.assertEqual(realpath(b'/'.join([b'..'] * 100), **kwargs), b'/')
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
- def test_realpath_basic(self):
+ @_parameterize({}, {'strict': ALLOW_MISSING})
+ def test_realpath_basic(self, kwargs):
# Basic operation.
try:
os.symlink(ABSTFN+"1", ABSTFN)
- self.assertEqual(realpath(ABSTFN), ABSTFN+"1")
+ self.assertEqual(realpath(ABSTFN, **kwargs), ABSTFN+"1")
finally:
os_helper.unlink(ABSTFN)
@@ -487,90 +496,115 @@ class PosixPathTest(unittest.TestCase):
path = '/\x00'
self.assertRaises(ValueError, realpath, path, strict=False)
self.assertRaises(ValueError, realpath, path, strict=True)
+ self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
path = b'/\x00'
self.assertRaises(ValueError, realpath, path, strict=False)
self.assertRaises(ValueError, realpath, path, strict=True)
+ self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
path = '/nonexistent/x\x00'
self.assertRaises(ValueError, realpath, path, strict=False)
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
path = b'/nonexistent/x\x00'
self.assertRaises(ValueError, realpath, path, strict=False)
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
path = '/\x00/..'
self.assertRaises(ValueError, realpath, path, strict=False)
self.assertRaises(ValueError, realpath, path, strict=True)
+ self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
path = b'/\x00/..'
self.assertRaises(ValueError, realpath, path, strict=False)
self.assertRaises(ValueError, realpath, path, strict=True)
+ self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
+
path = '/nonexistent/x\x00/..'
self.assertRaises(ValueError, realpath, path, strict=False)
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
path = b'/nonexistent/x\x00/..'
self.assertRaises(ValueError, realpath, path, strict=False)
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING)
path = '/\udfff'
if sys.platform == 'win32':
self.assertEqual(realpath(path, strict=False), path)
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ self.assertEqual(realpath(path, strict=ALLOW_MISSING), path)
else:
self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
self.assertRaises(UnicodeEncodeError, realpath, path, strict=True)
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING)
path = '/nonexistent/\udfff'
if sys.platform == 'win32':
self.assertEqual(realpath(path, strict=False), path)
+ self.assertEqual(realpath(path, strict=ALLOW_MISSING), path)
else:
self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING)
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
path = '/\udfff/..'
if sys.platform == 'win32':
self.assertEqual(realpath(path, strict=False), '/')
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ self.assertEqual(realpath(path, strict=ALLOW_MISSING), '/')
else:
self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
self.assertRaises(UnicodeEncodeError, realpath, path, strict=True)
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING)
path = '/nonexistent/\udfff/..'
if sys.platform == 'win32':
self.assertEqual(realpath(path, strict=False), '/nonexistent')
+ self.assertEqual(realpath(path, strict=ALLOW_MISSING), '/nonexistent')
else:
self.assertRaises(UnicodeEncodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING)
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
path = b'/\xff'
if sys.platform == 'win32':
self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
self.assertRaises(UnicodeDecodeError, realpath, path, strict=True)
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=ALLOW_MISSING)
else:
self.assertEqual(realpath(path, strict=False), path)
if support.is_wasi:
self.assertRaises(OSError, realpath, path, strict=True)
+ self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
else:
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
+ self.assertEqual(realpath(path, strict=ALLOW_MISSING), path)
path = b'/nonexistent/\xff'
if sys.platform == 'win32':
self.assertRaises(UnicodeDecodeError, realpath, path, strict=False)
+ self.assertRaises(UnicodeDecodeError, realpath, path, strict=ALLOW_MISSING)
else:
self.assertEqual(realpath(path, strict=False), path)
if support.is_wasi:
self.assertRaises(OSError, realpath, path, strict=True)
+ self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING)
else:
self.assertRaises(FileNotFoundError, realpath, path, strict=True)
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
- def test_realpath_relative(self):
+ @_parameterize({}, {'strict': ALLOW_MISSING})
+ def test_realpath_relative(self, kwargs):
try:
os.symlink(posixpath.relpath(ABSTFN+"1"), ABSTFN)
- self.assertEqual(realpath(ABSTFN), ABSTFN+"1")
+ self.assertEqual(realpath(ABSTFN, **kwargs), ABSTFN+"1")
finally:
os_helper.unlink(ABSTFN)
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
- def test_realpath_missing_pardir(self):
+ @_parameterize({}, {'strict': ALLOW_MISSING})
+ def test_realpath_missing_pardir(self, kwargs):
try:
os.symlink(TESTFN + "1", TESTFN)
- self.assertEqual(realpath("nonexistent/../" + TESTFN), ABSTFN + "1")
+ self.assertEqual(
+ realpath("nonexistent/../" + TESTFN, **kwargs), ABSTFN + "1")
finally:
os_helper.unlink(TESTFN)
@@ -617,37 +651,38 @@ class PosixPathTest(unittest.TestCase):
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
- def test_realpath_symlink_loops_strict(self):
+ @_parameterize({'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_symlink_loops_strict(self, kwargs):
# Bug #43757, raise OSError if we get into an infinite symlink loop in
- # strict mode.
+ # the strict modes.
try:
os.symlink(ABSTFN, ABSTFN)
- self.assertRaises(OSError, realpath, ABSTFN, strict=True)
+ self.assertRaises(OSError, realpath, ABSTFN, **kwargs)
os.symlink(ABSTFN+"1", ABSTFN+"2")
os.symlink(ABSTFN+"2", ABSTFN+"1")
- self.assertRaises(OSError, realpath, ABSTFN+"1", strict=True)
- self.assertRaises(OSError, realpath, ABSTFN+"2", strict=True)
+ self.assertRaises(OSError, realpath, ABSTFN+"1", **kwargs)
+ self.assertRaises(OSError, realpath, ABSTFN+"2", **kwargs)
- self.assertRaises(OSError, realpath, ABSTFN+"1/x", strict=True)
- self.assertRaises(OSError, realpath, ABSTFN+"1/..", strict=True)
- self.assertRaises(OSError, realpath, ABSTFN+"1/../x", strict=True)
+ self.assertRaises(OSError, realpath, ABSTFN+"1/x", **kwargs)
+ self.assertRaises(OSError, realpath, ABSTFN+"1/..", **kwargs)
+ self.assertRaises(OSError, realpath, ABSTFN+"1/../x", **kwargs)
os.symlink(ABSTFN+"x", ABSTFN+"y")
self.assertRaises(OSError, realpath,
- ABSTFN+"1/../" + basename(ABSTFN) + "y", strict=True)
+ ABSTFN+"1/../" + basename(ABSTFN) + "y", **kwargs)
self.assertRaises(OSError, realpath,
- ABSTFN+"1/../" + basename(ABSTFN) + "1", strict=True)
+ ABSTFN+"1/../" + basename(ABSTFN) + "1", **kwargs)
os.symlink(basename(ABSTFN) + "a/b", ABSTFN+"a")
- self.assertRaises(OSError, realpath, ABSTFN+"a", strict=True)
+ self.assertRaises(OSError, realpath, ABSTFN+"a", **kwargs)
os.symlink("../" + basename(dirname(ABSTFN)) + "/" +
basename(ABSTFN) + "c", ABSTFN+"c")
- self.assertRaises(OSError, realpath, ABSTFN+"c", strict=True)
+ self.assertRaises(OSError, realpath, ABSTFN+"c", **kwargs)
# Test using relative path as well.
with os_helper.change_cwd(dirname(ABSTFN)):
- self.assertRaises(OSError, realpath, basename(ABSTFN), strict=True)
+ self.assertRaises(OSError, realpath, basename(ABSTFN), **kwargs)
finally:
os_helper.unlink(ABSTFN)
os_helper.unlink(ABSTFN+"1")
@@ -658,13 +693,14 @@ class PosixPathTest(unittest.TestCase):
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
- def test_realpath_repeated_indirect_symlinks(self):
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_repeated_indirect_symlinks(self, kwargs):
# Issue #6975.
try:
os.mkdir(ABSTFN)
os.symlink('../' + basename(ABSTFN), ABSTFN + '/self')
os.symlink('self/self/self', ABSTFN + '/link')
- self.assertEqual(realpath(ABSTFN + '/link'), ABSTFN)
+ self.assertEqual(realpath(ABSTFN + '/link', **kwargs), ABSTFN)
finally:
os_helper.unlink(ABSTFN + '/self')
os_helper.unlink(ABSTFN + '/link')
@@ -672,14 +708,15 @@ class PosixPathTest(unittest.TestCase):
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
- def test_realpath_deep_recursion(self):
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_deep_recursion(self, kwargs):
depth = 10
try:
os.mkdir(ABSTFN)
for i in range(depth):
os.symlink('/'.join(['%d' % i] * 10), ABSTFN + '/%d' % (i + 1))
os.symlink('.', ABSTFN + '/0')
- self.assertEqual(realpath(ABSTFN + '/%d' % depth), ABSTFN)
+ self.assertEqual(realpath(ABSTFN + '/%d' % depth, **kwargs), ABSTFN)
# Test using relative path as well.
with os_helper.change_cwd(ABSTFN):
@@ -691,7 +728,8 @@ class PosixPathTest(unittest.TestCase):
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
- def test_realpath_resolve_parents(self):
+ @_parameterize({}, {'strict': ALLOW_MISSING})
+ def test_realpath_resolve_parents(self, kwargs):
# We also need to resolve any symlinks in the parents of a relative
# path passed to realpath. E.g.: current working directory is
# /usr/doc with 'doc' being a symlink to /usr/share/doc. We call
@@ -702,7 +740,8 @@ class PosixPathTest(unittest.TestCase):
os.symlink(ABSTFN + "/y", ABSTFN + "/k")
with os_helper.change_cwd(ABSTFN + "/k"):
- self.assertEqual(realpath("a"), ABSTFN + "/y/a")
+ self.assertEqual(realpath("a", **kwargs),
+ ABSTFN + "/y/a")
finally:
os_helper.unlink(ABSTFN + "/k")
os_helper.rmdir(ABSTFN + "/y")
@@ -710,7 +749,8 @@ class PosixPathTest(unittest.TestCase):
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
- def test_realpath_resolve_before_normalizing(self):
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_resolve_before_normalizing(self, kwargs):
# Bug #990669: Symbolic links should be resolved before we
# normalize the path. E.g.: if we have directories 'a', 'k' and 'y'
# in the following hierarchy:
@@ -725,10 +765,10 @@ class PosixPathTest(unittest.TestCase):
os.symlink(ABSTFN + "/k/y", ABSTFN + "/link-y")
# Absolute path.
- self.assertEqual(realpath(ABSTFN + "/link-y/.."), ABSTFN + "/k")
+ self.assertEqual(realpath(ABSTFN + "/link-y/..", **kwargs), ABSTFN + "/k")
# Relative path.
with os_helper.change_cwd(dirname(ABSTFN)):
- self.assertEqual(realpath(basename(ABSTFN) + "/link-y/.."),
+ self.assertEqual(realpath(basename(ABSTFN) + "/link-y/..", **kwargs),
ABSTFN + "/k")
finally:
os_helper.unlink(ABSTFN + "/link-y")
@@ -738,7 +778,8 @@ class PosixPathTest(unittest.TestCase):
@os_helper.skip_unless_symlink
@skip_if_ABSTFN_contains_backslash
- def test_realpath_resolve_first(self):
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_resolve_first(self, kwargs):
# Bug #1213894: The first component of the path, if not absolute,
# must be resolved too.
@@ -748,8 +789,8 @@ class PosixPathTest(unittest.TestCase):
os.symlink(ABSTFN, ABSTFN + "link")
with os_helper.change_cwd(dirname(ABSTFN)):
base = basename(ABSTFN)
- self.assertEqual(realpath(base + "link"), ABSTFN)
- self.assertEqual(realpath(base + "link/k"), ABSTFN + "/k")
+ self.assertEqual(realpath(base + "link", **kwargs), ABSTFN)
+ self.assertEqual(realpath(base + "link/k", **kwargs), ABSTFN + "/k")
finally:
os_helper.unlink(ABSTFN + "link")
os_helper.rmdir(ABSTFN + "/k")
@@ -767,12 +808,67 @@ class PosixPathTest(unittest.TestCase):
self.assertEqual(realpath(ABSTFN + '/foo'), ABSTFN + '/foo')
self.assertEqual(realpath(ABSTFN + '/../foo'), dirname(ABSTFN) + '/foo')
self.assertEqual(realpath(ABSTFN + '/foo/..'), ABSTFN)
- with self.assertRaises(PermissionError):
- realpath(ABSTFN, strict=True)
finally:
os.chmod(ABSTFN, 0o755, follow_symlinks=False)
os_helper.unlink(ABSTFN)
+ @os_helper.skip_unless_symlink
+ @skip_if_ABSTFN_contains_backslash
+ @unittest.skipIf(os.chmod not in os.supports_follow_symlinks, "Can't set symlink permissions")
+ @unittest.skipIf(sys.platform != "darwin", "only macOS requires read permission to readlink()")
+ @_parameterize({'strict': True}, {'strict': ALLOW_MISSING})
+ def test_realpath_unreadable_symlink_strict(self, kwargs):
+ try:
+ os.symlink(ABSTFN+"1", ABSTFN)
+ os.chmod(ABSTFN, 0o000, follow_symlinks=False)
+ with self.assertRaises(PermissionError):
+ realpath(ABSTFN, **kwargs)
+ with self.assertRaises(PermissionError):
+ realpath(ABSTFN + '/foo', **kwargs),
+ with self.assertRaises(PermissionError):
+ realpath(ABSTFN + '/../foo', **kwargs)
+ with self.assertRaises(PermissionError):
+ realpath(ABSTFN + '/foo/..', **kwargs)
+ finally:
+ os.chmod(ABSTFN, 0o755, follow_symlinks=False)
+ os.unlink(ABSTFN)
+
+ @skip_if_ABSTFN_contains_backslash
+ @os_helper.skip_unless_symlink
+ def test_realpath_unreadable_directory(self):
+ try:
+ os.mkdir(ABSTFN)
+ os.mkdir(ABSTFN + '/k')
+ os.chmod(ABSTFN, 0o000)
+ self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN)
+ self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN)
+ self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN)
+
+ try:
+ os.stat(ABSTFN)
+ except PermissionError:
+ pass
+ else:
+ self.skipTest('Cannot block permissions')
+
+ self.assertEqual(realpath(ABSTFN + '/k', strict=False),
+ ABSTFN + '/k')
+ self.assertRaises(PermissionError, realpath, ABSTFN + '/k',
+ strict=True)
+ self.assertRaises(PermissionError, realpath, ABSTFN + '/k',
+ strict=ALLOW_MISSING)
+
+ self.assertEqual(realpath(ABSTFN + '/missing', strict=False),
+ ABSTFN + '/missing')
+ self.assertRaises(PermissionError, realpath, ABSTFN + '/missing',
+ strict=True)
+ self.assertRaises(PermissionError, realpath, ABSTFN + '/missing',
+ strict=ALLOW_MISSING)
+ finally:
+ os.chmod(ABSTFN, 0o755)
+ os_helper.rmdir(ABSTFN + '/k')
+ os_helper.rmdir(ABSTFN)
+
@skip_if_ABSTFN_contains_backslash
def test_realpath_nonterminal_file(self):
try:
@@ -780,14 +876,27 @@ class PosixPathTest(unittest.TestCase):
f.write('test_posixpath wuz ere')
self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN)
self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN)
+ self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN)
+
self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/",
+ strict=ALLOW_MISSING)
+
self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN)
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.",
+ strict=ALLOW_MISSING)
+
self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN))
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..",
+ strict=ALLOW_MISSING)
+
self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "/subdir")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir",
+ strict=ALLOW_MISSING)
finally:
os_helper.unlink(ABSTFN)
@@ -800,14 +909,27 @@ class PosixPathTest(unittest.TestCase):
os.symlink(ABSTFN + "1", ABSTFN)
self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN + "1")
self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "1")
+ self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN + "1")
+
self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN + "1")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/",
+ strict=ALLOW_MISSING)
+
self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN + "1")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.",
+ strict=ALLOW_MISSING)
+
self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN))
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..",
+ strict=ALLOW_MISSING)
+
self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "1/subdir")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir",
+ strict=ALLOW_MISSING)
finally:
os_helper.unlink(ABSTFN)
os_helper.unlink(ABSTFN + "1")
@@ -822,14 +944,27 @@ class PosixPathTest(unittest.TestCase):
os.symlink(ABSTFN + "1", ABSTFN)
self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN + "2")
self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "2")
+ self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "2")
+
self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN + "2")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/",
+ strict=ALLOW_MISSING)
+
self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN + "2")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.",
+ strict=ALLOW_MISSING)
+
self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN))
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..",
+ strict=ALLOW_MISSING)
+
self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "2/subdir")
self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True)
+ self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir",
+ strict=ALLOW_MISSING)
finally:
os_helper.unlink(ABSTFN)
os_helper.unlink(ABSTFN + "1")
@@ -1017,9 +1152,12 @@ class PathLikeTests(unittest.TestCase):
def test_path_abspath(self):
self.assertPathEqual(self.path.abspath)
- def test_path_realpath(self):
+ @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING})
+ def test_path_realpath(self, kwargs):
self.assertPathEqual(self.path.realpath)
+ self.assertPathEqual(partial(self.path.realpath, **kwargs))
+
def test_path_relpath(self):
self.assertPathEqual(self.path.relpath)
diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py
index aa3a592766d..98bae7dd703 100644
--- a/Lib/test/test_pyrepl/test_pyrepl.py
+++ b/Lib/test/test_pyrepl/test_pyrepl.py
@@ -1672,6 +1672,17 @@ class TestMain(ReplTestCase):
self.assertEqual(exit_code, 0)
self.assertNotIn("TypeError", output)
+ @force_not_colorized
+ def test_non_string_suggestion_candidates(self):
+ commands = ("import runpy\n"
+ "runpy._run_module_code('blech', {0: '', 'bluch': ''}, '')\n"
+ "exit()\n")
+
+ output, exit_code = self.run_repl(commands)
+ self.assertEqual(exit_code, 0)
+ self.assertNotIn("all elements in 'candidates' must be strings", output)
+ self.assertIn("bluch", output)
+
def test_readline_history_file(self):
# skip, if readline module is not available
readline = import_module('readline')
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py
index 7f4fe357034..c855fb8fe2b 100644
--- a/Lib/test/test_queue.py
+++ b/Lib/test/test_queue.py
@@ -6,7 +6,7 @@ import threading
import time
import unittest
import weakref
-from test.support import gc_collect
+from test.support import gc_collect, bigmemtest
from test.support import import_helper
from test.support import threading_helper
@@ -963,33 +963,33 @@ class BaseSimpleQueueTest:
# One producer, one consumer => results appended in well-defined order
self.assertEqual(results, inputs)
- def test_many_threads(self):
+ @bigmemtest(size=50, memuse=100*2**20, dry_run=False)
+ def test_many_threads(self, size):
# Test multiple concurrent put() and get()
- N = 50
q = self.q
inputs = list(range(10000))
- results = self.run_threads(N, q, inputs, self.feed, self.consume)
+ results = self.run_threads(size, q, inputs, self.feed, self.consume)
# Multiple consumers without synchronization append the
# results in random order
self.assertEqual(sorted(results), inputs)
- def test_many_threads_nonblock(self):
+ @bigmemtest(size=50, memuse=100*2**20, dry_run=False)
+ def test_many_threads_nonblock(self, size):
# Test multiple concurrent put() and get(block=False)
- N = 50
q = self.q
inputs = list(range(10000))
- results = self.run_threads(N, q, inputs,
+ results = self.run_threads(size, q, inputs,
self.feed, self.consume_nonblock)
self.assertEqual(sorted(results), inputs)
- def test_many_threads_timeout(self):
+ @bigmemtest(size=50, memuse=100*2**20, dry_run=False)
+ def test_many_threads_timeout(self, size):
# Test multiple concurrent put() and get(timeout=...)
- N = 50
q = self.q
inputs = list(range(1000))
- results = self.run_threads(N, q, inputs,
+ results = self.run_threads(size, q, inputs,
self.feed, self.consume_timeout)
self.assertEqual(sorted(results), inputs)
diff --git a/Lib/test/test_random.py b/Lib/test/test_random.py
index bd76d636e4f..54910cd8054 100644
--- a/Lib/test/test_random.py
+++ b/Lib/test/test_random.py
@@ -392,6 +392,8 @@ class TestBasicOps:
self.assertRaises(TypeError, self.gen.getrandbits)
self.assertRaises(TypeError, self.gen.getrandbits, 1, 2)
self.assertRaises(ValueError, self.gen.getrandbits, -1)
+ self.assertRaises(OverflowError, self.gen.getrandbits, 1<<1000)
+ self.assertRaises(ValueError, self.gen.getrandbits, -1<<1000)
self.assertRaises(TypeError, self.gen.getrandbits, 10.1)
def test_pickling(self):
@@ -435,6 +437,8 @@ class TestBasicOps:
self.assertRaises(TypeError, self.gen.randbytes)
self.assertRaises(TypeError, self.gen.randbytes, 1, 2)
self.assertRaises(ValueError, self.gen.randbytes, -1)
+ self.assertRaises(OverflowError, self.gen.randbytes, 1<<1000)
+ self.assertRaises((ValueError, OverflowError), self.gen.randbytes, -1<<1000)
self.assertRaises(TypeError, self.gen.randbytes, 1.0)
def test_mu_sigma_default_args(self):
@@ -806,6 +810,22 @@ class MersenneTwister_TestBasicOps(TestBasicOps, unittest.TestCase):
self.assertEqual(self.gen.getrandbits(100),
97904845777343510404718956115)
+ def test_getrandbits_2G_bits(self):
+ size = 2**31
+ self.gen.seed(1234567)
+ x = self.gen.getrandbits(size)
+ self.assertEqual(x.bit_length(), size)
+ self.assertEqual(x & (2**100-1), 890186470919986886340158459475)
+ self.assertEqual(x >> (size-100), 1226514312032729439655761284440)
+
+ @support.bigmemtest(size=2**32, memuse=1/8+2/15, dry_run=False)
+ def test_getrandbits_4G_bits(self, size):
+ self.gen.seed(1234568)
+ x = self.gen.getrandbits(size)
+ self.assertEqual(x.bit_length(), size)
+ self.assertEqual(x & (2**100-1), 287241425661104632871036099814)
+ self.assertEqual(x >> (size-100), 739728759900339699429794460738)
+
def test_randrange_uses_getrandbits(self):
# Verify use of getrandbits by randrange
# Use same seed as in the cross-platform repeatability test
@@ -962,6 +982,14 @@ class MersenneTwister_TestBasicOps(TestBasicOps, unittest.TestCase):
self.assertEqual(self.gen.randbytes(n),
gen2.getrandbits(n * 8).to_bytes(n, 'little'))
+ @support.bigmemtest(size=2**29, memuse=1+16/15, dry_run=False)
+ def test_randbytes_256M(self, size):
+ self.gen.seed(2849427419)
+ x = self.gen.randbytes(size)
+ self.assertEqual(len(x), size)
+ self.assertEqual(x[:12].hex(), 'f6fd9ae63855ab91ea238b4f')
+ self.assertEqual(x[-12:].hex(), '0e7af69a84ee99bf4a11becc')
+
def test_sample_counts_equivalence(self):
# Test the documented strong equivalence to a sample with repeated elements.
# We run this test on random.Random() which makes deterministic selections
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index 8f4fc09442e..a43d2678ebd 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -768,13 +768,16 @@ class BaseTestCase(unittest.TestCase):
self.fail(msg)
return proc
- def run_python(self, args, **kw):
+ def run_python(self, args, isolated=True, **kw):
extraargs = []
if 'uops' in sys._xoptions:
# Pass -X uops along
extraargs.extend(['-X', 'uops'])
- args = [sys.executable, *extraargs, '-X', 'faulthandler', '-I', *args]
- proc = self.run_command(args, **kw)
+ cmd = [sys.executable, *extraargs, '-X', 'faulthandler']
+ if isolated:
+ cmd.append('-I')
+ cmd.extend(args)
+ proc = self.run_command(cmd, **kw)
return proc.stdout
@@ -831,8 +834,8 @@ class ProgramsTestCase(BaseTestCase):
self.check_executed_tests(output, self.tests,
randomize=True, stats=len(self.tests))
- def run_tests(self, args, env=None):
- output = self.run_python(args, env=env)
+ def run_tests(self, args, env=None, isolated=True):
+ output = self.run_python(args, env=env, isolated=isolated)
self.check_output(output)
def test_script_regrtest(self):
@@ -2067,7 +2070,7 @@ class ArgsTestCase(BaseTestCase):
self.check_executed_tests(output, [testname],
failed=[testname],
parallel=True,
- stats=TestStats(1, 1, 0))
+ stats=TestStats(1, 2, 1))
def _check_random_seed(self, run_workers: bool):
# gh-109276: When -r/--randomize is used, random.seed() is called
@@ -2276,7 +2279,6 @@ class ArgsTestCase(BaseTestCase):
def test_xml(self):
code = textwrap.dedent(r"""
import unittest
- from test import support
class VerboseTests(unittest.TestCase):
def test_failed(self):
@@ -2311,6 +2313,39 @@ class ArgsTestCase(BaseTestCase):
for out in testcase.iter('system-out'):
self.assertEqual(out.text, r"abc \x1b def")
+ def test_nonascii(self):
+ code = textwrap.dedent(r"""
+ import unittest
+
+ class NonASCIITests(unittest.TestCase):
+ def test_docstring(self):
+ '''docstring:\u20ac'''
+
+ def test_subtest(self):
+ with self.subTest(param='subtest:\u20ac'):
+ pass
+
+ def test_skip(self):
+ self.skipTest('skipped:\u20ac')
+ """)
+ testname = self.create_test(code=code)
+
+ env = dict(os.environ)
+ env['PYTHONIOENCODING'] = 'ascii'
+
+ def check(output):
+ self.check_executed_tests(output, testname, stats=TestStats(3, 0, 1))
+ self.assertIn(r'docstring:\u20ac', output)
+ self.assertIn(r'skipped:\u20ac', output)
+
+ # Run sequentially
+ output = self.run_tests('-v', testname, env=env, isolated=False)
+ check(output)
+
+ # Run in parallel
+ output = self.run_tests('-j1', '-v', testname, env=env, isolated=False)
+ check(output)
+
class TestUtils(unittest.TestCase):
def test_format_duration(self):
diff --git a/Lib/test/test_sqlite3/test_cli.py b/Lib/test/test_sqlite3/test_cli.py
index 37e0f74f688..7f0b0f36505 100644
--- a/Lib/test/test_sqlite3/test_cli.py
+++ b/Lib/test/test_sqlite3/test_cli.py
@@ -1,14 +1,19 @@
"""sqlite3 CLI tests."""
import sqlite3
+import sys
+import textwrap
import unittest
from sqlite3.__main__ import main as cli
+from test.support.import_helper import import_module
from test.support.os_helper import TESTFN, unlink
+from test.support.pty_helper import run_pty
from test.support import (
captured_stdout,
captured_stderr,
captured_stdin,
force_not_colorized_test_class,
+ requires_subprocess,
)
@@ -200,5 +205,98 @@ class InteractiveSession(unittest.TestCase):
self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: '
'\x1b[35mnear "sel": syntax error\x1b[0m', err)
+
+@requires_subprocess()
+@force_not_colorized_test_class
+class Completion(unittest.TestCase):
+ PS1 = "sqlite> "
+
+ @classmethod
+ def setUpClass(cls):
+ _sqlite3 = import_module("_sqlite3")
+ if not hasattr(_sqlite3, "SQLITE_KEYWORDS"):
+ raise unittest.SkipTest("unable to determine SQLite keywords")
+
+ readline = import_module("readline")
+ if readline.backend == "editline":
+ raise unittest.SkipTest("libedit readline is not supported")
+
+ def write_input(self, input_, env=None):
+ script = textwrap.dedent("""
+ import readline
+ from sqlite3.__main__ import main
+
+ readline.parse_and_bind("set colored-completion-prefix off")
+ main()
+ """)
+ return run_pty(script, input_, env)
+
+ def test_complete_sql_keywords(self):
+ # List candidates starting with 'S', there should be multiple matches.
+ input_ = b"S\t\tEL\t 1;\n.quit\n"
+ output = self.write_input(input_)
+ self.assertIn(b"SELECT", output)
+ self.assertIn(b"SET", output)
+ self.assertIn(b"SAVEPOINT", output)
+ self.assertIn(b"(1,)", output)
+
+ # Keywords are completed in upper case for even lower case user input.
+ input_ = b"sel\t\t 1;\n.quit\n"
+ output = self.write_input(input_)
+ self.assertIn(b"SELECT", output)
+ self.assertIn(b"(1,)", output)
+
+ @unittest.skipIf(sys.platform.startswith("freebsd"),
+ "Two actual tabs are inserted when there are no matching"
+ " completions in the pseudo-terminal opened by run_pty()"
+ " on FreeBSD")
+ def test_complete_no_match(self):
+ input_ = b"xyzzy\t\t\b\b\b\b\b\b\b.quit\n"
+ # Set NO_COLOR to disable coloring for self.PS1.
+ output = self.write_input(input_, env={"NO_COLOR": "1"})
+ lines = output.decode().splitlines()
+ indices = (
+ i for i, line in enumerate(lines, 1)
+ if line.startswith(f"{self.PS1}xyzzy")
+ )
+ line_num = next(indices, -1)
+ self.assertNotEqual(line_num, -1)
+ # Completions occupy lines, assert no extra lines when there is nothing
+ # to complete.
+ self.assertEqual(line_num, len(lines))
+
+ def test_complete_no_input(self):
+ from _sqlite3 import SQLITE_KEYWORDS
+
+ script = textwrap.dedent("""
+ import readline
+ from sqlite3.__main__ import main
+
+ # Configure readline to ...:
+ # - hide control sequences surrounding each candidate
+ # - hide "Display all xxx possibilities? (y or n)"
+ # - hide "--More--"
+ # - show candidates one per line
+ readline.parse_and_bind("set colored-completion-prefix off")
+ readline.parse_and_bind("set colored-stats off")
+ readline.parse_and_bind("set completion-query-items 0")
+ readline.parse_and_bind("set page-completions off")
+ readline.parse_and_bind("set completion-display-width 0")
+
+ main()
+ """)
+ input_ = b"\t\t.quit\n"
+ output = run_pty(script, input_, env={"NO_COLOR": "1"})
+ lines = output.decode().splitlines()
+ indices = [
+ i for i, line in enumerate(lines)
+ if line.startswith(self.PS1)
+ ]
+ self.assertEqual(len(indices), 2)
+ start, end = indices
+ candidates = [l.strip() for l in lines[start+1:end]]
+ self.assertEqual(candidates, sorted(SQLITE_KEYWORDS))
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 2767a53d53c..f123f6ece40 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -31,6 +31,7 @@ import weakref
import platform
import sysconfig
import functools
+from contextlib import nullcontext
try:
import ctypes
except ImportError:
@@ -2843,6 +2844,7 @@ class ThreadedTests(unittest.TestCase):
# See GH-124984: OpenSSL is not thread safe.
threads = []
+ warnings_filters = sys.flags.context_aware_warnings
global USE_SAME_TEST_CONTEXT
USE_SAME_TEST_CONTEXT = True
try:
@@ -2851,7 +2853,10 @@ class ThreadedTests(unittest.TestCase):
self.test_alpn_protocols,
self.test_getpeercert,
self.test_crl_check,
- self.test_check_hostname_idn,
+ functools.partial(
+ self.test_check_hostname_idn,
+ warnings_filters=warnings_filters,
+ ),
self.test_wrong_cert_tls12,
self.test_wrong_cert_tls13,
):
@@ -3097,7 +3102,7 @@ class ThreadedTests(unittest.TestCase):
cipher = s.cipher()[0].split('-')
self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
- def test_check_hostname_idn(self):
+ def test_check_hostname_idn(self, warnings_filters=True):
if support.verbose:
sys.stdout.write("\n")
@@ -3152,16 +3157,30 @@ class ThreadedTests(unittest.TestCase):
server_hostname="python.example.org") as s:
with self.assertRaises(ssl.CertificateError):
s.connect((HOST, server.port))
- with ThreadedEchoServer(context=server_context, chatty=True) as server:
- with warnings_helper.check_no_resource_warning(self):
- with self.assertRaises(UnicodeError):
- context.wrap_socket(socket.socket(),
- server_hostname='.pythontest.net')
- with ThreadedEchoServer(context=server_context, chatty=True) as server:
- with warnings_helper.check_no_resource_warning(self):
- with self.assertRaises(UnicodeDecodeError):
- context.wrap_socket(socket.socket(),
- server_hostname=b'k\xf6nig.idn.pythontest.net')
+ with (
+ ThreadedEchoServer(context=server_context, chatty=True) as server,
+ (
+ warnings_helper.check_no_resource_warning(self)
+ if warnings_filters
+ else nullcontext()
+ ),
+ self.assertRaises(UnicodeError),
+ ):
+ context.wrap_socket(socket.socket(), server_hostname='.pythontest.net')
+
+ with (
+ ThreadedEchoServer(context=server_context, chatty=True) as server,
+ (
+ warnings_helper.check_no_resource_warning(self)
+ if warnings_filters
+ else nullcontext()
+ ),
+ self.assertRaises(UnicodeDecodeError),
+ ):
+ context.wrap_socket(
+ socket.socket(),
+ server_hostname=b'k\xf6nig.idn.pythontest.net',
+ )
def test_wrong_cert_tls12(self):
"""Connecting when the server rejects the client's certificate
diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py
index c7ac7914158..13aaba405e3 100644
--- a/Lib/test/test_syntax.py
+++ b/Lib/test/test_syntax.py
@@ -1436,17 +1436,17 @@ Regression tests for gh-133999:
>>> try: pass
... except TypeError as name: raise from None
Traceback (most recent call last):
- SyntaxError: invalid syntax
+ SyntaxError: did you forget an expression between 'raise' and 'from'?
>>> try: pass
... except* TypeError as name: raise from None
Traceback (most recent call last):
- SyntaxError: invalid syntax
+ SyntaxError: did you forget an expression between 'raise' and 'from'?
>>> match 1:
... case 1 | 2 as abc: raise from None
Traceback (most recent call last):
- SyntaxError: invalid syntax
+ SyntaxError: did you forget an expression between 'raise' and 'from'?
Ensure that early = are not matched by the parser as invalid comparisons
>>> f(2, 4, x=34); 1 $ 2
@@ -1695,6 +1695,28 @@ Make sure that the old "raise X, Y[, Z]" form is gone:
...
SyntaxError: invalid syntax
+Better errors for `raise` statement:
+
+ >>> raise ValueError from
+ Traceback (most recent call last):
+ SyntaxError: did you forget an expression after 'from'?
+
+ >>> raise mod.ValueError() from
+ Traceback (most recent call last):
+ SyntaxError: did you forget an expression after 'from'?
+
+ >>> raise from exc
+ Traceback (most recent call last):
+ SyntaxError: did you forget an expression between 'raise' and 'from'?
+
+ >>> raise from None
+ Traceback (most recent call last):
+ SyntaxError: did you forget an expression between 'raise' and 'from'?
+
+ >>> raise from
+ Traceback (most recent call last):
+ SyntaxError: did you forget an expression between 'raise' and 'from'?
+
Check that an multiple exception types with missing parentheses
raise a custom exception only when using 'as'
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index 795d1ecbb59..bf415894903 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -729,7 +729,7 @@ class SysModuleTest(unittest.TestCase):
info = sys.thread_info
self.assertEqual(len(info), 3)
self.assertIn(info.name, ('nt', 'pthread', 'pthread-stubs', 'solaris', None))
- self.assertIn(info.lock, ('semaphore', 'mutex+cond', None))
+ self.assertIn(info.lock, ('pymutex', None))
if sys.platform.startswith(("linux", "android", "freebsd")):
self.assertEqual(info.name, "pthread")
elif sys.platform == "win32":
@@ -1135,23 +1135,12 @@ class SysModuleTest(unittest.TestCase):
b = sys.getallocatedblocks()
self.assertLessEqual(b, a)
try:
- # While we could imagine a Python session where the number of
- # multiple buffer objects would exceed the sharing of references,
- # it is unlikely to happen in a normal test run.
- #
- # In free-threaded builds each code object owns an array of
- # pointers to copies of the bytecode. When the number of
- # code objects is a large fraction of the total number of
- # references, this can cause the total number of allocated
- # blocks to exceed the total number of references.
- #
- # For some reason, iOS seems to trigger the "unlikely to happen"
- # case reliably under CI conditions. It's not clear why; but as
- # this test is checking the behavior of getallocatedblock()
- # under garbage collection, we can skip this pre-condition check
- # for now. See GH-130384.
- if not support.Py_GIL_DISABLED and not support.is_apple_mobile:
- self.assertLess(a, sys.gettotalrefcount())
+ # The reported blocks will include immortalized strings, but the
+ # total ref count will not. This will sanity check that among all
+ # other objects (those eligible for garbage collection) there
+ # are more references being tracked than allocated blocks.
+ interned_immortal = sys.getunicodeinternedsize(_only_immortal=True)
+ self.assertLess(a - interned_immortal, sys.gettotalrefcount())
except AttributeError:
# gettotalrefcount() not available
pass
@@ -1299,6 +1288,7 @@ class SysModuleTest(unittest.TestCase):
for name in sys.stdlib_module_names:
self.assertIsInstance(name, str)
+ @unittest.skipUnless(hasattr(sys, '_stdlib_dir'), 'need sys._stdlib_dir')
def test_stdlib_dir(self):
os = import_helper.import_fresh_module('os')
marker = getattr(os, '__file__', None)
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py
index cf218a2bf14..7055e1ed147 100644
--- a/Lib/test/test_tarfile.py
+++ b/Lib/test/test_tarfile.py
@@ -2715,6 +2715,31 @@ class MiscTest(unittest.TestCase):
str(excinfo.exception),
)
+ @unittest.skipUnless(os_helper.can_symlink(), 'requires symlink support')
+ @unittest.skipUnless(hasattr(os, 'chmod'), "missing os.chmod")
+ @unittest.mock.patch('os.chmod')
+ def test_deferred_directory_attributes_update(self, mock_chmod):
+ # Regression test for gh-127987: setting attributes on arbitrary files
+ tempdir = os.path.join(TEMPDIR, 'test127987')
+ def mock_chmod_side_effect(path, mode, **kwargs):
+ target_path = os.path.realpath(path)
+ if os.path.commonpath([target_path, tempdir]) != tempdir:
+ raise Exception("should not try to chmod anything outside the destination", target_path)
+ mock_chmod.side_effect = mock_chmod_side_effect
+
+ outside_tree_dir = os.path.join(TEMPDIR, 'outside_tree_dir')
+ with ArchiveMaker() as arc:
+ arc.add('x', symlink_to='.')
+ arc.add('x', type=tarfile.DIRTYPE, mode='?rwsrwsrwt')
+ arc.add('x', symlink_to=outside_tree_dir)
+
+ os.makedirs(outside_tree_dir)
+ try:
+ arc.open().extractall(path=tempdir, filter='tar')
+ finally:
+ os_helper.rmtree(outside_tree_dir)
+ os_helper.rmtree(tempdir)
+
class CommandLineTest(unittest.TestCase):
@@ -3275,6 +3300,10 @@ class NoneInfoExtractTests(ReadTest):
got_paths = set(
p.relative_to(directory)
for p in pathlib.Path(directory).glob('**/*'))
+ if self.extraction_filter in (None, 'data'):
+ # The 'data' filter is expected to reject special files
+ for path in 'ustar/fifotype', 'ustar/blktype', 'ustar/chrtype':
+ got_paths.discard(pathlib.Path(path))
self.assertEqual(self.control_paths, got_paths)
@contextmanager
@@ -3504,12 +3533,28 @@ class ArchiveMaker:
self.bio = None
def add(self, name, *, type=None, symlink_to=None, hardlink_to=None,
- mode=None, size=None, **kwargs):
- """Add a member to the test archive. Call within `with`."""
+ mode=None, size=None, content=None, **kwargs):
+ """Add a member to the test archive. Call within `with`.
+
+ Provides many shortcuts:
+ - default `type` is based on symlink_to, hardlink_to, and trailing `/`
+ in name (which is stripped)
+ - size & content defaults are based on each other
+ - content can be str or bytes
+ - mode should be textual ('-rwxrwxrwx')
+
+ (add more! this is unstable internal test-only API)
+ """
name = str(name)
tarinfo = tarfile.TarInfo(name).replace(**kwargs)
+ if content is not None:
+ if isinstance(content, str):
+ content = content.encode()
+ size = len(content)
if size is not None:
tarinfo.size = size
+ if content is None:
+ content = bytes(tarinfo.size)
if mode:
tarinfo.mode = _filemode_to_int(mode)
if symlink_to is not None:
@@ -3523,7 +3568,7 @@ class ArchiveMaker:
if type is not None:
tarinfo.type = type
if tarinfo.isreg():
- fileobj = io.BytesIO(bytes(tarinfo.size))
+ fileobj = io.BytesIO(content)
else:
fileobj = None
self.tar_w.addfile(tarinfo, fileobj)
@@ -3557,7 +3602,7 @@ class TestExtractionFilters(unittest.TestCase):
destdir = outerdir / 'dest'
@contextmanager
- def check_context(self, tar, filter):
+ def check_context(self, tar, filter, *, check_flag=True):
"""Extracts `tar` to `self.destdir` and allows checking the result
If an error occurs, it must be checked using `expect_exception`
@@ -3566,27 +3611,40 @@ class TestExtractionFilters(unittest.TestCase):
except the destination directory itself and parent directories of
other files.
When checking directories, do so before their contents.
+
+ A file called 'flag' is made in outerdir (i.e. outside destdir)
+ before extraction; it should not be altered nor should its contents
+ be read/copied.
"""
with os_helper.temp_dir(self.outerdir):
+ flag_path = self.outerdir / 'flag'
+ flag_path.write_text('capture me')
try:
tar.extractall(self.destdir, filter=filter)
except Exception as exc:
self.raised_exception = exc
+ self.reraise_exception = True
self.expected_paths = set()
else:
self.raised_exception = None
+ self.reraise_exception = False
self.expected_paths = set(self.outerdir.glob('**/*'))
self.expected_paths.discard(self.destdir)
+ self.expected_paths.discard(flag_path)
try:
- yield
+ yield self
finally:
tar.close()
- if self.raised_exception:
+ if self.reraise_exception:
raise self.raised_exception
self.assertEqual(self.expected_paths, set())
+ if check_flag:
+ self.assertEqual(flag_path.read_text(), 'capture me')
+ else:
+ assert filter == 'fully_trusted'
def expect_file(self, name, type=None, symlink_to=None, mode=None,
- size=None):
+ size=None, content=None):
"""Check a single file. See check_context."""
if self.raised_exception:
raise self.raised_exception
@@ -3605,26 +3663,45 @@ class TestExtractionFilters(unittest.TestCase):
# The symlink might be the same (textually) as what we expect,
# but some systems change the link to an equivalent path, so
# we fall back to samefile().
- if expected != got:
- self.assertTrue(got.samefile(expected))
+ try:
+ if expected != got:
+ self.assertTrue(got.samefile(expected))
+ except Exception as e:
+ # attach a note, so it's shown even if `samefile` fails
+ e.add_note(f'{expected=}, {got=}')
+ raise
elif type == tarfile.REGTYPE or type is None:
self.assertTrue(path.is_file())
elif type == tarfile.DIRTYPE:
self.assertTrue(path.is_dir())
elif type == tarfile.FIFOTYPE:
self.assertTrue(path.is_fifo())
+ elif type == tarfile.SYMTYPE:
+ self.assertTrue(path.is_symlink())
else:
raise NotImplementedError(type)
if size is not None:
self.assertEqual(path.stat().st_size, size)
+ if content is not None:
+ self.assertEqual(path.read_text(), content)
for parent in path.parents:
self.expected_paths.discard(parent)
+ def expect_any_tree(self, name):
+ """Check a directory; forget about its contents."""
+ tree_path = (self.destdir / name).resolve()
+ self.expect_file(tree_path, type=tarfile.DIRTYPE)
+ self.expected_paths = {
+ p for p in self.expected_paths
+ if tree_path not in p.parents
+ }
+
def expect_exception(self, exc_type, message_re='.'):
with self.assertRaisesRegex(exc_type, message_re):
if self.raised_exception is not None:
raise self.raised_exception
- self.raised_exception = None
+ self.reraise_exception = False
+ return self.raised_exception
def test_benign_file(self):
with ArchiveMaker() as arc:
@@ -3710,6 +3787,80 @@ class TestExtractionFilters(unittest.TestCase):
self.expect_file('parent/evil')
@symlink_test
+ @os_helper.skip_unless_symlink
+ def test_realpath_limit_attack(self):
+ # (CVE-2025-4517)
+
+ with ArchiveMaker() as arc:
+ # populate the symlinks and dirs that expand in os.path.realpath()
+ # The component length is chosen so that in common cases, the unexpanded
+ # path fits in PATH_MAX, but it overflows when the final symlink
+ # is expanded
+ steps = "abcdefghijklmnop"
+ if sys.platform == 'win32':
+ component = 'd' * 25
+ elif 'PC_PATH_MAX' in os.pathconf_names:
+ max_path_len = os.pathconf(self.outerdir.parent, "PC_PATH_MAX")
+ path_sep_len = 1
+ dest_len = len(str(self.destdir)) + path_sep_len
+ component_len = (max_path_len - dest_len) // (len(steps) + path_sep_len)
+ component = 'd' * component_len
+ else:
+ raise NotImplementedError("Need to guess component length for {sys.platform}")
+ path = ""
+ step_path = ""
+ for i in steps:
+ arc.add(os.path.join(path, component), type=tarfile.DIRTYPE,
+ mode='drwxrwxrwx')
+ arc.add(os.path.join(path, i), symlink_to=component)
+ path = os.path.join(path, component)
+ step_path = os.path.join(step_path, i)
+ # create the final symlink that exceeds PATH_MAX and simply points
+ # to the top dir.
+ # this link will never be expanded by
+ # os.path.realpath(strict=False), nor anything after it.
+ linkpath = os.path.join(*steps, "l"*254)
+ parent_segments = [".."] * len(steps)
+ arc.add(linkpath, symlink_to=os.path.join(*parent_segments))
+ # make a symlink outside to keep the tar command happy
+ arc.add("escape", symlink_to=os.path.join(linkpath, ".."))
+ # use the symlinks above, that are not checked, to create a hardlink
+ # to a file outside of the destination path
+ arc.add("flaglink", hardlink_to=os.path.join("escape", "flag"))
+ # now that we have the hardlink we can overwrite the file
+ arc.add("flaglink", content='overwrite')
+ # we can also create new files as well!
+ arc.add("escape/newfile", content='new')
+
+ with (self.subTest('fully_trusted'),
+ self.check_context(arc.open(), filter='fully_trusted',
+ check_flag=False)):
+ if sys.platform == 'win32':
+ self.expect_exception((FileNotFoundError, FileExistsError))
+ elif self.raised_exception:
+ # Cannot symlink/hardlink: tarfile falls back to getmember()
+ self.expect_exception(KeyError)
+ # Otherwise, this block should never enter.
+ else:
+ self.expect_any_tree(component)
+ self.expect_file('flaglink', content='overwrite')
+ self.expect_file('../newfile', content='new')
+ self.expect_file('escape', type=tarfile.SYMTYPE)
+ self.expect_file('a', symlink_to=component)
+
+ for filter in 'tar', 'data':
+ with self.subTest(filter), self.check_context(arc.open(), filter=filter):
+ exc = self.expect_exception((OSError, KeyError))
+ if isinstance(exc, OSError):
+ if sys.platform == 'win32':
+ # 3: ERROR_PATH_NOT_FOUND
+ # 5: ERROR_ACCESS_DENIED
+ # 206: ERROR_FILENAME_EXCED_RANGE
+ self.assertIn(exc.winerror, (3, 5, 206))
+ else:
+ self.assertEqual(exc.errno, errno.ENAMETOOLONG)
+
+ @symlink_test
def test_parent_symlink2(self):
# Test interplaying symlinks
# Inspired by 'dirsymlink2b' in jwilk/traversal-archives
@@ -3931,8 +4082,8 @@ class TestExtractionFilters(unittest.TestCase):
arc.add('symlink2', symlink_to=os.path.join(
'linkdir', 'hardlink2'))
arc.add('targetdir/target', size=3)
- arc.add('linkdir/hardlink', hardlink_to='targetdir/target')
- arc.add('linkdir/hardlink2', hardlink_to='linkdir/symlink')
+ arc.add('linkdir/hardlink', hardlink_to=os.path.join('targetdir', 'target'))
+ arc.add('linkdir/hardlink2', hardlink_to=os.path.join('linkdir', 'symlink'))
for filter in 'tar', 'data', 'fully_trusted':
with self.check_context(arc.open(), filter):
@@ -3948,6 +4099,129 @@ class TestExtractionFilters(unittest.TestCase):
self.expect_file('linkdir/symlink', size=3)
self.expect_file('symlink2', size=3)
+ @symlink_test
+ def test_sneaky_hardlink_fallback(self):
+ # (CVE-2025-4330)
+ # Test that when hardlink extraction falls back to extracting members
+ # from the archive, the extracted member is (re-)filtered.
+ with ArchiveMaker() as arc:
+ # Create a directory structure so the c/escape symlink stays
+ # inside the path
+ arc.add("a/t/dummy")
+ # Create b/ directory
+ arc.add("b/")
+ # Point "c" to the bottom of the tree in "a"
+ arc.add("c", symlink_to=os.path.join("a", "t"))
+ # link to non-existant location under "a"
+ arc.add("c/escape", symlink_to=os.path.join("..", "..",
+ "link_here"))
+ # Move "c" to point to "b" ("c/escape" no longer exists)
+ arc.add("c", symlink_to="b")
+ # Attempt to create a hard link to "c/escape". Since it doesn't
+ # exist it will attempt to extract "cescape" but at "boom".
+ arc.add("boom", hardlink_to=os.path.join("c", "escape"))
+
+ with self.check_context(arc.open(), 'data'):
+ if not os_helper.can_symlink():
+ # When 'c/escape' is extracted, 'c' is a regular
+ # directory, and 'c/escape' *would* point outside
+ # the destination if symlinks were allowed.
+ self.expect_exception(
+ tarfile.LinkOutsideDestinationError)
+ elif sys.platform == "win32":
+ # On Windows, 'c/escape' points outside the destination
+ self.expect_exception(tarfile.LinkOutsideDestinationError)
+ else:
+ e = self.expect_exception(
+ tarfile.LinkFallbackError,
+ "link 'boom' would be extracted as a copy of "
+ + "'c/escape', which was rejected")
+ self.assertIsInstance(e.__cause__,
+ tarfile.LinkOutsideDestinationError)
+ for filter in 'tar', 'fully_trusted':
+ with self.subTest(filter), self.check_context(arc.open(), filter):
+ if not os_helper.can_symlink():
+ self.expect_file("a/t/dummy")
+ self.expect_file("b/")
+ self.expect_file("c/")
+ else:
+ self.expect_file("a/t/dummy")
+ self.expect_file("b/")
+ self.expect_file("a/t/escape", symlink_to='../../link_here')
+ self.expect_file("boom", symlink_to='../../link_here')
+ self.expect_file("c", symlink_to='b')
+
+ @symlink_test
+ def test_exfiltration_via_symlink(self):
+ # (CVE-2025-4138)
+ # Test changing symlinks that result in a symlink pointing outside
+ # the extraction directory, unless prevented by 'data' filter's
+ # normalization.
+ with ArchiveMaker() as arc:
+ arc.add("escape", symlink_to=os.path.join('link', 'link', '..', '..', 'link-here'))
+ arc.add("link", symlink_to='./')
+
+ for filter in 'tar', 'data', 'fully_trusted':
+ with self.check_context(arc.open(), filter):
+ if os_helper.can_symlink():
+ self.expect_file("link", symlink_to='./')
+ if filter == 'data':
+ self.expect_file("escape", symlink_to='link-here')
+ else:
+ self.expect_file("escape",
+ symlink_to='link/link/../../link-here')
+ else:
+ # Nothing is extracted.
+ pass
+
+ @symlink_test
+ def test_chmod_outside_dir(self):
+ # (CVE-2024-12718)
+ # Test that members used for delayed updates of directory metadata
+ # are (re-)filtered.
+ with ArchiveMaker() as arc:
+ # "pwn" is a veeeery innocent symlink:
+ arc.add("a/pwn", symlink_to='.')
+ # But now "pwn" is also a directory, so it's scheduled to have its
+ # metadata updated later:
+ arc.add("a/pwn/", mode='drwxrwxrwx')
+ # Oops, "pwn" is not so innocent any more:
+ arc.add("a/pwn", symlink_to='x/../')
+ # Newly created symlink points to the dest dir,
+ # so it's OK for the "data" filter.
+ arc.add('a/x', symlink_to=('../'))
+ # But now "pwn" points outside the dest dir
+
+ for filter in 'tar', 'data', 'fully_trusted':
+ with self.check_context(arc.open(), filter) as cc:
+ if not os_helper.can_symlink():
+ self.expect_file("a/pwn/")
+ elif filter == 'data':
+ self.expect_file("a/x", symlink_to='../')
+ self.expect_file("a/pwn", symlink_to='.')
+ else:
+ self.expect_file("a/x", symlink_to='../')
+ self.expect_file("a/pwn", symlink_to='x/../')
+ if sys.platform != "win32":
+ st_mode = cc.outerdir.stat().st_mode
+ self.assertNotEqual(st_mode & 0o777, 0o777)
+
+ def test_link_fallback_normalizes(self):
+ # Make sure hardlink fallbacks work for non-normalized paths for all
+ # filters
+ with ArchiveMaker() as arc:
+ arc.add("dir/")
+ arc.add("dir/../afile")
+ arc.add("link1", hardlink_to='dir/../afile')
+ arc.add("link2", hardlink_to='dir/../dir/../afile')
+
+ for filter in 'tar', 'data', 'fully_trusted':
+ with self.check_context(arc.open(), filter) as cc:
+ self.expect_file("dir/")
+ self.expect_file("afile")
+ self.expect_file("link1")
+ self.expect_file("link2")
+
def test_modes(self):
# Test how file modes are extracted
# (Note that the modes are ignored on platforms without working chmod)
@@ -4072,7 +4346,7 @@ class TestExtractionFilters(unittest.TestCase):
# The 'tar' filter returns TarInfo objects with the same name/type.
# (It can also fail for particularly "evil" input, but we don't have
# that in the test archive.)
- with tarfile.TarFile.open(tarname) as tar:
+ with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar:
for tarinfo in tar.getmembers():
try:
filtered = tarfile.tar_filter(tarinfo, '')
@@ -4084,7 +4358,7 @@ class TestExtractionFilters(unittest.TestCase):
def test_data_filter(self):
# The 'data' filter either raises, or returns TarInfo with the same
# name/type.
- with tarfile.TarFile.open(tarname) as tar:
+ with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar:
for tarinfo in tar.getmembers():
try:
filtered = tarfile.data_filter(tarinfo, '')
@@ -4242,13 +4516,13 @@ class TestExtractionFilters(unittest.TestCase):
# If errorlevel is 0, errors affected by errorlevel are ignored
with self.check_context(arc.open(errorlevel=0), extracterror_filter):
- self.expect_file('file')
+ pass
with self.check_context(arc.open(errorlevel=0), filtererror_filter):
- self.expect_file('file')
+ pass
with self.check_context(arc.open(errorlevel=0), oserror_filter):
- self.expect_file('file')
+ pass
with self.check_context(arc.open(errorlevel=0), tarerror_filter):
self.expect_exception(tarfile.TarError)
@@ -4259,7 +4533,7 @@ class TestExtractionFilters(unittest.TestCase):
# If 1, all fatal errors are raised
with self.check_context(arc.open(errorlevel=1), extracterror_filter):
- self.expect_file('file')
+ pass
with self.check_context(arc.open(errorlevel=1), filtererror_filter):
self.expect_exception(tarfile.FilterError)
diff --git a/Lib/test/test_traceback.py b/Lib/test/test_traceback.py
index b9be87f357f..74b979d0096 100644
--- a/Lib/test/test_traceback.py
+++ b/Lib/test/test_traceback.py
@@ -4188,6 +4188,15 @@ class SuggestionFormattingTestBase:
self.assertNotIn("blech", actual)
self.assertNotIn("oh no!", actual)
+ def test_attribute_error_with_non_string_candidates(self):
+ class T:
+ bluch = 1
+
+ instance = T()
+ instance.__dict__[0] = 1
+ actual = self.get_suggestion(instance, 'blich')
+ self.assertIn("bluch", actual)
+
def test_attribute_error_with_bad_name(self):
def raise_attribute_error_with_bad_name():
raise AttributeError(name=12, obj=23)
@@ -4223,8 +4232,8 @@ class SuggestionFormattingTestBase:
return mod_name
- def get_import_from_suggestion(self, mod_dict, name):
- modname = self.make_module(mod_dict)
+ def get_import_from_suggestion(self, code, name):
+ modname = self.make_module(code)
def callable():
try:
@@ -4301,6 +4310,13 @@ class SuggestionFormattingTestBase:
self.assertIn("'_bluch'", self.get_import_from_suggestion(code, '_luch'))
self.assertNotIn("'_bluch'", self.get_import_from_suggestion(code, 'bluch'))
+ def test_import_from_suggestions_non_string(self):
+ modWithNonStringAttr = textwrap.dedent("""\
+ globals()[0] = 1
+ bluch = 1
+ """)
+ self.assertIn("'bluch'", self.get_import_from_suggestion(modWithNonStringAttr, 'blech'))
+
def test_import_from_suggestions_do_not_trigger_for_long_attributes(self):
code = "blech = None"
@@ -4397,6 +4413,15 @@ class SuggestionFormattingTestBase:
actual = self.get_suggestion(func)
self.assertIn("'ZeroDivisionError'?", actual)
+ def test_name_error_suggestions_with_non_string_candidates(self):
+ def func():
+ abc = 1
+ custom_globals = globals().copy()
+ custom_globals[0] = 1
+ print(eval("abv", custom_globals, locals()))
+ actual = self.get_suggestion(func)
+ self.assertIn("abc", actual)
+
def test_name_error_suggestions_do_not_trigger_for_long_names(self):
def func():
somethingverywronghehehehehehe = None
diff --git a/Lib/test/test_urlparse.py b/Lib/test/test_urlparse.py
index aabc360289a..b2bde5a9b1d 100644
--- a/Lib/test/test_urlparse.py
+++ b/Lib/test/test_urlparse.py
@@ -2,6 +2,7 @@ import sys
import unicodedata
import unittest
import urllib.parse
+from test import support
RFC1808_BASE = "http://a/b/c/d;p?q#f"
RFC2396_BASE = "http://a/b/c/d;p?q"
@@ -156,27 +157,25 @@ class UrlParseTestCase(unittest.TestCase):
self.assertEqual(result3.hostname, result.hostname)
self.assertEqual(result3.port, result.port)
- def test_qsl(self):
- for orig, expect in parse_qsl_test_cases:
- result = urllib.parse.parse_qsl(orig, keep_blank_values=True)
- self.assertEqual(result, expect, "Error parsing %r" % orig)
- expect_without_blanks = [v for v in expect if len(v[1])]
- result = urllib.parse.parse_qsl(orig, keep_blank_values=False)
- self.assertEqual(result, expect_without_blanks,
- "Error parsing %r" % orig)
-
- def test_qs(self):
- for orig, expect in parse_qs_test_cases:
- result = urllib.parse.parse_qs(orig, keep_blank_values=True)
- self.assertEqual(result, expect, "Error parsing %r" % orig)
- expect_without_blanks = {v: expect[v]
- for v in expect if len(expect[v][0])}
- result = urllib.parse.parse_qs(orig, keep_blank_values=False)
- self.assertEqual(result, expect_without_blanks,
- "Error parsing %r" % orig)
-
- def test_roundtrips(self):
- str_cases = [
+ @support.subTests('orig,expect', parse_qsl_test_cases)
+ def test_qsl(self, orig, expect):
+ result = urllib.parse.parse_qsl(orig, keep_blank_values=True)
+ self.assertEqual(result, expect)
+ expect_without_blanks = [v for v in expect if len(v[1])]
+ result = urllib.parse.parse_qsl(orig, keep_blank_values=False)
+ self.assertEqual(result, expect_without_blanks)
+
+ @support.subTests('orig,expect', parse_qs_test_cases)
+ def test_qs(self, orig, expect):
+ result = urllib.parse.parse_qs(orig, keep_blank_values=True)
+ self.assertEqual(result, expect)
+ expect_without_blanks = {v: expect[v]
+ for v in expect if len(expect[v][0])}
+ result = urllib.parse.parse_qs(orig, keep_blank_values=False)
+ self.assertEqual(result, expect_without_blanks)
+
+ @support.subTests('bytes', (False, True))
+ @support.subTests('url,parsed,split', [
('path/to/file',
('', '', 'path/to/file', '', '', ''),
('', '', 'path/to/file', '', '')),
@@ -263,23 +262,21 @@ class UrlParseTestCase(unittest.TestCase):
('sch_me:path/to/file',
('', '', 'sch_me:path/to/file', '', '', ''),
('', '', 'sch_me:path/to/file', '', '')),
- ]
- def _encode(t):
- return (t[0].encode('ascii'),
- tuple(x.encode('ascii') for x in t[1]),
- tuple(x.encode('ascii') for x in t[2]))
- bytes_cases = [_encode(x) for x in str_cases]
- str_cases += [
('schème:path/to/file',
('', '', 'schème:path/to/file', '', '', ''),
('', '', 'schème:path/to/file', '', '')),
- ]
- for url, parsed, split in str_cases + bytes_cases:
- with self.subTest(url):
- self.checkRoundtrips(url, parsed, split)
-
- def test_roundtrips_normalization(self):
- str_cases = [
+ ])
+ def test_roundtrips(self, bytes, url, parsed, split):
+ if bytes:
+ if not url.isascii():
+ self.skipTest('non-ASCII bytes')
+ url = str_encode(url)
+ parsed = tuple_encode(parsed)
+ split = tuple_encode(split)
+ self.checkRoundtrips(url, parsed, split)
+
+ @support.subTests('bytes', (False, True))
+ @support.subTests('url,url2,parsed,split', [
('///path/to/file',
'/path/to/file',
('', '', '/path/to/file', '', '', ''),
@@ -300,22 +297,18 @@ class UrlParseTestCase(unittest.TestCase):
'https:///tmp/junk.txt',
('https', '', '/tmp/junk.txt', '', '', ''),
('https', '', '/tmp/junk.txt', '', '')),
- ]
- def _encode(t):
- return (t[0].encode('ascii'),
- t[1].encode('ascii'),
- tuple(x.encode('ascii') for x in t[2]),
- tuple(x.encode('ascii') for x in t[3]))
- bytes_cases = [_encode(x) for x in str_cases]
- for url, url2, parsed, split in str_cases + bytes_cases:
- with self.subTest(url):
- self.checkRoundtrips(url, parsed, split, url2)
-
- def test_http_roundtrips(self):
- # urllib.parse.urlsplit treats 'http:' as an optimized special case,
- # so we test both 'http:' and 'https:' in all the following.
- # Three cheers for white box knowledge!
- str_cases = [
+ ])
+ def test_roundtrips_normalization(self, bytes, url, url2, parsed, split):
+ if bytes:
+ url = str_encode(url)
+ url2 = str_encode(url2)
+ parsed = tuple_encode(parsed)
+ split = tuple_encode(split)
+ self.checkRoundtrips(url, parsed, split, url2)
+
+ @support.subTests('bytes', (False, True))
+ @support.subTests('scheme', ('http', 'https'))
+ @support.subTests('url,parsed,split', [
('://www.python.org',
('www.python.org', '', '', '', ''),
('www.python.org', '', '', '')),
@@ -331,23 +324,20 @@ class UrlParseTestCase(unittest.TestCase):
('://a/b/c/d;p?q#f',
('a', '/b/c/d', 'p', 'q', 'f'),
('a', '/b/c/d;p', 'q', 'f')),
- ]
- def _encode(t):
- return (t[0].encode('ascii'),
- tuple(x.encode('ascii') for x in t[1]),
- tuple(x.encode('ascii') for x in t[2]))
- bytes_cases = [_encode(x) for x in str_cases]
- str_schemes = ('http', 'https')
- bytes_schemes = (b'http', b'https')
- str_tests = str_schemes, str_cases
- bytes_tests = bytes_schemes, bytes_cases
- for schemes, test_cases in (str_tests, bytes_tests):
- for scheme in schemes:
- for url, parsed, split in test_cases:
- url = scheme + url
- parsed = (scheme,) + parsed
- split = (scheme,) + split
- self.checkRoundtrips(url, parsed, split)
+ ])
+ def test_http_roundtrips(self, bytes, scheme, url, parsed, split):
+ # urllib.parse.urlsplit treats 'http:' as an optimized special case,
+ # so we test both 'http:' and 'https:' in all the following.
+ # Three cheers for white box knowledge!
+ if bytes:
+ scheme = str_encode(scheme)
+ url = str_encode(url)
+ parsed = tuple_encode(parsed)
+ split = tuple_encode(split)
+ url = scheme + url
+ parsed = (scheme,) + parsed
+ split = (scheme,) + split
+ self.checkRoundtrips(url, parsed, split)
def checkJoin(self, base, relurl, expected, *, relroundtrip=True):
with self.subTest(base=base, relurl=relurl):
@@ -363,12 +353,13 @@ class UrlParseTestCase(unittest.TestCase):
relurlb = urllib.parse.urlunsplit(urllib.parse.urlsplit(relurlb))
self.assertEqual(urllib.parse.urljoin(baseb, relurlb), expectedb)
- def test_unparse_parse(self):
- str_cases = ['Python', './Python','x-newscheme://foo.com/stuff','x://y','x:/y','x:/','/',]
- bytes_cases = [x.encode('ascii') for x in str_cases]
- for u in str_cases + bytes_cases:
- self.assertEqual(urllib.parse.urlunsplit(urllib.parse.urlsplit(u)), u)
- self.assertEqual(urllib.parse.urlunparse(urllib.parse.urlparse(u)), u)
+ @support.subTests('bytes', (False, True))
+ @support.subTests('u', ['Python', './Python','x-newscheme://foo.com/stuff','x://y','x:/y','x:/','/',])
+ def test_unparse_parse(self, bytes, u):
+ if bytes:
+ u = str_encode(u)
+ self.assertEqual(urllib.parse.urlunsplit(urllib.parse.urlsplit(u)), u)
+ self.assertEqual(urllib.parse.urlunparse(urllib.parse.urlparse(u)), u)
def test_RFC1808(self):
# "normal" cases from RFC 1808:
@@ -695,8 +686,8 @@ class UrlParseTestCase(unittest.TestCase):
self.checkJoin('///b/c', '///w', '///w')
self.checkJoin('///b/c', 'w', '///b/w')
- def test_RFC2732(self):
- str_cases = [
+ @support.subTests('bytes', (False, True))
+ @support.subTests('url,hostname,port', [
('http://Test.python.org:5432/foo/', 'test.python.org', 5432),
('http://12.34.56.78:5432/foo/', '12.34.56.78', 5432),
('http://[::1]:5432/foo/', '::1', 5432),
@@ -727,26 +718,28 @@ class UrlParseTestCase(unittest.TestCase):
('http://[::12.34.56.78]:/foo/', '::12.34.56.78', None),
('http://[::ffff:12.34.56.78]:/foo/',
'::ffff:12.34.56.78', None),
- ]
- def _encode(t):
- return t[0].encode('ascii'), t[1].encode('ascii'), t[2]
- bytes_cases = [_encode(x) for x in str_cases]
- for url, hostname, port in str_cases + bytes_cases:
- urlparsed = urllib.parse.urlparse(url)
- self.assertEqual((urlparsed.hostname, urlparsed.port) , (hostname, port))
-
- str_cases = [
+ ])
+ def test_RFC2732(self, bytes, url, hostname, port):
+ if bytes:
+ url = str_encode(url)
+ hostname = str_encode(hostname)
+ urlparsed = urllib.parse.urlparse(url)
+ self.assertEqual((urlparsed.hostname, urlparsed.port), (hostname, port))
+
+ @support.subTests('bytes', (False, True))
+ @support.subTests('invalid_url', [
'http://::12.34.56.78]/',
'http://[::1/foo/',
'ftp://[::1/foo/bad]/bad',
'http://[::1/foo/bad]/bad',
- 'http://[::ffff:12.34.56.78']
- bytes_cases = [x.encode('ascii') for x in str_cases]
- for invalid_url in str_cases + bytes_cases:
- self.assertRaises(ValueError, urllib.parse.urlparse, invalid_url)
-
- def test_urldefrag(self):
- str_cases = [
+ 'http://[::ffff:12.34.56.78'])
+ def test_RFC2732_invalid(self, bytes, invalid_url):
+ if bytes:
+ invalid_url = str_encode(invalid_url)
+ self.assertRaises(ValueError, urllib.parse.urlparse, invalid_url)
+
+ @support.subTests('bytes', (False, True))
+ @support.subTests('url,defrag,frag', [
('http://python.org#frag', 'http://python.org', 'frag'),
('http://python.org', 'http://python.org', ''),
('http://python.org/#frag', 'http://python.org/', 'frag'),
@@ -770,18 +763,18 @@ class UrlParseTestCase(unittest.TestCase):
('http:?q#f', 'http:?q', 'f'),
('//a/b/c;p?q#f', '//a/b/c;p?q', 'f'),
('://a/b/c;p?q#f', '://a/b/c;p?q', 'f'),
- ]
- def _encode(t):
- return type(t)(x.encode('ascii') for x in t)
- bytes_cases = [_encode(x) for x in str_cases]
- for url, defrag, frag in str_cases + bytes_cases:
- with self.subTest(url):
- result = urllib.parse.urldefrag(url)
- hash = '#' if isinstance(url, str) else b'#'
- self.assertEqual(result.geturl(), url.rstrip(hash))
- self.assertEqual(result, (defrag, frag))
- self.assertEqual(result.url, defrag)
- self.assertEqual(result.fragment, frag)
+ ])
+ def test_urldefrag(self, bytes, url, defrag, frag):
+ if bytes:
+ url = str_encode(url)
+ defrag = str_encode(defrag)
+ frag = str_encode(frag)
+ result = urllib.parse.urldefrag(url)
+ hash = '#' if isinstance(url, str) else b'#'
+ self.assertEqual(result.geturl(), url.rstrip(hash))
+ self.assertEqual(result, (defrag, frag))
+ self.assertEqual(result.url, defrag)
+ self.assertEqual(result.fragment, frag)
def test_urlsplit_scoped_IPv6(self):
p = urllib.parse.urlsplit('http://[FE80::822a:a8ff:fe49:470c%tESt]:1234')
@@ -981,42 +974,35 @@ class UrlParseTestCase(unittest.TestCase):
self.assertEqual(p.scheme, "https")
self.assertEqual(p.geturl(), "https://www.python.org/")
- def test_attributes_bad_port(self):
+ @support.subTests('bytes', (False, True))
+ @support.subTests('parse', (urllib.parse.urlsplit, urllib.parse.urlparse))
+ @support.subTests('port', ("foo", "1.5", "-1", "0x10", "-0", "1_1", " 1", "1 ", "६"))
+ def test_attributes_bad_port(self, bytes, parse, port):
"""Check handling of invalid ports."""
- for bytes in (False, True):
- for parse in (urllib.parse.urlsplit, urllib.parse.urlparse):
- for port in ("foo", "1.5", "-1", "0x10", "-0", "1_1", " 1", "1 ", "६"):
- with self.subTest(bytes=bytes, parse=parse, port=port):
- netloc = "www.example.net:" + port
- url = "http://" + netloc + "/"
- if bytes:
- if netloc.isascii() and port.isascii():
- netloc = netloc.encode("ascii")
- url = url.encode("ascii")
- else:
- continue
- p = parse(url)
- self.assertEqual(p.netloc, netloc)
- with self.assertRaises(ValueError):
- p.port
+ netloc = "www.example.net:" + port
+ url = "http://" + netloc + "/"
+ if bytes:
+ if not (netloc.isascii() and port.isascii()):
+ self.skipTest('non-ASCII bytes')
+ netloc = str_encode(netloc)
+ url = str_encode(url)
+ p = parse(url)
+ self.assertEqual(p.netloc, netloc)
+ with self.assertRaises(ValueError):
+ p.port
- def test_attributes_bad_scheme(self):
+ @support.subTests('bytes', (False, True))
+ @support.subTests('parse', (urllib.parse.urlsplit, urllib.parse.urlparse))
+ @support.subTests('scheme', (".", "+", "-", "0", "http&", "६http"))
+ def test_attributes_bad_scheme(self, bytes, parse, scheme):
"""Check handling of invalid schemes."""
- for bytes in (False, True):
- for parse in (urllib.parse.urlsplit, urllib.parse.urlparse):
- for scheme in (".", "+", "-", "0", "http&", "६http"):
- with self.subTest(bytes=bytes, parse=parse, scheme=scheme):
- url = scheme + "://www.example.net"
- if bytes:
- if url.isascii():
- url = url.encode("ascii")
- else:
- continue
- p = parse(url)
- if bytes:
- self.assertEqual(p.scheme, b"")
- else:
- self.assertEqual(p.scheme, "")
+ url = scheme + "://www.example.net"
+ if bytes:
+ if not url.isascii():
+ self.skipTest('non-ASCII bytes')
+ url = url.encode("ascii")
+ p = parse(url)
+ self.assertEqual(p.scheme, b"" if bytes else "")
def test_attributes_without_netloc(self):
# This example is straight from RFC 3261. It looks like it
@@ -1128,24 +1114,21 @@ class UrlParseTestCase(unittest.TestCase):
self.assertEqual(urllib.parse.urlparse(b"x-newscheme://foo.com/stuff?query"),
(b'x-newscheme', b'foo.com', b'/stuff', b'', b'query', b''))
- def test_default_scheme(self):
+ @support.subTests('func', (urllib.parse.urlparse, urllib.parse.urlsplit))
+ def test_default_scheme(self, func):
# Exercise the scheme parameter of urlparse() and urlsplit()
- for func in (urllib.parse.urlparse, urllib.parse.urlsplit):
- with self.subTest(function=func):
- result = func("http://example.net/", "ftp")
- self.assertEqual(result.scheme, "http")
- result = func(b"http://example.net/", b"ftp")
- self.assertEqual(result.scheme, b"http")
- self.assertEqual(func("path", "ftp").scheme, "ftp")
- self.assertEqual(func("path", scheme="ftp").scheme, "ftp")
- self.assertEqual(func(b"path", scheme=b"ftp").scheme, b"ftp")
- self.assertEqual(func("path").scheme, "")
- self.assertEqual(func(b"path").scheme, b"")
- self.assertEqual(func(b"path", "").scheme, b"")
-
- def test_parse_fragments(self):
- # Exercise the allow_fragments parameter of urlparse() and urlsplit()
- tests = (
+ result = func("http://example.net/", "ftp")
+ self.assertEqual(result.scheme, "http")
+ result = func(b"http://example.net/", b"ftp")
+ self.assertEqual(result.scheme, b"http")
+ self.assertEqual(func("path", "ftp").scheme, "ftp")
+ self.assertEqual(func("path", scheme="ftp").scheme, "ftp")
+ self.assertEqual(func(b"path", scheme=b"ftp").scheme, b"ftp")
+ self.assertEqual(func("path").scheme, "")
+ self.assertEqual(func(b"path").scheme, b"")
+ self.assertEqual(func(b"path", "").scheme, b"")
+
+ @support.subTests('url,attr,expected_frag', (
("http:#frag", "path", "frag"),
("//example.net#frag", "path", "frag"),
("index.html#frag", "path", "frag"),
@@ -1156,24 +1139,24 @@ class UrlParseTestCase(unittest.TestCase):
("//abc#@frag", "path", "@frag"),
("//abc:80#@frag", "path", "@frag"),
("//abc#@frag:80", "path", "@frag:80"),
- )
- for url, attr, expected_frag in tests:
- for func in (urllib.parse.urlparse, urllib.parse.urlsplit):
- if attr == "params" and func is urllib.parse.urlsplit:
- attr = "path"
- with self.subTest(url=url, function=func):
- result = func(url, allow_fragments=False)
- self.assertEqual(result.fragment, "")
- self.assertEndsWith(getattr(result, attr),
- "#" + expected_frag)
- self.assertEqual(func(url, "", False).fragment, "")
-
- result = func(url, allow_fragments=True)
- self.assertEqual(result.fragment, expected_frag)
- self.assertNotEndsWith(getattr(result, attr), expected_frag)
- self.assertEqual(func(url, "", True).fragment,
- expected_frag)
- self.assertEqual(func(url).fragment, expected_frag)
+ ))
+ @support.subTests('func', (urllib.parse.urlparse, urllib.parse.urlsplit))
+ def test_parse_fragments(self, url, attr, expected_frag, func):
+ # Exercise the allow_fragments parameter of urlparse() and urlsplit()
+ if attr == "params" and func is urllib.parse.urlsplit:
+ attr = "path"
+ result = func(url, allow_fragments=False)
+ self.assertEqual(result.fragment, "")
+ self.assertEndsWith(getattr(result, attr),
+ "#" + expected_frag)
+ self.assertEqual(func(url, "", False).fragment, "")
+
+ result = func(url, allow_fragments=True)
+ self.assertEqual(result.fragment, expected_frag)
+ self.assertNotEndsWith(getattr(result, attr), expected_frag)
+ self.assertEqual(func(url, "", True).fragment,
+ expected_frag)
+ self.assertEqual(func(url).fragment, expected_frag)
def test_mixed_types_rejected(self):
# Several functions that process either strings or ASCII encoded bytes
@@ -1199,7 +1182,14 @@ class UrlParseTestCase(unittest.TestCase):
with self.assertRaisesRegex(TypeError, "Cannot mix str"):
urllib.parse.urljoin(b"http://python.org", "http://python.org")
- def _check_result_type(self, str_type):
+ @support.subTests('result_type', [
+ urllib.parse.DefragResult,
+ urllib.parse.SplitResult,
+ urllib.parse.ParseResult,
+ ])
+ def test_result_pairs(self, result_type):
+ # Check encoding and decoding between result pairs
+ str_type = result_type
num_args = len(str_type._fields)
bytes_type = str_type._encoded_counterpart
self.assertIs(bytes_type._decoded_counterpart, str_type)
@@ -1224,16 +1214,6 @@ class UrlParseTestCase(unittest.TestCase):
self.assertEqual(str_result.encode(encoding, errors), bytes_args)
self.assertEqual(str_result.encode(encoding, errors), bytes_result)
- def test_result_pairs(self):
- # Check encoding and decoding between result pairs
- result_types = [
- urllib.parse.DefragResult,
- urllib.parse.SplitResult,
- urllib.parse.ParseResult,
- ]
- for result_type in result_types:
- self._check_result_type(result_type)
-
def test_parse_qs_encoding(self):
result = urllib.parse.parse_qs("key=\u0141%E9", encoding="latin-1")
self.assertEqual(result, {'key': ['\u0141\xE9']})
@@ -1265,8 +1245,7 @@ class UrlParseTestCase(unittest.TestCase):
urllib.parse.parse_qsl('&'.join(['a=a']*11), max_num_fields=10)
urllib.parse.parse_qsl('&'.join(['a=a']*10), max_num_fields=10)
- def test_parse_qs_separator(self):
- parse_qs_semicolon_cases = [
+ @support.subTests('orig,expect', [
(";", {}),
(";;", {}),
(";a=b", {'a': ['b']}),
@@ -1277,17 +1256,14 @@ class UrlParseTestCase(unittest.TestCase):
(b";a=b", {b'a': [b'b']}),
(b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
(b"a=1;a=2", {b'a': [b'1', b'2']}),
- ]
- for orig, expect in parse_qs_semicolon_cases:
- with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"):
- result = urllib.parse.parse_qs(orig, separator=';')
- self.assertEqual(result, expect, "Error parsing %r" % orig)
- result_bytes = urllib.parse.parse_qs(orig, separator=b';')
- self.assertEqual(result_bytes, expect, "Error parsing %r" % orig)
-
-
- def test_parse_qsl_separator(self):
- parse_qsl_semicolon_cases = [
+ ])
+ def test_parse_qs_separator(self, orig, expect):
+ result = urllib.parse.parse_qs(orig, separator=';')
+ self.assertEqual(result, expect)
+ result_bytes = urllib.parse.parse_qs(orig, separator=b';')
+ self.assertEqual(result_bytes, expect)
+
+ @support.subTests('orig,expect', [
(";", []),
(";;", []),
(";a=b", [('a', 'b')]),
@@ -1298,13 +1274,12 @@ class UrlParseTestCase(unittest.TestCase):
(b";a=b", [(b'a', b'b')]),
(b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
(b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]),
- ]
- for orig, expect in parse_qsl_semicolon_cases:
- with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"):
- result = urllib.parse.parse_qsl(orig, separator=';')
- self.assertEqual(result, expect, "Error parsing %r" % orig)
- result_bytes = urllib.parse.parse_qsl(orig, separator=b';')
- self.assertEqual(result_bytes, expect, "Error parsing %r" % orig)
+ ])
+ def test_parse_qsl_separator(self, orig, expect):
+ result = urllib.parse.parse_qsl(orig, separator=';')
+ self.assertEqual(result, expect)
+ result_bytes = urllib.parse.parse_qsl(orig, separator=b';')
+ self.assertEqual(result_bytes, expect)
def test_parse_qsl_bytes(self):
self.assertEqual(urllib.parse.parse_qsl(b'a=b'), [(b'a', b'b')])
@@ -1695,11 +1670,12 @@ class Utility_Tests(unittest.TestCase):
self.assertRaises(UnicodeError, urllib.parse._to_bytes,
'http://www.python.org/medi\u00e6val')
- def test_unwrap(self):
- for wrapped_url in ('<URL:scheme://host/path>', '<scheme://host/path>',
- 'URL:scheme://host/path', 'scheme://host/path'):
- url = urllib.parse.unwrap(wrapped_url)
- self.assertEqual(url, 'scheme://host/path')
+ @support.subTests('wrapped_url',
+ ('<URL:scheme://host/path>', '<scheme://host/path>',
+ 'URL:scheme://host/path', 'scheme://host/path'))
+ def test_unwrap(self, wrapped_url):
+ url = urllib.parse.unwrap(wrapped_url)
+ self.assertEqual(url, 'scheme://host/path')
class DeprecationTest(unittest.TestCase):
@@ -1780,5 +1756,11 @@ class DeprecationTest(unittest.TestCase):
'urllib.parse.to_bytes() is deprecated as of 3.8')
+def str_encode(s):
+ return s.encode('ascii')
+
+def tuple_encode(t):
+ return tuple(str_encode(x) for x in t)
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py
index 014634e450e..d4c28aed38e 100644
--- a/Lib/test/test_zstd.py
+++ b/Lib/test/test_zstd.py
@@ -293,11 +293,11 @@ class CompressorTestCase(unittest.TestCase):
# zstd lib doesn't support MT compression
if not SUPPORT_MULTITHREADING:
- with self.assertRaises(ZstdError):
+ with self.assertRaises(ValueError):
ZstdCompressor(options={CompressionParameter.nb_workers:4})
- with self.assertRaises(ZstdError):
+ with self.assertRaises(ValueError):
ZstdCompressor(options={CompressionParameter.job_size:4})
- with self.assertRaises(ZstdError):
+ with self.assertRaises(ValueError):
ZstdCompressor(options={CompressionParameter.overlap_log:4})
# out of bounds error msg
@@ -395,6 +395,115 @@ class CompressorTestCase(unittest.TestCase):
c = ZstdCompressor()
self.assertNotEqual(c.compress(b'', c.FLUSH_FRAME), b'')
+ def test_set_pledged_input_size(self):
+ DAT = DECOMPRESSED_100_PLUS_32KB
+ CHUNK_SIZE = len(DAT) // 3
+
+ # wrong value
+ c = ZstdCompressor()
+ with self.assertRaisesRegex(ValueError,
+ r'should be a positive int less than \d+'):
+ c.set_pledged_input_size(-300)
+ # overflow
+ with self.assertRaisesRegex(ValueError,
+ r'should be a positive int less than \d+'):
+ c.set_pledged_input_size(2**64)
+ # ZSTD_CONTENTSIZE_ERROR is invalid
+ with self.assertRaisesRegex(ValueError,
+ r'should be a positive int less than \d+'):
+ c.set_pledged_input_size(2**64-2)
+ # ZSTD_CONTENTSIZE_UNKNOWN should use None
+ with self.assertRaisesRegex(ValueError,
+ r'should be a positive int less than \d+'):
+ c.set_pledged_input_size(2**64-1)
+
+ # check valid values are settable
+ c.set_pledged_input_size(2**63)
+ c.set_pledged_input_size(2**64-3)
+
+ # check that zero means empty frame
+ c = ZstdCompressor(level=1)
+ c.set_pledged_input_size(0)
+ c.compress(b'')
+ dat = c.flush()
+ ret = get_frame_info(dat)
+ self.assertEqual(ret.decompressed_size, 0)
+
+
+ # wrong mode
+ c = ZstdCompressor(level=1)
+ c.compress(b'123456')
+ self.assertEqual(c.last_mode, c.CONTINUE)
+ with self.assertRaisesRegex(ValueError,
+ r'last_mode == FLUSH_FRAME'):
+ c.set_pledged_input_size(300)
+
+ # None value
+ c = ZstdCompressor(level=1)
+ c.set_pledged_input_size(None)
+ dat = c.compress(DAT) + c.flush()
+
+ ret = get_frame_info(dat)
+ self.assertEqual(ret.decompressed_size, None)
+
+ # correct value
+ c = ZstdCompressor(level=1)
+ c.set_pledged_input_size(len(DAT))
+
+ chunks = []
+ posi = 0
+ while posi < len(DAT):
+ dat = c.compress(DAT[posi:posi+CHUNK_SIZE])
+ posi += CHUNK_SIZE
+ chunks.append(dat)
+
+ dat = c.flush()
+ chunks.append(dat)
+ chunks = b''.join(chunks)
+
+ ret = get_frame_info(chunks)
+ self.assertEqual(ret.decompressed_size, len(DAT))
+ self.assertEqual(decompress(chunks), DAT)
+
+ c.set_pledged_input_size(len(DAT)) # the second frame
+ dat = c.compress(DAT) + c.flush()
+
+ ret = get_frame_info(dat)
+ self.assertEqual(ret.decompressed_size, len(DAT))
+ self.assertEqual(decompress(dat), DAT)
+
+ # not enough data
+ c = ZstdCompressor(level=1)
+ c.set_pledged_input_size(len(DAT)+1)
+
+ for start in range(0, len(DAT), CHUNK_SIZE):
+ end = min(start+CHUNK_SIZE, len(DAT))
+ _dat = c.compress(DAT[start:end])
+
+ with self.assertRaises(ZstdError):
+ c.flush()
+
+ # too much data
+ c = ZstdCompressor(level=1)
+ c.set_pledged_input_size(len(DAT))
+
+ for start in range(0, len(DAT), CHUNK_SIZE):
+ end = min(start+CHUNK_SIZE, len(DAT))
+ _dat = c.compress(DAT[start:end])
+
+ with self.assertRaises(ZstdError):
+ c.compress(b'extra', ZstdCompressor.FLUSH_FRAME)
+
+ # content size not set if content_size_flag == 0
+ c = ZstdCompressor(options={CompressionParameter.content_size_flag: 0})
+ c.set_pledged_input_size(10)
+ dat1 = c.compress(b"hello")
+ dat2 = c.compress(b"world")
+ dat3 = c.flush()
+ frame_data = get_frame_info(dat1 + dat2 + dat3)
+ self.assertIsNone(frame_data.decompressed_size)
+
+
class DecompressorTestCase(unittest.TestCase):
def test_simple_decompress_bad_args(self):
@@ -1138,27 +1247,41 @@ class ZstdDictTestCase(unittest.TestCase):
ZstdDecompressor(zd)
# wrong type
- with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'):
- ZstdCompressor(zstd_dict=(zd, b'123'))
- with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'):
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
+ ZstdCompressor(zstd_dict=[zd, 1])
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
+ ZstdCompressor(zstd_dict=(zd, 1.0))
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
+ ZstdCompressor(zstd_dict=(zd,))
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
ZstdCompressor(zstd_dict=(zd, 1, 2))
- with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'):
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
ZstdCompressor(zstd_dict=(zd, -1))
- with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'):
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
ZstdCompressor(zstd_dict=(zd, 3))
-
- with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'):
- ZstdDecompressor(zstd_dict=(zd, b'123'))
- with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'):
+ with self.assertRaises(OverflowError):
+ ZstdCompressor(zstd_dict=(zd, 2**1000))
+ with self.assertRaises(OverflowError):
+ ZstdCompressor(zstd_dict=(zd, -2**1000))
+
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
+ ZstdDecompressor(zstd_dict=[zd, 1])
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
+ ZstdDecompressor(zstd_dict=(zd, 1.0))
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
+ ZstdDecompressor((zd,))
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
ZstdDecompressor((zd, 1, 2))
- with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'):
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
ZstdDecompressor((zd, -1))
- with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'):
+ with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'):
ZstdDecompressor((zd, 3))
+ with self.assertRaises(OverflowError):
+ ZstdDecompressor((zd, 2**1000))
+ with self.assertRaises(OverflowError):
+ ZstdDecompressor((zd, -2**1000))
def test_train_dict(self):
-
-
TRAINED_DICT = train_dict(SAMPLES, DICT_SIZE1)
ZstdDict(TRAINED_DICT.dict_content, is_raw=False)
@@ -1240,17 +1363,36 @@ class ZstdDictTestCase(unittest.TestCase):
with self.assertRaises(TypeError):
_zstd.train_dict({}, (), 100)
with self.assertRaises(TypeError):
+ _zstd.train_dict(bytearray(), (), 100)
+ with self.assertRaises(TypeError):
_zstd.train_dict(b'', 99, 100)
with self.assertRaises(TypeError):
+ _zstd.train_dict(b'', [], 100)
+ with self.assertRaises(TypeError):
_zstd.train_dict(b'', (), 100.1)
+ with self.assertRaises(TypeError):
+ _zstd.train_dict(b'', (99.1,), 100)
+ with self.assertRaises(ValueError):
+ _zstd.train_dict(b'abc', (4, -1), 100)
+ with self.assertRaises(ValueError):
+ _zstd.train_dict(b'abc', (2,), 100)
+ with self.assertRaises(ValueError):
+ _zstd.train_dict(b'', (99,), 100)
# size > size_t
with self.assertRaises(ValueError):
- _zstd.train_dict(b'', (2**64+1,), 100)
+ _zstd.train_dict(b'', (2**1000,), 100)
+ with self.assertRaises(ValueError):
+ _zstd.train_dict(b'', (-2**1000,), 100)
# dict_size <= 0
with self.assertRaises(ValueError):
_zstd.train_dict(b'', (), 0)
+ with self.assertRaises(ValueError):
+ _zstd.train_dict(b'', (), -1)
+
+ with self.assertRaises(ZstdError):
+ _zstd.train_dict(b'', (), 1)
def test_finalize_dict_c(self):
with self.assertRaises(TypeError):
@@ -1260,21 +1402,50 @@ class ZstdDictTestCase(unittest.TestCase):
with self.assertRaises(TypeError):
_zstd.finalize_dict({}, b'', (), 100, 5)
with self.assertRaises(TypeError):
+ _zstd.finalize_dict(bytearray(TRAINED_DICT.dict_content), b'', (), 100, 5)
+ with self.assertRaises(TypeError):
_zstd.finalize_dict(TRAINED_DICT.dict_content, {}, (), 100, 5)
with self.assertRaises(TypeError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, bytearray(), (), 100, 5)
+ with self.assertRaises(TypeError):
_zstd.finalize_dict(TRAINED_DICT.dict_content, b'', 99, 100, 5)
with self.assertRaises(TypeError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', [], 100, 5)
+ with self.assertRaises(TypeError):
_zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100.1, 5)
with self.assertRaises(TypeError):
_zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5.1)
+ with self.assertRaises(ValueError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'abc', (4, -1), 100, 5)
+ with self.assertRaises(ValueError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'abc', (2,), 100, 5)
+ with self.assertRaises(ValueError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (99,), 100, 5)
+
# size > size_t
with self.assertRaises(ValueError):
- _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**64+1,), 100, 5)
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**1000,), 100, 5)
+ with self.assertRaises(ValueError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (-2**1000,), 100, 5)
# dict_size <= 0
with self.assertRaises(ValueError):
_zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 0, 5)
+ with self.assertRaises(ValueError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), -1, 5)
+ with self.assertRaises(OverflowError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 2**1000, 5)
+ with self.assertRaises(OverflowError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), -2**1000, 5)
+
+ with self.assertRaises(OverflowError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 2**1000)
+ with self.assertRaises(OverflowError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, -2**1000)
+
+ with self.assertRaises(ZstdError):
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5)
def test_train_buffer_protocol_samples(self):
def _nbytes(dat):