diff options
Diffstat (limited to 'Lib/test')
46 files changed, 2742 insertions, 984 deletions
diff --git a/Lib/test/_code_definitions.py b/Lib/test/_code_definitions.py index 733a15b25f6..274beb65a6d 100644 --- a/Lib/test/_code_definitions.py +++ b/Lib/test/_code_definitions.py @@ -57,6 +57,15 @@ def spam_with_globals_and_builtins(): print(res) +def spam_full_args(a, b, /, c, d, *args, e, f, **kwargs): + return (a, b, c, d, e, f, args, kwargs) + + +def spam_full_args_with_defaults(a=-1, b=-2, /, c=-3, d=-4, *args, + e=-5, f=-6, **kwargs): + return (a, b, c, d, e, f, args, kwargs) + + def spam_args_attrs_and_builtins(a, b, /, c, d, *args, e, f, **kwargs): if args.__len__() > 2: return None @@ -67,6 +76,10 @@ def spam_returns_arg(x): return x +def spam_raises(): + raise Exception('spam!') + + def spam_with_inner_not_closure(): def eggs(): pass @@ -177,8 +190,11 @@ TOP_FUNCTIONS = [ spam_minimal, spam_with_builtins, spam_with_globals_and_builtins, + spam_full_args, + spam_full_args_with_defaults, spam_args_attrs_and_builtins, spam_returns_arg, + spam_raises, spam_with_inner_not_closure, spam_with_inner_closure, spam_annotated, @@ -219,8 +235,10 @@ STATELESS_FUNCTIONS = [ spam, spam_minimal, spam_with_builtins, + spam_full_args, spam_args_attrs_and_builtins, spam_returns_arg, + spam_raises, spam_annotated, spam_with_inner_not_closure, spam_with_inner_closure, @@ -238,6 +256,7 @@ STATELESS_FUNCTIONS = [ STATELESS_CODE = [ *STATELESS_FUNCTIONS, script_with_globals, + spam_full_args_with_defaults, spam_with_globals_and_builtins, spam_full, ] @@ -248,6 +267,7 @@ PURE_SCRIPT_FUNCTIONS = [ script_with_explicit_empty_return, spam_minimal, spam_with_builtins, + spam_raises, spam_with_inner_not_closure, spam_with_inner_closure, ] diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 713cbedb299..0d9c059a938 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -543,8 +543,6 @@ class Regrtest: self.first_runtests = runtests self.logger.set_tests(runtests) - setup_process() - if (runtests.hunt_refleak is not None) and (not self.num_workers): # gh-109739: WindowsLoadTracker thread interferes with refleak check use_load_tracker = False @@ -721,10 +719,7 @@ class Regrtest: self._execute_python(cmd, environ) def _init(self): - # Set sys.stdout encoder error handler to backslashreplace, - # similar to sys.stderr error handler, to avoid UnicodeEncodeError - # when printing a traceback or any other non-encodable character. - sys.stdout.reconfigure(errors="backslashreplace") + setup_process() if self.junit_filename and not os.path.isabs(self.junit_filename): self.junit_filename = os.path.abspath(self.junit_filename) diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py index c3d1f60a400..9bfc414cd61 100644 --- a/Lib/test/libregrtest/setup.py +++ b/Lib/test/libregrtest/setup.py @@ -1,5 +1,6 @@ import faulthandler import gc +import io import os import random import signal @@ -52,6 +53,14 @@ def setup_process() -> None: support.record_original_stdout(sys.stdout) + # Set sys.stdout encoder error handler to backslashreplace, + # similar to sys.stderr error handler, to avoid UnicodeEncodeError + # when printing a traceback or any other non-encodable character. + # + # Use an assertion to fix mypy error. + assert isinstance(sys.stdout, io.TextIOWrapper) + sys.stdout.reconfigure(errors="backslashreplace") + # Some times __path__ and __file__ are not absolute (e.g. while running from # Lib/) and, if we change the CWD to run the tests in a temporary dir, some # imports might fail. This affects only the modules imported before os.chdir(). diff --git a/Lib/test/subprocessdata/fd_status.py b/Lib/test/subprocessdata/fd_status.py index d12bd95abee..90e785981ae 100644 --- a/Lib/test/subprocessdata/fd_status.py +++ b/Lib/test/subprocessdata/fd_status.py @@ -2,7 +2,7 @@ file descriptors on stdout. Usage: -fd_stats.py: check all file descriptors +fd_status.py: check all file descriptors (up to 255) fd_status.py fd1 fd2 ...: check only specified file descriptors """ @@ -18,7 +18,7 @@ if __name__ == "__main__": _MAXFD = os.sysconf("SC_OPEN_MAX") except: _MAXFD = 256 - test_fds = range(0, _MAXFD) + test_fds = range(0, min(_MAXFD, 256)) else: test_fds = map(int, sys.argv[1:]) for fd in test_fds: diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 351d832a26d..48e74adcce3 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -945,6 +945,31 @@ def check_sizeof(test, o, size): % (type(o), result, size) test.assertEqual(result, size, msg) +def subTests(arg_names, arg_values, /, *, _do_cleanups=False): + """Run multiple subtests with different parameters. + """ + single_param = False + if isinstance(arg_names, str): + arg_names = arg_names.replace(',',' ').split() + if len(arg_names) == 1: + single_param = True + arg_values = tuple(arg_values) + def decorator(func): + if isinstance(func, type): + raise TypeError('subTests() can only decorate methods, not classes') + @functools.wraps(func) + def wrapper(self, /, *args, **kwargs): + for values in arg_values: + if single_param: + values = (values,) + subtest_kwargs = dict(zip(arg_names, values)) + with self.subTest(**subtest_kwargs): + func(self, *args, **kwargs, **subtest_kwargs) + if _do_cleanups: + self.doCleanups() + return wrapper + return decorator + #======================================================================= # Decorator/context manager for running a code in a different locale, # correctly resetting it afterwards. @@ -1084,7 +1109,7 @@ def set_memlimit(limit: str) -> None: global real_max_memuse memlimit = _parse_memlimit(limit) if memlimit < _2G - 1: - raise ValueError('Memory limit {limit!r} too low to be useful') + raise ValueError(f'Memory limit {limit!r} too low to be useful') real_max_memuse = memlimit memlimit = min(memlimit, MAX_Py_ssize_t) @@ -2358,7 +2383,7 @@ def infinite_recursion(max_depth=None): # very deep recursion. max_depth = 20_000 elif max_depth < 3: - raise ValueError("max_depth must be at least 3, got {max_depth}") + raise ValueError(f"max_depth must be at least 3, got {max_depth}") depth = get_recursion_depth() depth = max(depth - 1, 1) # Ignore infinite_recursion() frame. limit = depth + max_depth diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py index e067f259364..6d1b0690805 100644 --- a/Lib/test/support/interpreters/__init__.py +++ b/Lib/test/support/interpreters/__init__.py @@ -226,33 +226,32 @@ class Interpreter: if excinfo is not None: raise ExecutionFailed(excinfo) - def call(self, callable, /): - """Call the object in the interpreter with given args/kwargs. + def _call(self, callable, args, kwargs): + res, excinfo = _interpreters.call(self._id, callable, args, kwargs, restrict=True) + if excinfo is not None: + raise ExecutionFailed(excinfo) + return res - Only functions that take no arguments and have no closure - are supported. + def call(self, callable, /, *args, **kwargs): + """Call the object in the interpreter with given args/kwargs. - The return value is discarded. + Nearly all callables, args, kwargs, and return values are + supported. All "shareable" objects are supported, as are + "stateless" functions (meaning non-closures that do not use + any globals). This method will fall back to pickle. If the callable raises an exception then the error display - (including full traceback) is send back between the interpreters + (including full traceback) is sent back between the interpreters and an ExecutionFailed exception is raised, much like what happens with Interpreter.exec(). """ - # XXX Support args and kwargs. - # XXX Support arbitrary callables. - # XXX Support returning the return value (e.g. via pickle). - excinfo = _interpreters.call(self._id, callable, restrict=True) - if excinfo is not None: - raise ExecutionFailed(excinfo) + return self._call(callable, args, kwargs) - def call_in_thread(self, callable, /): + def call_in_thread(self, callable, /, *args, **kwargs): """Return a new thread that calls the object in the interpreter. The return value and any raised exception are discarded. """ - def task(): - self.call(callable) - t = threading.Thread(target=task) + t = threading.Thread(target=self._call, args=(callable, args, kwargs)) t.start() return t diff --git a/Lib/test/test_ast/test_ast.py b/Lib/test/test_ast/test_ast.py index 46745cfa8f8..cc46529c0ef 100644 --- a/Lib/test/test_ast/test_ast.py +++ b/Lib/test/test_ast/test_ast.py @@ -1372,17 +1372,17 @@ class ASTHelpers_Test(unittest.TestCase): def test_dump(self): node = ast.parse('spam(eggs, "and cheese")') self.assertEqual(ast.dump(node), - "Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), " - "args=[Name(id='eggs', ctx=Load()), Constant(value='and cheese')]))])" + "Module(body=[Expr(value=Call(func=Name(id='spam'), " + "args=[Name(id='eggs'), Constant(value='and cheese')]))])" ) self.assertEqual(ast.dump(node, annotate_fields=False), - "Module([Expr(Call(Name('spam', Load()), [Name('eggs', Load()), " + "Module([Expr(Call(Name('spam'), [Name('eggs'), " "Constant('and cheese')]))])" ) self.assertEqual(ast.dump(node, include_attributes=True), - "Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load(), " + "Module(body=[Expr(value=Call(func=Name(id='spam', " "lineno=1, col_offset=0, end_lineno=1, end_col_offset=4), " - "args=[Name(id='eggs', ctx=Load(), lineno=1, col_offset=5, " + "args=[Name(id='eggs', lineno=1, col_offset=5, " "end_lineno=1, end_col_offset=9), Constant(value='and cheese', " "lineno=1, col_offset=11, end_lineno=1, end_col_offset=23)], " "lineno=1, col_offset=0, end_lineno=1, end_col_offset=24), " @@ -1396,18 +1396,18 @@ Module( body=[ Expr( value=Call( - func=Name(id='spam', ctx=Load()), + func=Name(id='spam'), args=[ - Name(id='eggs', ctx=Load()), + Name(id='eggs'), Constant(value='and cheese')]))])""") self.assertEqual(ast.dump(node, annotate_fields=False, indent='\t'), """\ Module( \t[ \t\tExpr( \t\t\tCall( -\t\t\t\tName('spam', Load()), +\t\t\t\tName('spam'), \t\t\t\t[ -\t\t\t\t\tName('eggs', Load()), +\t\t\t\t\tName('eggs'), \t\t\t\t\tConstant('and cheese')]))])""") self.assertEqual(ast.dump(node, include_attributes=True, indent=3), """\ Module( @@ -1416,7 +1416,6 @@ Module( value=Call( func=Name( id='spam', - ctx=Load(), lineno=1, col_offset=0, end_lineno=1, @@ -1424,7 +1423,6 @@ Module( args=[ Name( id='eggs', - ctx=Load(), lineno=1, col_offset=5, end_lineno=1, @@ -1454,23 +1452,23 @@ Module( ) node = ast.Raise(exc=ast.Name(id='e', ctx=ast.Load()), lineno=3, col_offset=4) self.assertEqual(ast.dump(node), - "Raise(exc=Name(id='e', ctx=Load()))" + "Raise(exc=Name(id='e'))" ) self.assertEqual(ast.dump(node, annotate_fields=False), - "Raise(Name('e', Load()))" + "Raise(Name('e'))" ) self.assertEqual(ast.dump(node, include_attributes=True), - "Raise(exc=Name(id='e', ctx=Load()), lineno=3, col_offset=4)" + "Raise(exc=Name(id='e'), lineno=3, col_offset=4)" ) self.assertEqual(ast.dump(node, annotate_fields=False, include_attributes=True), - "Raise(Name('e', Load()), lineno=3, col_offset=4)" + "Raise(Name('e'), lineno=3, col_offset=4)" ) node = ast.Raise(cause=ast.Name(id='e', ctx=ast.Load())) self.assertEqual(ast.dump(node), - "Raise(cause=Name(id='e', ctx=Load()))" + "Raise(cause=Name(id='e'))" ) self.assertEqual(ast.dump(node, annotate_fields=False), - "Raise(cause=Name('e', Load()))" + "Raise(cause=Name('e'))" ) # Arguments: node = ast.arguments(args=[ast.arg("x")]) @@ -1502,10 +1500,10 @@ Module( [ast.Name('dataclass', ctx=ast.Load())], ) self.assertEqual(ast.dump(node), - "ClassDef(name='T', keywords=[keyword(arg='a', value=Constant(value=None))], decorator_list=[Name(id='dataclass', ctx=Load())])", + "ClassDef(name='T', keywords=[keyword(arg='a', value=Constant(value=None))], decorator_list=[Name(id='dataclass')])", ) self.assertEqual(ast.dump(node, annotate_fields=False), - "ClassDef('T', [], [keyword('a', Constant(None))], [], [Name('dataclass', Load())])", + "ClassDef('T', [], [keyword('a', Constant(None))], [], [Name('dataclass')])", ) def test_dump_show_empty(self): @@ -1533,7 +1531,7 @@ Module( check_node( # Corner case: there are no real `Name` instances with `id=''`: ast.Name(id='', ctx=ast.Load()), - empty="Name(id='', ctx=Load())", + empty="Name(id='')", full="Name(id='', ctx=Load())", ) @@ -1544,39 +1542,63 @@ Module( ) check_node( + ast.MatchSingleton(value=[]), + empty="MatchSingleton(value=[])", + full="MatchSingleton(value=[])", + ) + + check_node( ast.Constant(value=None), empty="Constant(value=None)", full="Constant(value=None)", ) check_node( + ast.Constant(value=[]), + empty="Constant(value=[])", + full="Constant(value=[])", + ) + + check_node( ast.Constant(value=''), empty="Constant(value='')", full="Constant(value='')", ) + check_node( + ast.Interpolation(value=ast.Constant(42), str=None, conversion=-1), + empty="Interpolation(value=Constant(value=42), str=None, conversion=-1)", + full="Interpolation(value=Constant(value=42), str=None, conversion=-1)", + ) + + check_node( + ast.Interpolation(value=ast.Constant(42), str=[], conversion=-1), + empty="Interpolation(value=Constant(value=42), str=[], conversion=-1)", + full="Interpolation(value=Constant(value=42), str=[], conversion=-1)", + ) + check_text( "def a(b: int = 0, *, c): ...", - empty="Module(body=[FunctionDef(name='a', args=arguments(args=[arg(arg='b', annotation=Name(id='int', ctx=Load()))], kwonlyargs=[arg(arg='c')], kw_defaults=[None], defaults=[Constant(value=0)]), body=[Expr(value=Constant(value=Ellipsis))])])", + empty="Module(body=[FunctionDef(name='a', args=arguments(args=[arg(arg='b', annotation=Name(id='int'))], kwonlyargs=[arg(arg='c')], kw_defaults=[None], defaults=[Constant(value=0)]), body=[Expr(value=Constant(value=Ellipsis))])])", full="Module(body=[FunctionDef(name='a', args=arguments(posonlyargs=[], args=[arg(arg='b', annotation=Name(id='int', ctx=Load()))], kwonlyargs=[arg(arg='c')], kw_defaults=[None], defaults=[Constant(value=0)]), body=[Expr(value=Constant(value=Ellipsis))], decorator_list=[], type_params=[])], type_ignores=[])", ) check_text( "def a(b: int = 0, *, c): ...", - empty="Module(body=[FunctionDef(name='a', args=arguments(args=[arg(arg='b', annotation=Name(id='int', ctx=Load(), lineno=1, col_offset=9, end_lineno=1, end_col_offset=12), lineno=1, col_offset=6, end_lineno=1, end_col_offset=12)], kwonlyargs=[arg(arg='c', lineno=1, col_offset=21, end_lineno=1, end_col_offset=22)], kw_defaults=[None], defaults=[Constant(value=0, lineno=1, col_offset=15, end_lineno=1, end_col_offset=16)]), body=[Expr(value=Constant(value=Ellipsis, lineno=1, col_offset=25, end_lineno=1, end_col_offset=28), lineno=1, col_offset=25, end_lineno=1, end_col_offset=28)], lineno=1, col_offset=0, end_lineno=1, end_col_offset=28)])", + empty="Module(body=[FunctionDef(name='a', args=arguments(args=[arg(arg='b', annotation=Name(id='int', lineno=1, col_offset=9, end_lineno=1, end_col_offset=12), lineno=1, col_offset=6, end_lineno=1, end_col_offset=12)], kwonlyargs=[arg(arg='c', lineno=1, col_offset=21, end_lineno=1, end_col_offset=22)], kw_defaults=[None], defaults=[Constant(value=0, lineno=1, col_offset=15, end_lineno=1, end_col_offset=16)]), body=[Expr(value=Constant(value=Ellipsis, lineno=1, col_offset=25, end_lineno=1, end_col_offset=28), lineno=1, col_offset=25, end_lineno=1, end_col_offset=28)], lineno=1, col_offset=0, end_lineno=1, end_col_offset=28)])", full="Module(body=[FunctionDef(name='a', args=arguments(posonlyargs=[], args=[arg(arg='b', annotation=Name(id='int', ctx=Load(), lineno=1, col_offset=9, end_lineno=1, end_col_offset=12), lineno=1, col_offset=6, end_lineno=1, end_col_offset=12)], kwonlyargs=[arg(arg='c', lineno=1, col_offset=21, end_lineno=1, end_col_offset=22)], kw_defaults=[None], defaults=[Constant(value=0, lineno=1, col_offset=15, end_lineno=1, end_col_offset=16)]), body=[Expr(value=Constant(value=Ellipsis, lineno=1, col_offset=25, end_lineno=1, end_col_offset=28), lineno=1, col_offset=25, end_lineno=1, end_col_offset=28)], decorator_list=[], type_params=[], lineno=1, col_offset=0, end_lineno=1, end_col_offset=28)], type_ignores=[])", include_attributes=True, ) check_text( 'spam(eggs, "and cheese")', - empty="Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), args=[Name(id='eggs', ctx=Load()), Constant(value='and cheese')]))])", + empty="Module(body=[Expr(value=Call(func=Name(id='spam'), args=[Name(id='eggs'), Constant(value='and cheese')]))])", full="Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), args=[Name(id='eggs', ctx=Load()), Constant(value='and cheese')], keywords=[]))], type_ignores=[])", ) check_text( 'spam(eggs, text="and cheese")', - empty="Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), args=[Name(id='eggs', ctx=Load())], keywords=[keyword(arg='text', value=Constant(value='and cheese'))]))])", + empty="Module(body=[Expr(value=Call(func=Name(id='spam'), args=[Name(id='eggs')], keywords=[keyword(arg='text', value=Constant(value='and cheese'))]))])", full="Module(body=[Expr(value=Call(func=Name(id='spam', ctx=Load()), args=[Name(id='eggs', ctx=Load())], keywords=[keyword(arg='text', value=Constant(value='and cheese'))]))], type_ignores=[])", ) @@ -1610,12 +1632,12 @@ Module( self.assertEqual(src, ast.fix_missing_locations(src)) self.maxDiff = None self.assertEqual(ast.dump(src, include_attributes=True), - "Module(body=[Expr(value=Call(func=Name(id='write', ctx=Load(), " + "Module(body=[Expr(value=Call(func=Name(id='write', " "lineno=1, col_offset=0, end_lineno=1, end_col_offset=5), " "args=[Constant(value='spam', lineno=1, col_offset=6, end_lineno=1, " "end_col_offset=12)], lineno=1, col_offset=0, end_lineno=1, " "end_col_offset=13), lineno=1, col_offset=0, end_lineno=1, " - "end_col_offset=13), Expr(value=Call(func=Name(id='spam', ctx=Load(), " + "end_col_offset=13), Expr(value=Call(func=Name(id='spam', " "lineno=1, col_offset=0, end_lineno=1, end_col_offset=0), " "args=[Constant(value='eggs', lineno=1, col_offset=0, end_lineno=1, " "end_col_offset=0)], lineno=1, col_offset=0, end_lineno=1, " @@ -3335,7 +3357,7 @@ class CommandLineTests(unittest.TestCase): body=[ AnnAssign( target=Name(id='x', ctx=Store()), - annotation=Name(id='bool', ctx=Load()), + annotation=Name(id='bool'), value=Constant(value=1), simple=1)], type_ignores=[ @@ -3363,7 +3385,7 @@ class CommandLineTests(unittest.TestCase): expect = ''' Expression( body=Call( - func=Name(id='print', ctx=Load()), + func=Name(id='print'), args=[ Constant(value=1), Constant(value=2), @@ -3379,12 +3401,11 @@ class CommandLineTests(unittest.TestCase): expect = ''' FunctionType( argtypes=[ - Name(id='int', ctx=Load()), - Name(id='str', ctx=Load())], + Name(id='int'), + Name(id='str')], returns=Subscript( - value=Name(id='list', ctx=Load()), - slice=Name(id='int', ctx=Load()), - ctx=Load())) + value=Name(id='list'), + slice=Name(id='int'))) ''' for flag in ('-m=func_type', '--mode=func_type'): with self.subTest(flag=flag): @@ -3398,7 +3419,7 @@ class CommandLineTests(unittest.TestCase): body=[ AnnAssign( target=Name(id='x', ctx=Store()), - annotation=Name(id='bool', ctx=Load()), + annotation=Name(id='bool'), value=Constant(value=1), simple=1)]) ''' @@ -3443,7 +3464,7 @@ class CommandLineTests(unittest.TestCase): Module( body=[ Match( - subject=Name(id='x', ctx=Load()), + subject=Name(id='x'), cases=[ match_case( pattern=MatchValue( @@ -3466,7 +3487,7 @@ class CommandLineTests(unittest.TestCase): Module( body=[ Match( - subject=Name(id='a', ctx=Load()), + subject=Name(id='a'), cases=[ match_case( pattern=MatchValue( @@ -3492,7 +3513,7 @@ class CommandLineTests(unittest.TestCase): Module( body=[ Match( - subject=Name(id='a', ctx=Load()), + subject=Name(id='a'), cases=[ match_case( pattern=MatchValue( diff --git a/Lib/test/test_asyncgen.py b/Lib/test/test_asyncgen.py index 2c44647bf3e..636cb33dd98 100644 --- a/Lib/test/test_asyncgen.py +++ b/Lib/test/test_asyncgen.py @@ -2021,6 +2021,15 @@ class TestUnawaitedWarnings(unittest.TestCase): g.athrow(RuntimeError) gc_collect() + def test_athrow_throws_immediately(self): + async def gen(): + yield 1 + + g = gen() + msg = "athrow expected at least 1 argument, got 0" + with self.assertRaisesRegex(TypeError, msg): + g.athrow() + def test_aclose(self): async def gen(): yield 1 diff --git a/Lib/test/test_capi/test_opt.py b/Lib/test/test_capi/test_opt.py index cb6eae48414..a292ebcc7f4 100644 --- a/Lib/test/test_capi/test_opt.py +++ b/Lib/test/test_capi/test_opt.py @@ -1183,6 +1183,17 @@ class TestUopsOptimization(unittest.TestCase): self.assertIsNotNone(ex) self.assertIn("_RETURN_GENERATOR", get_opnames(ex)) + def test_for_iter(self): + def testfunc(n): + t = 0 + for i in set(range(n)): + t += i + return t + res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD) + self.assertEqual(res, TIER2_THRESHOLD * (TIER2_THRESHOLD - 1) // 2) + self.assertIsNotNone(ex) + self.assertIn("_FOR_ITER_TIER_TWO", get_opnames(ex)) + @unittest.skip("Tracing into generators currently isn't supported.") def test_for_iter_gen(self): def gen(n): diff --git a/Lib/test/test_capi/test_unicode.py b/Lib/test/test_capi/test_unicode.py index 3408c10f426..6a9c60f3a6d 100644 --- a/Lib/test/test_capi/test_unicode.py +++ b/Lib/test/test_capi/test_unicode.py @@ -1739,6 +1739,20 @@ class CAPITest(unittest.TestCase): # Check that the second call returns the same result self.assertEqual(getargs_s_hash(s), chr(k).encode() * (i + 1)) + @support.cpython_only + @unittest.skipIf(_testcapi is None, 'need _testcapi module') + def test_GET_CACHED_HASH(self): + from _testcapi import unicode_GET_CACHED_HASH + content_bytes = b'some new string' + # avoid parser interning & constant folding + obj = str(content_bytes, 'ascii') + # impl detail: fresh strings do not have cached hash + self.assertEqual(unicode_GET_CACHED_HASH(obj), -1) + # impl detail: adding string to a dict caches its hash + {obj: obj} + # impl detail: ASCII string hashes are equal to bytes ones + self.assertEqual(unicode_GET_CACHED_HASH(obj), hash(content_bytes)) + class PyUnicodeWriterTest(unittest.TestCase): def create_writer(self, size): @@ -1776,6 +1790,13 @@ class PyUnicodeWriterTest(unittest.TestCase): self.assertEqual(writer.finish(), "ascii-latin1=\xE9-euro=\u20AC.") + def test_ascii(self): + writer = self.create_writer(0) + writer.write_ascii(b"Hello ", -1) + writer.write_ascii(b"", 0) + writer.write_ascii(b"Python! <truncated>", 6) + self.assertEqual(writer.finish(), "Hello Python") + def test_invalid_utf8(self): writer = self.create_writer(0) with self.assertRaises(UnicodeDecodeError): diff --git a/Lib/test/test_class.py b/Lib/test/test_class.py index 4c12d43556f..8c7a62a74ba 100644 --- a/Lib/test/test_class.py +++ b/Lib/test/test_class.py @@ -652,6 +652,7 @@ class ClassTests(unittest.TestCase): a = A(hash(A.f)^(-1)) hash(a.f) + @cpython_only def testSetattrWrapperNameIntern(self): # Issue #25794: __setattr__ should intern the attribute name class A: diff --git a/Lib/test/test_code.py b/Lib/test/test_code.py index 32cf8aacaf6..9fc2b047bef 100644 --- a/Lib/test/test_code.py +++ b/Lib/test/test_code.py @@ -701,6 +701,26 @@ class CodeTest(unittest.TestCase): 'checks': CO_FAST_LOCAL, 'res': CO_FAST_LOCAL, }, + defs.spam_full_args: { + 'a': POSONLY, + 'b': POSONLY, + 'c': POSORKW, + 'd': POSORKW, + 'e': KWONLY, + 'f': KWONLY, + 'args': VARARGS, + 'kwargs': VARKWARGS, + }, + defs.spam_full_args_with_defaults: { + 'a': POSONLY, + 'b': POSONLY, + 'c': POSORKW, + 'd': POSORKW, + 'e': KWONLY, + 'f': KWONLY, + 'args': VARARGS, + 'kwargs': VARKWARGS, + }, defs.spam_args_attrs_and_builtins: { 'a': POSONLY, 'b': POSONLY, @@ -714,6 +734,7 @@ class CodeTest(unittest.TestCase): defs.spam_returns_arg: { 'x': POSORKW, }, + defs.spam_raises: {}, defs.spam_with_inner_not_closure: { 'eggs': CO_FAST_LOCAL, }, @@ -934,6 +955,20 @@ class CodeTest(unittest.TestCase): purelocals=5, globalvars=6, ), + defs.spam_full_args: new_var_counts( + posonly=2, + posorkw=2, + kwonly=2, + varargs=1, + varkwargs=1, + ), + defs.spam_full_args_with_defaults: new_var_counts( + posonly=2, + posorkw=2, + kwonly=2, + varargs=1, + varkwargs=1, + ), defs.spam_args_attrs_and_builtins: new_var_counts( posonly=2, posorkw=2, @@ -945,6 +980,9 @@ class CodeTest(unittest.TestCase): defs.spam_returns_arg: new_var_counts( posorkw=1, ), + defs.spam_raises: new_var_counts( + globalvars=1, + ), defs.spam_with_inner_not_closure: new_var_counts( purelocals=1, ), @@ -1097,10 +1135,16 @@ class CodeTest(unittest.TestCase): def test_stateless(self): self.maxDiff = None + STATELESS_FUNCTIONS = [ + *defs.STATELESS_FUNCTIONS, + # stateless with defaults + defs.spam_full_args_with_defaults, + ] + for func in defs.STATELESS_CODE: with self.subTest((func, '(code)')): _testinternalcapi.verify_stateless_code(func.__code__) - for func in defs.STATELESS_FUNCTIONS: + for func in STATELESS_FUNCTIONS: with self.subTest((func, '(func)')): _testinternalcapi.verify_stateless_code(func) @@ -1110,7 +1154,7 @@ class CodeTest(unittest.TestCase): with self.assertRaises(Exception): _testinternalcapi.verify_stateless_code(func.__code__) - if func not in defs.STATELESS_FUNCTIONS: + if func not in STATELESS_FUNCTIONS: with self.subTest((func, '(func)')): with self.assertRaises(Exception): _testinternalcapi.verify_stateless_code(func) diff --git a/Lib/test/test_csv.py b/Lib/test/test_csv.py index 9aace57633b..60feab225a1 100644 --- a/Lib/test/test_csv.py +++ b/Lib/test/test_csv.py @@ -1122,19 +1122,22 @@ class TestDialectValidity(unittest.TestCase): with self.assertRaises(csv.Error) as cm: mydialect() self.assertEqual(str(cm.exception), - '"quotechar" must be a 1-character string') + '"quotechar" must be a unicode character or None, ' + 'not a string of length 0') mydialect.quotechar = "''" with self.assertRaises(csv.Error) as cm: mydialect() self.assertEqual(str(cm.exception), - '"quotechar" must be a 1-character string') + '"quotechar" must be a unicode character or None, ' + 'not a string of length 2') mydialect.quotechar = 4 with self.assertRaises(csv.Error) as cm: mydialect() self.assertEqual(str(cm.exception), - '"quotechar" must be string or None, not int') + '"quotechar" must be a unicode character or None, ' + 'not int') def test_delimiter(self): class mydialect(csv.Dialect): @@ -1151,31 +1154,32 @@ class TestDialectValidity(unittest.TestCase): with self.assertRaises(csv.Error) as cm: mydialect() self.assertEqual(str(cm.exception), - '"delimiter" must be a 1-character string') + '"delimiter" must be a unicode character, ' + 'not a string of length 3') mydialect.delimiter = "" with self.assertRaises(csv.Error) as cm: mydialect() self.assertEqual(str(cm.exception), - '"delimiter" must be a 1-character string') + '"delimiter" must be a unicode character, not a string of length 0') mydialect.delimiter = b"," with self.assertRaises(csv.Error) as cm: mydialect() self.assertEqual(str(cm.exception), - '"delimiter" must be string, not bytes') + '"delimiter" must be a unicode character, not bytes') mydialect.delimiter = 4 with self.assertRaises(csv.Error) as cm: mydialect() self.assertEqual(str(cm.exception), - '"delimiter" must be string, not int') + '"delimiter" must be a unicode character, not int') mydialect.delimiter = None with self.assertRaises(csv.Error) as cm: mydialect() self.assertEqual(str(cm.exception), - '"delimiter" must be string, not NoneType') + '"delimiter" must be a unicode character, not NoneType') def test_escapechar(self): class mydialect(csv.Dialect): @@ -1189,20 +1193,32 @@ class TestDialectValidity(unittest.TestCase): self.assertEqual(d.escapechar, "\\") mydialect.escapechar = "" - with self.assertRaisesRegex(csv.Error, '"escapechar" must be a 1-character string'): + with self.assertRaises(csv.Error) as cm: mydialect() + self.assertEqual(str(cm.exception), + '"escapechar" must be a unicode character or None, ' + 'not a string of length 0') mydialect.escapechar = "**" - with self.assertRaisesRegex(csv.Error, '"escapechar" must be a 1-character string'): + with self.assertRaises(csv.Error) as cm: mydialect() + self.assertEqual(str(cm.exception), + '"escapechar" must be a unicode character or None, ' + 'not a string of length 2') mydialect.escapechar = b"*" - with self.assertRaisesRegex(csv.Error, '"escapechar" must be string or None, not bytes'): + with self.assertRaises(csv.Error) as cm: mydialect() + self.assertEqual(str(cm.exception), + '"escapechar" must be a unicode character or None, ' + 'not bytes') mydialect.escapechar = 4 - with self.assertRaisesRegex(csv.Error, '"escapechar" must be string or None, not int'): + with self.assertRaises(csv.Error) as cm: mydialect() + self.assertEqual(str(cm.exception), + '"escapechar" must be a unicode character or None, ' + 'not int') def test_lineterminator(self): class mydialect(csv.Dialect): @@ -1223,7 +1239,13 @@ class TestDialectValidity(unittest.TestCase): with self.assertRaises(csv.Error) as cm: mydialect() self.assertEqual(str(cm.exception), - '"lineterminator" must be a string') + '"lineterminator" must be a string, not int') + + mydialect.lineterminator = None + with self.assertRaises(csv.Error) as cm: + mydialect() + self.assertEqual(str(cm.exception), + '"lineterminator" must be a string, not NoneType') def test_invalid_chars(self): def create_invalid(field_name, value, **kwargs): diff --git a/Lib/test/test_ctypes/test_incomplete.py b/Lib/test/test_ctypes/test_incomplete.py index fefdfe9102e..3189fcd1bd1 100644 --- a/Lib/test/test_ctypes/test_incomplete.py +++ b/Lib/test/test_ctypes/test_incomplete.py @@ -1,6 +1,5 @@ import ctypes import unittest -import warnings from ctypes import Structure, POINTER, pointer, c_char_p # String-based "incomplete pointers" were implemented in ctypes 0.6.3 (2003, when @@ -21,9 +20,7 @@ class TestSetPointerType(unittest.TestCase): _fields_ = [("name", c_char_p), ("next", lpcell)] - with warnings.catch_warnings(): - warnings.simplefilter('ignore', DeprecationWarning) - ctypes.SetPointerType(lpcell, cell) + lpcell.set_type(cell) self.assertIs(POINTER(cell), lpcell) @@ -50,10 +47,9 @@ class TestSetPointerType(unittest.TestCase): _fields_ = [("name", c_char_p), ("next", lpcell)] - with self.assertWarns(DeprecationWarning): - ctypes.SetPointerType(lpcell, cell) - + lpcell.set_type(cell) self.assertIs(POINTER(cell), lpcell) + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_dbm.py b/Lib/test/test_dbm.py index a10922a403e..7e8d78b8940 100644 --- a/Lib/test/test_dbm.py +++ b/Lib/test/test_dbm.py @@ -135,6 +135,67 @@ class AnyDBMTestCase: assert(f[key] == b"Python:") f.close() + def test_anydbm_readonly_reorganize(self): + self.init_db() + with dbm.open(_fname, 'r') as d: + # Early stopping. + if not hasattr(d, 'reorganize'): + self.skipTest("method reorganize not available this dbm submodule") + + self.assertRaises(dbm.error, lambda: d.reorganize()) + + def test_anydbm_reorganize_not_changed_content(self): + self.init_db() + with dbm.open(_fname, 'c') as d: + # Early stopping. + if not hasattr(d, 'reorganize'): + self.skipTest("method reorganize not available this dbm submodule") + + keys_before = sorted(d.keys()) + values_before = [d[k] for k in keys_before] + d.reorganize() + keys_after = sorted(d.keys()) + values_after = [d[k] for k in keys_before] + self.assertEqual(keys_before, keys_after) + self.assertEqual(values_before, values_after) + + def test_anydbm_reorganize_decreased_size(self): + + def _calculate_db_size(db_path): + if os.path.isfile(db_path): + return os.path.getsize(db_path) + total_size = 0 + for root, _, filenames in os.walk(db_path): + for filename in filenames: + file_path = os.path.join(root, filename) + total_size += os.path.getsize(file_path) + return total_size + + # This test requires relatively large databases to reliably show difference in size before and after reorganizing. + with dbm.open(_fname, 'n') as f: + # Early stopping. + if not hasattr(f, 'reorganize'): + self.skipTest("method reorganize not available this dbm submodule") + + for k in self._dict: + f[k.encode('ascii')] = self._dict[k] * 100000 + db_keys = list(f.keys()) + + # Make sure to calculate size of database only after file is closed to ensure file content are flushed to disk. + size_before = _calculate_db_size(os.path.dirname(_fname)) + + # Delete some elements from the start of the database. + keys_to_delete = db_keys[:len(db_keys) // 2] + with dbm.open(_fname, 'c') as f: + for k in keys_to_delete: + del f[k] + f.reorganize() + + # Make sure to calculate size of database only after file is closed to ensure file content are flushed to disk. + size_after = _calculate_db_size(os.path.dirname(_fname)) + + self.assertLess(size_after, size_before) + def test_open_with_bytes(self): dbm.open(os.fsencode(_fname), "c").close() diff --git a/Lib/test/test_dbm_gnu.py b/Lib/test/test_dbm_gnu.py index 66268c42a30..e0b988b7b95 100644 --- a/Lib/test/test_dbm_gnu.py +++ b/Lib/test/test_dbm_gnu.py @@ -74,12 +74,12 @@ class TestGdbm(unittest.TestCase): # Test the flag parameter open() by trying all supported flag modes. all = set(gdbm.open_flags) # Test standard flags (presumably "crwn"). - modes = all - set('fsu') + modes = all - set('fsum') for mode in sorted(modes): # put "c" mode first self.g = gdbm.open(filename, mode) self.g.close() - # Test additional flags (presumably "fsu"). + # Test additional flags (presumably "fsum"). flags = all - set('crwn') for mode in modes: for flag in flags: @@ -217,6 +217,29 @@ class TestGdbm(unittest.TestCase): create_empty_file(os.path.join(d, 'test')) self.assertRaises(gdbm.error, gdbm.open, filename, 'r') + @unittest.skipUnless('m' in gdbm.open_flags, "requires 'm' in open_flags") + def test_nommap_no_crash(self): + self.g = g = gdbm.open(filename, 'nm') + os.truncate(filename, 0) + + g.get(b'a', b'c') + g.keys() + g.firstkey() + g.nextkey(b'a') + with self.assertRaises(KeyError): + g[b'a'] + with self.assertRaises(gdbm.error): + len(g) + + with self.assertRaises(gdbm.error): + g[b'a'] = b'c' + with self.assertRaises(gdbm.error): + del g[b'a'] + with self.assertRaises(gdbm.error): + g.setdefault(b'a', b'c') + with self.assertRaises(gdbm.error): + g.reorganize() + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_decimal.py b/Lib/test/test_decimal.py index c0a1e378583..ef64b878805 100644 --- a/Lib/test/test_decimal.py +++ b/Lib/test/test_decimal.py @@ -981,6 +981,7 @@ class FormatTest: ('.0f', '0e-2', '0'), ('.0f', '3.14159265', '3'), ('.1f', '3.14159265', '3.1'), + ('.01f', '3.14159265', '3.1'), # leading zero in precision ('.4f', '3.14159265', '3.1416'), ('.6f', '3.14159265', '3.141593'), ('.7f', '3.14159265', '3.1415926'), # round-half-even! @@ -1066,6 +1067,7 @@ class FormatTest: ('8,', '123456', ' 123,456'), ('08,', '123456', '0,123,456'), # special case: extra 0 needed ('+08,', '123456', '+123,456'), # but not if there's a sign + ('008,', '123456', '0,123,456'), # leading zero in width (' 08,', '123456', ' 123,456'), ('08,', '-123456', '-123,456'), ('+09,', '123456', '+0,123,456'), diff --git a/Lib/test/test_dis.py b/Lib/test/test_dis.py index ec930a728aa..355990ed58e 100644 --- a/Lib/test/test_dis.py +++ b/Lib/test/test_dis.py @@ -606,7 +606,7 @@ dis_asyncwith = """\ POP_TOP L1: RESUME 0 -%4d LOAD_FAST_BORROW 0 (c) +%4d LOAD_FAST 0 (c) COPY 1 LOAD_SPECIAL 3 (__aexit__) SWAP 2 diff --git a/Lib/test/test_doctest/test_doctest.py b/Lib/test/test_doctest/test_doctest.py index c5b247797c3..72763d4a013 100644 --- a/Lib/test/test_doctest/test_doctest.py +++ b/Lib/test/test_doctest/test_doctest.py @@ -2269,20 +2269,22 @@ def test_DocTestSuite(): >>> suite = doctest.DocTestSuite(test.test_doctest.sample_doctest) >>> result = suite.run(unittest.TestResult()) >>> result - <unittest.result.TestResult run=9 errors=0 failures=4> + <unittest.result.TestResult run=9 errors=2 failures=2> >>> for tst, _ in result.failures: ... print(tst) - bad (test.test_doctest.sample_doctest.__test__) - foo (test.test_doctest.sample_doctest) - test_silly_setup (test.test_doctest.sample_doctest) - y_is_one (test.test_doctest.sample_doctest) + bad (test.test_doctest.sample_doctest.__test__) [0] + foo (test.test_doctest.sample_doctest) [0] + >>> for tst, _ in result.errors: + ... print(tst) + test_silly_setup (test.test_doctest.sample_doctest) [1] + y_is_one (test.test_doctest.sample_doctest) [0] We can also supply the module by name: >>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest') >>> result = suite.run(unittest.TestResult()) >>> result - <unittest.result.TestResult run=9 errors=0 failures=4> + <unittest.result.TestResult run=9 errors=2 failures=2> The module need not contain any doctest examples: @@ -2304,21 +2306,26 @@ def test_DocTestSuite(): >>> result <unittest.result.TestResult run=6 errors=0 failures=2> >>> len(result.skipped) - 2 + 7 >>> for tst, _ in result.skipped: ... print(tst) + double_skip (test.test_doctest.sample_doctest_skip) [0] + double_skip (test.test_doctest.sample_doctest_skip) [1] double_skip (test.test_doctest.sample_doctest_skip) + partial_skip_fail (test.test_doctest.sample_doctest_skip) [0] + partial_skip_pass (test.test_doctest.sample_doctest_skip) [0] + single_skip (test.test_doctest.sample_doctest_skip) [0] single_skip (test.test_doctest.sample_doctest_skip) >>> for tst, _ in result.failures: ... print(tst) - no_skip_fail (test.test_doctest.sample_doctest_skip) - partial_skip_fail (test.test_doctest.sample_doctest_skip) + no_skip_fail (test.test_doctest.sample_doctest_skip) [0] + partial_skip_fail (test.test_doctest.sample_doctest_skip) [1] We can use the current module: >>> suite = test.test_doctest.sample_doctest.test_suite() >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=9 errors=0 failures=4> + <unittest.result.TestResult run=9 errors=2 failures=2> We can also provide a DocTestFinder: @@ -2326,7 +2333,7 @@ def test_DocTestSuite(): >>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest', ... test_finder=finder) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=9 errors=0 failures=4> + <unittest.result.TestResult run=9 errors=2 failures=2> The DocTestFinder need not return any tests: @@ -2342,7 +2349,7 @@ def test_DocTestSuite(): >>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest', globs={}) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=9 errors=0 failures=5> + <unittest.result.TestResult run=9 errors=3 failures=2> Alternatively, we can provide extra globals. Here we'll make an error go away by providing an extra global variable: @@ -2350,7 +2357,7 @@ def test_DocTestSuite(): >>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest', ... extraglobs={'y': 1}) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=9 errors=0 failures=3> + <unittest.result.TestResult run=9 errors=1 failures=2> You can pass option flags. Here we'll cause an extra error by disabling the blank-line feature: @@ -2358,7 +2365,7 @@ def test_DocTestSuite(): >>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest', ... optionflags=doctest.DONT_ACCEPT_BLANKLINE) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=9 errors=0 failures=5> + <unittest.result.TestResult run=9 errors=2 failures=3> You can supply setUp and tearDown functions: @@ -2375,7 +2382,7 @@ def test_DocTestSuite(): >>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest', ... setUp=setUp, tearDown=tearDown) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=9 errors=0 failures=3> + <unittest.result.TestResult run=9 errors=1 failures=2> But the tearDown restores sanity: @@ -2393,7 +2400,7 @@ def test_DocTestSuite(): >>> suite = doctest.DocTestSuite('test.test_doctest.sample_doctest', setUp=setUp) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=9 errors=0 failures=3> + <unittest.result.TestResult run=9 errors=1 failures=2> Here, we didn't need to use a tearDown function because we modified the test globals, which are a copy of the @@ -2409,162 +2416,97 @@ def test_DocTestSuite_errors(): >>> suite = doctest.DocTestSuite(mod) >>> result = suite.run(unittest.TestResult()) >>> result - <unittest.result.TestResult run=4 errors=0 failures=4> + <unittest.result.TestResult run=4 errors=6 failures=3> >>> print(result.failures[0][1]) # doctest: +ELLIPSIS Traceback (most recent call last): - File ... - raise self.failureException(self.format_failure(new.getvalue())) - AssertionError: Failed doctest test for test.test_doctest.sample_doctest_errors - File "...sample_doctest_errors.py", line 0, in sample_doctest_errors - <BLANKLINE> - ---------------------------------------------------------------------- - File "...sample_doctest_errors.py", line 5, in test.test_doctest.sample_doctest_errors - Failed example: + File "...sample_doctest_errors.py", line 5, in test.test_doctest.sample_doctest_errors + >...>> 2 + 2 + AssertionError: Failed example: 2 + 2 Expected: 5 Got: 4 - ---------------------------------------------------------------------- - File "...sample_doctest_errors.py", line 7, in test.test_doctest.sample_doctest_errors - Failed example: - 1/0 - Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - File "<doctest test.test_doctest.sample_doctest_errors[1]>", line 1, in <module> - 1/0 - ~^~ - ZeroDivisionError: division by zero - <BLANKLINE> <BLANKLINE> >>> print(result.failures[1][1]) # doctest: +ELLIPSIS Traceback (most recent call last): - File ... - raise self.failureException(self.format_failure(new.getvalue())) - AssertionError: Failed doctest test for test.test_doctest.sample_doctest_errors.__test__.bad - File "...sample_doctest_errors.py", line unknown line number, in bad - <BLANKLINE> - ---------------------------------------------------------------------- - File "...sample_doctest_errors.py", line ?, in test.test_doctest.sample_doctest_errors.__test__.bad - Failed example: + File "...sample_doctest_errors.py", line None, in test.test_doctest.sample_doctest_errors.__test__.bad + AssertionError: Failed example: 2 + 2 Expected: 5 Got: 4 - ---------------------------------------------------------------------- - File "...sample_doctest_errors.py", line ?, in test.test_doctest.sample_doctest_errors.__test__.bad - Failed example: - 1/0 - Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - File "<doctest test.test_doctest.sample_doctest_errors.__test__.bad[1]>", line 1, in <module> - 1/0 - ~^~ - ZeroDivisionError: division by zero - <BLANKLINE> <BLANKLINE> >>> print(result.failures[2][1]) # doctest: +ELLIPSIS Traceback (most recent call last): - File ... - raise self.failureException(self.format_failure(new.getvalue())) - AssertionError: Failed doctest test for test.test_doctest.sample_doctest_errors.errors - File "...sample_doctest_errors.py", line 14, in errors - <BLANKLINE> - ---------------------------------------------------------------------- - File "...sample_doctest_errors.py", line 16, in test.test_doctest.sample_doctest_errors.errors - Failed example: + File "...sample_doctest_errors.py", line 16, in test.test_doctest.sample_doctest_errors.errors + >...>> 2 + 2 + AssertionError: Failed example: 2 + 2 Expected: 5 Got: 4 - ---------------------------------------------------------------------- - File "...sample_doctest_errors.py", line 18, in test.test_doctest.sample_doctest_errors.errors - Failed example: + <BLANKLINE> + >>> print(result.errors[0][1]) # doctest: +ELLIPSIS + Traceback (most recent call last): + File "...sample_doctest_errors.py", line 7, in test.test_doctest.sample_doctest_errors + >...>> 1/0 + File "<doctest test.test_doctest.sample_doctest_errors[1]>", line 1, in <module> 1/0 - Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - File "<doctest test.test_doctest.sample_doctest_errors.errors[1]>", line 1, in <module> - 1/0 - ~^~ - ZeroDivisionError: division by zero - ---------------------------------------------------------------------- - File "...sample_doctest_errors.py", line 23, in test.test_doctest.sample_doctest_errors.errors - Failed example: - f() - Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - File "<doctest test.test_doctest.sample_doctest_errors.errors[3]>", line 1, in <module> - f() - ~^^ - File "<doctest test.test_doctest.sample_doctest_errors.errors[2]>", line 2, in f - 2 + '2' - ~~^~~~~ - TypeError: ... - ---------------------------------------------------------------------- - File "...sample_doctest_errors.py", line 25, in test.test_doctest.sample_doctest_errors.errors - Failed example: - g() - Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - File "<doctest test.test_doctest.sample_doctest_errors.errors[4]>", line 1, in <module> - g() - ~^^ - File "...sample_doctest_errors.py", line 12, in g - [][0] # line 12 - ~~^^^ - IndexError: list index out of range + ~^~ + ZeroDivisionError: division by zero <BLANKLINE> + >>> print(result.errors[1][1]) # doctest: +ELLIPSIS + Traceback (most recent call last): + File "...sample_doctest_errors.py", line None, in test.test_doctest.sample_doctest_errors.__test__.bad + File "<doctest test.test_doctest.sample_doctest_errors.__test__.bad[1]>", line 1, in <module> + 1/0 + ~^~ + ZeroDivisionError: division by zero <BLANKLINE> - >>> print(result.failures[3][1]) # doctest: +ELLIPSIS + >>> print(result.errors[2][1]) # doctest: +ELLIPSIS Traceback (most recent call last): - File ... - raise self.failureException(self.format_failure(new.getvalue())) - AssertionError: Failed doctest test for test.test_doctest.sample_doctest_errors.syntax_error - File "...sample_doctest_errors.py", line 29, in syntax_error + File "...sample_doctest_errors.py", line 18, in test.test_doctest.sample_doctest_errors.errors + >...>> 1/0 + File "<doctest test.test_doctest.sample_doctest_errors.errors[1]>", line 1, in <module> + 1/0 + ~^~ + ZeroDivisionError: division by zero <BLANKLINE> - ---------------------------------------------------------------------- - File "...sample_doctest_errors.py", line 31, in test.test_doctest.sample_doctest_errors.syntax_error - Failed example: - 2+*3 - Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^ - File "<doctest test.test_doctest.sample_doctest_errors.syntax_error[0]>", line 1 - 2+*3 - ^ - SyntaxError: invalid syntax + >>> print(result.errors[3][1]) # doctest: +ELLIPSIS + Traceback (most recent call last): + File "...sample_doctest_errors.py", line 23, in test.test_doctest.sample_doctest_errors.errors + >...>> f() + File "<doctest test.test_doctest.sample_doctest_errors.errors[3]>", line 1, in <module> + f() + ~^^ + File "<doctest test.test_doctest.sample_doctest_errors.errors[2]>", line 2, in f + 2 + '2' + ~~^~~~~ + TypeError: ... <BLANKLINE> + >>> print(result.errors[4][1]) # doctest: +ELLIPSIS + Traceback (most recent call last): + File "...sample_doctest_errors.py", line 25, in test.test_doctest.sample_doctest_errors.errors + >...>> g() + File "<doctest test.test_doctest.sample_doctest_errors.errors[4]>", line 1, in <module> + g() + ~^^ + File "...sample_doctest_errors.py", line 12, in g + [][0] # line 12 + ~~^^^ + IndexError: list index out of range + <BLANKLINE> + >>> print(result.errors[5][1]) # doctest: +ELLIPSIS + Traceback (most recent call last): + File "...sample_doctest_errors.py", line 31, in test.test_doctest.sample_doctest_errors.syntax_error + >...>> 2+*3 + File "<doctest test.test_doctest.sample_doctest_errors.syntax_error[0]>", line 1 + 2+*3 + ^ + SyntaxError: invalid syntax <BLANKLINE> """ @@ -2579,7 +2521,7 @@ def test_DocFileSuite(): ... 'test_doctest2.txt', ... 'test_doctest4.txt') >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=3 errors=0 failures=2> + <unittest.result.TestResult run=3 errors=2 failures=0> The test files are looked for in the directory containing the calling module. A package keyword argument can be provided to @@ -2591,14 +2533,14 @@ def test_DocFileSuite(): ... 'test_doctest4.txt', ... package='test.test_doctest') >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=3 errors=0 failures=2> + <unittest.result.TestResult run=3 errors=2 failures=0> '/' should be used as a path separator. It will be converted to a native separator at run time: >>> suite = doctest.DocFileSuite('../test_doctest/test_doctest.txt') >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=1 errors=0 failures=1> + <unittest.result.TestResult run=1 errors=1 failures=0> If DocFileSuite is used from an interactive session, then files are resolved relative to the directory of sys.argv[0]: @@ -2624,7 +2566,7 @@ def test_DocFileSuite(): >>> suite = doctest.DocFileSuite(test_file, module_relative=False) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=1 errors=0 failures=1> + <unittest.result.TestResult run=1 errors=1 failures=0> It is an error to specify `package` when `module_relative=False`: @@ -2642,12 +2584,15 @@ def test_DocFileSuite(): ... 'test_doctest_skip2.txt') >>> result = suite.run(unittest.TestResult()) >>> result - <unittest.result.TestResult run=4 errors=0 failures=1> + <unittest.result.TestResult run=4 errors=1 failures=0> >>> len(result.skipped) - 1 + 4 >>> for tst, _ in result.skipped: # doctest: +ELLIPSIS ... print('=', tst) + = ...test_doctest_skip.txt [0] + = ...test_doctest_skip.txt [1] = ...test_doctest_skip.txt + = ...test_doctest_skip2.txt [0] You can specify initial global variables: @@ -2656,7 +2601,7 @@ def test_DocFileSuite(): ... 'test_doctest4.txt', ... globs={'favorite_color': 'blue'}) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=3 errors=0 failures=1> + <unittest.result.TestResult run=3 errors=1 failures=0> In this case, we supplied a missing favorite color. You can provide doctest options: @@ -2667,7 +2612,7 @@ def test_DocFileSuite(): ... optionflags=doctest.DONT_ACCEPT_BLANKLINE, ... globs={'favorite_color': 'blue'}) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=3 errors=0 failures=2> + <unittest.result.TestResult run=3 errors=1 failures=1> And, you can provide setUp and tearDown functions: @@ -2686,7 +2631,7 @@ def test_DocFileSuite(): ... 'test_doctest4.txt', ... setUp=setUp, tearDown=tearDown) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=3 errors=0 failures=1> + <unittest.result.TestResult run=3 errors=1 failures=0> But the tearDown restores sanity: @@ -2728,7 +2673,7 @@ def test_DocFileSuite(): ... 'test_doctest4.txt', ... encoding='utf-8') >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=3 errors=0 failures=2> + <unittest.result.TestResult run=3 errors=2 failures=0> """ def test_DocFileSuite_errors(): @@ -2738,72 +2683,49 @@ def test_DocFileSuite_errors(): >>> suite = doctest.DocFileSuite('test_doctest_errors.txt') >>> result = suite.run(unittest.TestResult()) >>> result - <unittest.result.TestResult run=1 errors=0 failures=1> + <unittest.result.TestResult run=1 errors=3 failures=1> >>> print(result.failures[0][1]) # doctest: +ELLIPSIS Traceback (most recent call last): - File ... - raise self.failureException(self.format_failure(new.getvalue())) - AssertionError: Failed doctest test for test_doctest_errors.txt - File "...test_doctest_errors.txt", line 0 - <BLANKLINE> - ---------------------------------------------------------------------- - File "...test_doctest_errors.txt", line 4, in test_doctest_errors.txt - Failed example: + File "...test_doctest_errors.txt", line 4, in test_doctest_errors.txt + >...>> 2 + 2 + AssertionError: Failed example: 2 + 2 Expected: 5 Got: 4 - ---------------------------------------------------------------------- - File "...test_doctest_errors.txt", line 6, in test_doctest_errors.txt - Failed example: + <BLANKLINE> + >>> print(result.errors[0][1]) # doctest: +ELLIPSIS + Traceback (most recent call last): + File "...test_doctest_errors.txt", line 6, in test_doctest_errors.txt + >...>> 1/0 + File "<doctest test_doctest_errors.txt[1]>", line 1, in <module> 1/0 - Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - File "<doctest test_doctest_errors.txt[1]>", line 1, in <module> - 1/0 - ~^~ - ZeroDivisionError: division by zero - ---------------------------------------------------------------------- - File "...test_doctest_errors.txt", line 11, in test_doctest_errors.txt - Failed example: + ~^~ + ZeroDivisionError: division by zero + <BLANKLINE> + >>> print(result.errors[1][1]) # doctest: +ELLIPSIS + Traceback (most recent call last): + File "...test_doctest_errors.txt", line 11, in test_doctest_errors.txt + >...>> f() + File "<doctest test_doctest_errors.txt[3]>", line 1, in <module> f() - Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - File "<doctest test_doctest_errors.txt[3]>", line 1, in <module> - f() - ~^^ - File "<doctest test_doctest_errors.txt[2]>", line 2, in f - 2 + '2' - ~~^~~~~ - TypeError: ... - ---------------------------------------------------------------------- - File "...test_doctest_errors.txt", line 13, in test_doctest_errors.txt - Failed example: - 2+*3 - Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^ - File "<doctest test_doctest_errors.txt[4]>", line 1 - 2+*3 - ^ - SyntaxError: invalid syntax + ~^^ + File "<doctest test_doctest_errors.txt[2]>", line 2, in f + 2 + '2' + ~~^~~~~ + TypeError: ... <BLANKLINE> + >>> print(result.errors[2][1]) # doctest: +ELLIPSIS + Traceback (most recent call last): + File "...test_doctest_errors.txt", line 13, in test_doctest_errors.txt + >...>> 2+*3 + File "<doctest test_doctest_errors.txt[4]>", line 1 + 2+*3 + ^ + SyntaxError: invalid syntax <BLANKLINE> + """ def test_trailing_space_in_test(): @@ -2874,15 +2796,25 @@ def test_unittest_reportflags(): >>> import unittest >>> result = suite.run(unittest.TestResult()) >>> result - <unittest.result.TestResult run=1 errors=0 failures=1> + <unittest.result.TestResult run=1 errors=1 failures=1> >>> print(result.failures[0][1]) # doctest: +ELLIPSIS - Traceback ... - Failed example: - favorite_color - ... - Failed example: + Traceback (most recent call last): + File ... + >...>> if 1: + AssertionError: Failed example: if 1: - ... + print('a') + print() + print('b') + Expected: + a + <BLANKLINE> + b + Got: + a + <BLANKLINE> + b + <BLANKLINE> Note that we see both failures displayed. @@ -2891,18 +2823,8 @@ def test_unittest_reportflags(): Now, when we run the test: - >>> result = suite.run(unittest.TestResult()) - >>> result - <unittest.result.TestResult run=1 errors=0 failures=1> - >>> print(result.failures[0][1]) # doctest: +ELLIPSIS - Traceback ... - Failed example: - favorite_color - Exception raised: - ... - NameError: name 'favorite_color' is not defined - <BLANKLINE> - <BLANKLINE> + >>> suite.run(unittest.TestResult()) + <unittest.result.TestResult run=1 errors=1 failures=0> We get only the first failure. @@ -2912,21 +2834,20 @@ def test_unittest_reportflags(): >>> suite = doctest.DocFileSuite('test_doctest.txt', ... optionflags=doctest.DONT_ACCEPT_BLANKLINE | doctest.REPORT_NDIFF) - Then the default eporting options are ignored: + Then the default reporting options are ignored: >>> result = suite.run(unittest.TestResult()) >>> result - <unittest.result.TestResult run=1 errors=0 failures=1> + <unittest.result.TestResult run=1 errors=1 failures=1> *NOTE*: These doctest are intentionally not placed in raw string to depict the trailing whitespace using `\x20` in the diff below. >>> print(result.failures[0][1]) # doctest: +ELLIPSIS Traceback ... - Failed example: - favorite_color - ... - Failed example: + File ... + >...>> if 1: + AssertionError: Failed example: if 1: print('a') print() @@ -2937,7 +2858,6 @@ def test_unittest_reportflags(): +\x20 b <BLANKLINE> - <BLANKLINE> Test runners can restore the formatting flags after they run: @@ -3145,11 +3065,6 @@ Tests for error reporting in the testfile() function. 1/0 Exception raised: Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<doctest test_doctest_errors.txt[1]>", line 1, in <module> 1/0 ~^~ @@ -3160,11 +3075,6 @@ Tests for error reporting in the testfile() function. f() Exception raised: Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<doctest test_doctest_errors.txt[3]>", line 1, in <module> f() ~^^ @@ -3177,12 +3087,6 @@ Tests for error reporting in the testfile() function. Failed example: 2+*3 Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^ File "<doctest test_doctest_errors.txt[4]>", line 1 2+*3 ^ @@ -3343,11 +3247,6 @@ Tests for error reporting in the testmod() function. 1/0 Exception raised: Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<doctest test.test_doctest.sample_doctest_errors[1]>", line 1, in <module> 1/0 ~^~ @@ -3366,11 +3265,6 @@ Tests for error reporting in the testmod() function. 1/0 Exception raised: Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<doctest test.test_doctest.sample_doctest_errors.__test__.bad[1]>", line 1, in <module> 1/0 ~^~ @@ -3389,11 +3283,6 @@ Tests for error reporting in the testmod() function. 1/0 Exception raised: Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<doctest test.test_doctest.sample_doctest_errors.errors[1]>", line 1, in <module> 1/0 ~^~ @@ -3404,11 +3293,6 @@ Tests for error reporting in the testmod() function. f() Exception raised: Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<doctest test.test_doctest.sample_doctest_errors.errors[3]>", line 1, in <module> f() ~^^ @@ -3422,11 +3306,6 @@ Tests for error reporting in the testmod() function. g() Exception raised: Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<doctest test.test_doctest.sample_doctest_errors.errors[4]>", line 1, in <module> g() ~^^ @@ -3439,12 +3318,6 @@ Tests for error reporting in the testmod() function. Failed example: 2+*3 Exception raised: - Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^ File "<doctest test.test_doctest.sample_doctest_errors.syntax_error[0]>", line 1 2+*3 ^ @@ -3490,11 +3363,6 @@ Check doctest with a non-ascii filename: raise Exception('clé') Exception raised: Traceback (most recent call last): - File ... - exec(compile(example.source, filename, "single", - ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - compileflags, True), test.globs) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "<doctest foo-bär@baz[0]>", line 1, in <module> raise Exception('clé') Exception: clé @@ -3787,9 +3655,9 @@ def test_run_doctestsuite_multiple_times(): >>> import test.test_doctest.sample_doctest >>> suite = doctest.DocTestSuite(test.test_doctest.sample_doctest) >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=9 errors=0 failures=4> + <unittest.result.TestResult run=9 errors=2 failures=2> >>> suite.run(unittest.TestResult()) - <unittest.result.TestResult run=9 errors=0 failures=4> + <unittest.result.TestResult run=9 errors=2 failures=2> """ diff --git a/Lib/test/test_email/test__header_value_parser.py b/Lib/test/test_email/test__header_value_parser.py index fd4ac2c404c..179e236ecdf 100644 --- a/Lib/test/test_email/test__header_value_parser.py +++ b/Lib/test/test_email/test__header_value_parser.py @@ -2491,6 +2491,38 @@ class TestParser(TestParserMixin, TestEmailBase): self.assertEqual(address.all_mailboxes[0].domain, 'example.com') self.assertEqual(address.all_mailboxes[0].addr_spec, '"example example"@example.com') + def test_get_address_with_invalid_domain(self): + address = self._test_get_x(parser.get_address, + '<T@[', + '<T@[]>', + '<T@[]>', + [errors.InvalidHeaderDefect, # missing trailing '>' on angle-addr + errors.InvalidHeaderDefect, # end of input inside domain-literal + ], + '') + self.assertEqual(address.token_type, 'address') + self.assertEqual(len(address.mailboxes), 0) + self.assertEqual(len(address.all_mailboxes), 1) + self.assertEqual(address.all_mailboxes[0].domain, '[]') + self.assertEqual(address.all_mailboxes[0].local_part, 'T') + self.assertEqual(address.all_mailboxes[0].token_type, 'invalid-mailbox') + self.assertEqual(address[0].token_type, 'invalid-mailbox') + + address = self._test_get_x(parser.get_address, + '!an??:=m==fr2@[C', + '!an??:=m==fr2@[C];', + '!an??:=m==fr2@[C];', + [errors.InvalidHeaderDefect, # end of header in group + errors.InvalidHeaderDefect, # end of input inside domain-literal + ], + '') + self.assertEqual(address.token_type, 'address') + self.assertEqual(len(address.mailboxes), 0) + self.assertEqual(len(address.all_mailboxes), 1) + self.assertEqual(address.all_mailboxes[0].domain, '[C]') + self.assertEqual(address.all_mailboxes[0].local_part, '=m==fr2') + self.assertEqual(address.all_mailboxes[0].token_type, 'invalid-mailbox') + self.assertEqual(address[0].token_type, 'group') # get_address_list @@ -2765,6 +2797,19 @@ class TestParser(TestParserMixin, TestEmailBase): ) self.assertEqual(message_id.token_type, 'message-id') + def test_parse_message_id_with_invalid_domain(self): + message_id = self._test_parse_x( + parser.parse_message_id, + "<T@[", + "<T@[]>", + "<T@[]>", + [errors.ObsoleteHeaderDefect] + [errors.InvalidHeaderDefect] * 2, + [], + ) + self.assertEqual(message_id.token_type, 'message-id') + self.assertEqual(str(message_id.all_defects[-1]), + "end of input inside domain-literal") + def test_parse_message_id_with_remaining(self): message_id = self._test_parse_x( parser.parse_message_id, diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py index 2b4b63a030b..303af25fc7a 100644 --- a/Lib/test/test_external_inspection.py +++ b/Lib/test/test_external_inspection.py @@ -114,17 +114,17 @@ class TestGetStackTrace(unittest.TestCase): p.wait(timeout=SHORT_TIMEOUT) thread_expected_stack_trace = [ - ("foo", script_name, 15), - ("baz", script_name, 12), - ("bar", script_name, 9), - ('Thread.run', threading.__file__, ANY) + (script_name, 15, "foo"), + (script_name, 12, "baz"), + (script_name, 9, "bar"), + (threading.__file__, ANY, 'Thread.run') ] # Is possible that there are more threads, so we check that the # expected stack traces are in the result (looking at you Windows!) self.assertIn((ANY, thread_expected_stack_trace), stack_trace) # Check that the main thread stack trace is in the result - frame = ("<module>", script_name, 19) + frame = (script_name, 19, "<module>") for _, stack in stack_trace: if frame in stack: break @@ -222,47 +222,47 @@ class TestGetStackTrace(unittest.TestCase): root_task = "Task-1" expected_stack_trace = [ [ - ("c5", script_name, 10), - ("c4", script_name, 14), - ("c3", script_name, 17), - ("c2", script_name, 20), + (script_name, 10, "c5"), + (script_name, 14, "c4"), + (script_name, 17, "c3"), + (script_name, 20, "c2"), ], "c2_root", [ [ [ ( - "TaskGroup._aexit", taskgroups.__file__, ANY, + "TaskGroup._aexit" ), ( - "TaskGroup.__aexit__", taskgroups.__file__, ANY, + "TaskGroup.__aexit__" ), - ("main", script_name, 26), + (script_name, 26, "main"), ], "Task-1", [], ], [ - [("c1", script_name, 23)], + [(script_name, 23, "c1")], "sub_main_1", [ [ [ ( - "TaskGroup._aexit", taskgroups.__file__, ANY, + "TaskGroup._aexit" ), ( - "TaskGroup.__aexit__", taskgroups.__file__, ANY, + "TaskGroup.__aexit__" ), - ("main", script_name, 26), + (script_name, 26, "main"), ], "Task-1", [], @@ -270,22 +270,22 @@ class TestGetStackTrace(unittest.TestCase): ], ], [ - [("c1", script_name, 23)], + [(script_name, 23, "c1")], "sub_main_2", [ [ [ ( - "TaskGroup._aexit", taskgroups.__file__, ANY, + "TaskGroup._aexit" ), ( - "TaskGroup.__aexit__", taskgroups.__file__, ANY, + "TaskGroup.__aexit__" ), - ("main", script_name, 26), + (script_name, 26, "main"), ], "Task-1", [], @@ -363,9 +363,9 @@ class TestGetStackTrace(unittest.TestCase): expected_stack_trace = [ [ - ("gen_nested_call", script_name, 10), - ("gen", script_name, 16), - ("main", script_name, 19), + (script_name, 10, "gen_nested_call"), + (script_name, 16, "gen"), + (script_name, 19, "main"), ], "Task-1", [], @@ -439,9 +439,9 @@ class TestGetStackTrace(unittest.TestCase): stack_trace[2].sort(key=lambda x: x[1]) expected_stack_trace = [ - [("deep", script_name, 11), ("c1", script_name, 15)], + [(script_name, 11, "deep"), (script_name, 15, "c1")], "Task-2", - [[[("main", script_name, 21)], "Task-1", []]], + [[[(script_name, 21, "main")], "Task-1", []]], ] self.assertEqual(stack_trace, expected_stack_trace) @@ -515,16 +515,16 @@ class TestGetStackTrace(unittest.TestCase): stack_trace[2].sort(key=lambda x: x[1]) expected_stack_trace = [ [ - ("deep", script_name, 11), - ("c1", script_name, 15), - ("staggered_race.<locals>.run_one_coro", staggered.__file__, ANY), + (script_name, 11, "deep"), + (script_name, 15, "c1"), + (staggered.__file__, ANY, "staggered_race.<locals>.run_one_coro"), ], "Task-2", [ [ [ - ("staggered_race", staggered.__file__, ANY), - ("main", script_name, 21), + (staggered.__file__, ANY, "staggered_race"), + (script_name, 21, "main"), ], "Task-1", [], @@ -662,16 +662,16 @@ class TestGetStackTrace(unittest.TestCase): self.assertIn((ANY, "Task-1", []), entries) main_stack = [ ( - "TaskGroup._aexit", taskgroups.__file__, ANY, + "TaskGroup._aexit", ), ( - "TaskGroup.__aexit__", taskgroups.__file__, ANY, + "TaskGroup.__aexit__", ), - ("main", script_name, 60), + (script_name, 60, "main"), ] self.assertIn( (ANY, "server task", [[main_stack, ANY]]), @@ -686,16 +686,16 @@ class TestGetStackTrace(unittest.TestCase): [ [ ( - "TaskGroup._aexit", taskgroups.__file__, ANY, + "TaskGroup._aexit", ), ( - "TaskGroup.__aexit__", taskgroups.__file__, ANY, + "TaskGroup.__aexit__", ), - ("echo_client_spam", script_name, 41), + (script_name, 41, "echo_client_spam"), ], ANY, ] @@ -741,14 +741,14 @@ class TestGetStackTrace(unittest.TestCase): stack[:2], [ ( - "get_stack_trace", __file__, get_stack_trace.__code__.co_firstlineno + 2, + "get_stack_trace", ), ( - "TestGetStackTrace.test_self_trace", __file__, self.test_self_trace.__code__.co_firstlineno + 6, + "TestGetStackTrace.test_self_trace", ), ] ) diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py index 96b3f305194..d1d2739856c 100644 --- a/Lib/test/test_fractions.py +++ b/Lib/test/test_fractions.py @@ -1518,6 +1518,8 @@ class FractionTest(unittest.TestCase): (F(51, 1000), '.1f', '0.1'), (F(149, 1000), '.1f', '0.1'), (F(151, 1000), '.1f', '0.2'), + (F(22, 7), '.02f', '3.14'), # issue gh-130662 + (F(22, 7), '005.02f', '03.14'), ] for fraction, spec, expected in testcases: with self.subTest(fraction=fraction, spec=spec): @@ -1616,12 +1618,6 @@ class FractionTest(unittest.TestCase): '=010%', '>00.2f', '>00f', - # Too many zeros - minimum width should not have leading zeros - '006f', - # Leading zeros in precision - '.010f', - '.02f', - '.000f', # Missing precision '.e', '.f', diff --git a/Lib/test/test_free_threading/test_itertools_batched.py b/Lib/test/test_free_threading/test_itertools.py index a754b4f9ea9..8360afbf78c 100644 --- a/Lib/test/test_free_threading/test_itertools_batched.py +++ b/Lib/test/test_free_threading/test_itertools.py @@ -1,15 +1,15 @@ import unittest from threading import Thread, Barrier -from itertools import batched +from itertools import batched, cycle from test.support import threading_helper threading_helper.requires_working_threading(module=True) -class EnumerateThreading(unittest.TestCase): +class ItertoolsThreading(unittest.TestCase): @threading_helper.reap_threads - def test_threading(self): + def test_batched(self): number_of_threads = 10 number_of_iterations = 20 barrier = Barrier(number_of_threads) @@ -34,5 +34,31 @@ class EnumerateThreading(unittest.TestCase): barrier.reset() + @threading_helper.reap_threads + def test_cycle(self): + number_of_threads = 6 + number_of_iterations = 10 + number_of_cycles = 400 + + barrier = Barrier(number_of_threads) + def work(it): + barrier.wait() + for _ in range(number_of_cycles): + _ = next(it) + + data = (1, 2, 3, 4) + for it in range(number_of_iterations): + cycle_iterator = cycle(data) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[cycle_iterator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py index c39565144bf..7f5d48b9c63 100644 --- a/Lib/test/test_grammar.py +++ b/Lib/test/test_grammar.py @@ -1,7 +1,7 @@ # Python test set -- part 1, grammar. # This just tests whether the parser accepts them all. -from test.support import check_syntax_error +from test.support import check_syntax_error, skip_wasi_stack_overflow from test.support import import_helper import annotationlib import inspect @@ -249,6 +249,18 @@ the \'lazy\' dog.\n\ compile(s, "<test>", "exec") self.assertIn("was never closed", str(cm.exception)) + @skip_wasi_stack_overflow() + def test_max_level(self): + # Macro defined in Parser/lexer/state.h + MAXLEVEL = 200 + + result = eval("(" * MAXLEVEL + ")" * MAXLEVEL) + self.assertEqual(result, ()) + + with self.assertRaises(SyntaxError) as cm: + eval("(" * (MAXLEVEL + 1) + ")" * (MAXLEVEL + 1)) + self.assertStartsWith(str(cm.exception), 'too many nested parentheses') + var_annot_global: int # a global annotated is necessary for test_var_annot diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py index 161c7652d7a..b83ae181718 100644 --- a/Lib/test/test_hashlib.py +++ b/Lib/test/test_hashlib.py @@ -12,6 +12,7 @@ import io import itertools import logging import os +import re import sys import sysconfig import tempfile @@ -97,6 +98,14 @@ def read_vectors(hash_name): yield parts +DEPRECATED_STRING_PARAMETER = re.escape( + "the 'string' keyword parameter is deprecated since " + "Python 3.15 and slated for removal in Python 3.19; " + "use the 'data' keyword parameter or pass the data " + "to hash as a positional argument instead" +) + + class HashLibTestCase(unittest.TestCase): supported_hash_names = ( 'md5', 'MD5', 'sha1', 'SHA1', 'sha224', 'SHA224', 'sha256', 'SHA256', @@ -140,11 +149,10 @@ class HashLibTestCase(unittest.TestCase): # of hashlib.new given the algorithm name. for algorithm, constructors in self.constructors_to_test.items(): constructors.add(getattr(hashlib, algorithm)) - def _test_algorithm_via_hashlib_new(data=None, _alg=algorithm, **kwargs): - if data is None: - return hashlib.new(_alg, **kwargs) - return hashlib.new(_alg, data, **kwargs) - constructors.add(_test_algorithm_via_hashlib_new) + def c(*args, __algorithm_name=algorithm, **kwargs): + return hashlib.new(__algorithm_name, *args, **kwargs) + c.__name__ = f'do_test_algorithm_via_hashlib_new_{algorithm}' + constructors.add(c) _hashlib = self._conditional_import_module('_hashlib') self._hashlib = _hashlib @@ -249,6 +257,81 @@ class HashLibTestCase(unittest.TestCase): self._hashlib.new("md5", usedforsecurity=False) self._hashlib.openssl_md5(usedforsecurity=False) + @unittest.skipIf(get_fips_mode(), "skip in FIPS mode") + def test_clinic_signature(self): + for constructor in self.hash_constructors: + with self.subTest(constructor.__name__): + constructor(b'') + constructor(data=b'') + with self.assertWarnsRegex(DeprecationWarning, + DEPRECATED_STRING_PARAMETER): + constructor(string=b'') + + digest_name = constructor(b'').name + with self.subTest(digest_name): + hashlib.new(digest_name, b'') + hashlib.new(digest_name, data=b'') + with self.assertWarnsRegex(DeprecationWarning, + DEPRECATED_STRING_PARAMETER): + hashlib.new(digest_name, string=b'') + if self._hashlib: + self._hashlib.new(digest_name, b'') + self._hashlib.new(digest_name, data=b'') + with self.assertWarnsRegex(DeprecationWarning, + DEPRECATED_STRING_PARAMETER): + self._hashlib.new(digest_name, string=b'') + + @unittest.skipIf(get_fips_mode(), "skip in FIPS mode") + def test_clinic_signature_errors(self): + nomsg = b'' + mymsg = b'msg' + conflicting_call = re.escape( + "'data' and 'string' are mutually exclusive " + "and support for 'string' keyword parameter " + "is slated for removal in a future version." + ) + duplicated_param = re.escape("given by name ('data') and position") + unexpected_param = re.escape("got an unexpected keyword argument '_'") + for args, kwds, errmsg in [ + # Reject duplicated arguments before unknown keyword arguments. + ((nomsg,), dict(data=nomsg, _=nomsg), duplicated_param), + ((mymsg,), dict(data=nomsg, _=nomsg), duplicated_param), + # Reject duplicated arguments before conflicting ones. + *itertools.product( + [[nomsg], [mymsg]], + [dict(data=nomsg), dict(data=nomsg, string=nomsg)], + [duplicated_param] + ), + # Reject unknown keyword arguments before conflicting ones. + *itertools.product( + [()], + [ + dict(_=None), + dict(data=nomsg, _=None), + dict(string=nomsg, _=None), + dict(string=nomsg, data=nomsg, _=None), + ], + [unexpected_param] + ), + ((nomsg,), dict(_=None), unexpected_param), + ((mymsg,), dict(_=None), unexpected_param), + # Reject conflicting arguments. + [(nomsg,), dict(string=nomsg), conflicting_call], + [(mymsg,), dict(string=nomsg), conflicting_call], + [(), dict(data=nomsg, string=nomsg), conflicting_call], + ]: + for constructor in self.hash_constructors: + digest_name = constructor(b'').name + with self.subTest(constructor.__name__, args=args, kwds=kwds): + with self.assertRaisesRegex(TypeError, errmsg): + constructor(*args, **kwds) + with self.subTest(digest_name, args=args, kwds=kwds): + with self.assertRaisesRegex(TypeError, errmsg): + hashlib.new(digest_name, *args, **kwds) + if self._hashlib: + with self.assertRaisesRegex(TypeError, errmsg): + self._hashlib.new(digest_name, *args, **kwds) + def test_unknown_hash(self): self.assertRaises(ValueError, hashlib.new, 'spam spam spam spam spam') self.assertRaises(TypeError, hashlib.new, 1) @@ -718,8 +801,6 @@ class HashLibTestCase(unittest.TestCase): self.assertRaises(ValueError, constructor, node_offset=-1) self.assertRaises(OverflowError, constructor, node_offset=max_offset+1) - self.assertRaises(TypeError, constructor, data=b'') - self.assertRaises(TypeError, constructor, string=b'') self.assertRaises(TypeError, constructor, '') constructor( diff --git a/Lib/test/test_http_cookiejar.py b/Lib/test/test_http_cookiejar.py index 6bc33b15ec3..04cb440cd4c 100644 --- a/Lib/test/test_http_cookiejar.py +++ b/Lib/test/test_http_cookiejar.py @@ -4,6 +4,7 @@ import os import stat import sys import re +from test import support from test.support import os_helper from test.support import warnings_helper import time @@ -105,8 +106,7 @@ class DateTimeTests(unittest.TestCase): self.assertEqual(http2time(s.lower()), test_t, s.lower()) self.assertEqual(http2time(s.upper()), test_t, s.upper()) - def test_http2time_garbage(self): - for test in [ + @support.subTests('test', [ '', 'Garbage', 'Mandag 16. September 1996', @@ -121,10 +121,9 @@ class DateTimeTests(unittest.TestCase): '08-01-3697739', '09 Feb 19942632 22:23:32 GMT', 'Wed, 09 Feb 1994834 22:23:32 GMT', - ]: - self.assertIsNone(http2time(test), - "http2time(%s) is not None\n" - "http2time(test) %s" % (test, http2time(test))) + ]) + def test_http2time_garbage(self, test): + self.assertIsNone(http2time(test)) def test_http2time_redos_regression_actually_completes(self): # LOOSE_HTTP_DATE_RE was vulnerable to malicious input which caused catastrophic backtracking (REDoS). @@ -149,9 +148,7 @@ class DateTimeTests(unittest.TestCase): self.assertEqual(parse_date("1994-02-03 19:45:29 +0530"), (1994, 2, 3, 14, 15, 29)) - def test_iso2time_formats(self): - # test iso2time for supported dates. - tests = [ + @support.subTests('s', [ '1994-02-03 00:00:00 -0000', # ISO 8601 format '1994-02-03 00:00:00 +0000', # ISO 8601 format '1994-02-03 00:00:00', # zone is optional @@ -164,16 +161,15 @@ class DateTimeTests(unittest.TestCase): # A few tests with extra space at various places ' 1994-02-03 ', ' 1994-02-03T00:00:00 ', - ] - + ]) + def test_iso2time_formats(self, s): + # test iso2time for supported dates. test_t = 760233600 # assume broken POSIX counting of seconds - for s in tests: - self.assertEqual(iso2time(s), test_t, s) - self.assertEqual(iso2time(s.lower()), test_t, s.lower()) - self.assertEqual(iso2time(s.upper()), test_t, s.upper()) + self.assertEqual(iso2time(s), test_t, s) + self.assertEqual(iso2time(s.lower()), test_t, s.lower()) + self.assertEqual(iso2time(s.upper()), test_t, s.upper()) - def test_iso2time_garbage(self): - for test in [ + @support.subTests('test', [ '', 'Garbage', 'Thursday, 03-Feb-94 00:00:00 GMT', @@ -186,9 +182,9 @@ class DateTimeTests(unittest.TestCase): '01-01-1980 00:00:62', '01-01-1980T00:00:62', '19800101T250000Z', - ]: - self.assertIsNone(iso2time(test), - "iso2time(%r)" % test) + ]) + def test_iso2time_garbage(self, test): + self.assertIsNone(iso2time(test)) def test_iso2time_performance_regression(self): # If ISO_DATE_RE regresses to quadratic complexity, this test will take a very long time to succeed. @@ -199,24 +195,23 @@ class DateTimeTests(unittest.TestCase): class HeaderTests(unittest.TestCase): - def test_parse_ns_headers(self): - # quotes should be stripped - expected = [[('foo', 'bar'), ('expires', 2209069412), ('version', '0')]] - for hdr in [ + @support.subTests('hdr', [ 'foo=bar; expires=01 Jan 2040 22:23:32 GMT', 'foo=bar; expires="01 Jan 2040 22:23:32 GMT"', - ]: - self.assertEqual(parse_ns_headers([hdr]), expected) - - def test_parse_ns_headers_version(self): - + ]) + def test_parse_ns_headers(self, hdr): # quotes should be stripped - expected = [[('foo', 'bar'), ('version', '1')]] - for hdr in [ + expected = [[('foo', 'bar'), ('expires', 2209069412), ('version', '0')]] + self.assertEqual(parse_ns_headers([hdr]), expected) + + @support.subTests('hdr', [ 'foo=bar; version="1"', 'foo=bar; Version="1"', - ]: - self.assertEqual(parse_ns_headers([hdr]), expected) + ]) + def test_parse_ns_headers_version(self, hdr): + # quotes should be stripped + expected = [[('foo', 'bar'), ('version', '1')]] + self.assertEqual(parse_ns_headers([hdr]), expected) def test_parse_ns_headers_special_names(self): # names such as 'expires' are not special in first name=value pair @@ -226,8 +221,7 @@ class HeaderTests(unittest.TestCase): expected = [[("expires", "01 Jan 2040 22:23:32 GMT"), ("version", "0")]] self.assertEqual(parse_ns_headers([hdr]), expected) - def test_join_header_words(self): - for src, expected in [ + @support.subTests('src,expected', [ ([[("foo", None), ("bar", "baz")]], "foo; bar=baz"), (([]), ""), (([[]]), ""), @@ -237,12 +231,11 @@ class HeaderTests(unittest.TestCase): 'n; foo="foo;_", bar=foo_bar'), ([[("n", "m"), ("foo", None)], [("bar", "foo_bar")]], 'n=m; foo, bar=foo_bar'), - ]: - with self.subTest(src=src): - self.assertEqual(join_header_words(src), expected) + ]) + def test_join_header_words(self, src, expected): + self.assertEqual(join_header_words(src), expected) - def test_split_header_words(self): - tests = [ + @support.subTests('arg,expect', [ ("foo", [[("foo", None)]]), ("foo=bar", [[("foo", "bar")]]), (" foo ", [[("foo", None)]]), @@ -259,24 +252,22 @@ class HeaderTests(unittest.TestCase): (r'foo; bar=baz, spam=, foo="\,\;\"", bar= ', [[("foo", None), ("bar", "baz")], [("spam", "")], [("foo", ',;"')], [("bar", "")]]), - ] - - for arg, expect in tests: - try: - result = split_header_words([arg]) - except: - import traceback, io - f = io.StringIO() - traceback.print_exc(None, f) - result = "(error -- traceback follows)\n\n%s" % f.getvalue() - self.assertEqual(result, expect, """ + ]) + def test_split_header_words(self, arg, expect): + try: + result = split_header_words([arg]) + except: + import traceback, io + f = io.StringIO() + traceback.print_exc(None, f) + result = "(error -- traceback follows)\n\n%s" % f.getvalue() + self.assertEqual(result, expect, """ When parsing: '%s' Expected: '%s' Got: '%s' """ % (arg, expect, result)) - def test_roundtrip(self): - tests = [ + @support.subTests('arg,expect', [ ("foo", "foo"), ("foo=bar", "foo=bar"), (" foo ", "foo"), @@ -309,12 +300,11 @@ Got: '%s' ('n; foo="foo;_", bar="foo,_"', 'n; foo="foo;_", bar="foo,_"'), - ] - - for arg, expect in tests: - input = split_header_words([arg]) - res = join_header_words(input) - self.assertEqual(res, expect, """ + ]) + def test_roundtrip(self, arg, expect): + input = split_header_words([arg]) + res = join_header_words(input) + self.assertEqual(res, expect, """ When parsing: '%s' Expected: '%s' Got: '%s' @@ -516,14 +506,7 @@ class CookieTests(unittest.TestCase): ## just the 7 special TLD's listed in their spec. And folks rely on ## that... - def test_domain_return_ok(self): - # test optimization: .domain_return_ok() should filter out most - # domains in the CookieJar before we try to access them (because that - # may require disk access -- in particular, with MSIECookieJar) - # This is only a rough check for performance reasons, so it's not too - # critical as long as it's sufficiently liberal. - pol = DefaultCookiePolicy() - for url, domain, ok in [ + @support.subTests('url,domain,ok', [ ("http://foo.bar.com/", "blah.com", False), ("http://foo.bar.com/", "rhubarb.blah.com", False), ("http://foo.bar.com/", "rhubarb.foo.bar.com", False), @@ -543,11 +526,18 @@ class CookieTests(unittest.TestCase): ("http://foo/", ".local", True), ("http://barfoo.com", ".foo.com", False), ("http://barfoo.com", "foo.com", False), - ]: - request = urllib.request.Request(url) - r = pol.domain_return_ok(domain, request) - if ok: self.assertTrue(r) - else: self.assertFalse(r) + ]) + def test_domain_return_ok(self, url, domain, ok): + # test optimization: .domain_return_ok() should filter out most + # domains in the CookieJar before we try to access them (because that + # may require disk access -- in particular, with MSIECookieJar) + # This is only a rough check for performance reasons, so it's not too + # critical as long as it's sufficiently liberal. + pol = DefaultCookiePolicy() + request = urllib.request.Request(url) + r = pol.domain_return_ok(domain, request) + if ok: self.assertTrue(r) + else: self.assertFalse(r) def test_missing_value(self): # missing = sign in Cookie: header is regarded by Mozilla as a missing @@ -581,10 +571,7 @@ class CookieTests(unittest.TestCase): self.assertEqual(interact_netscape(c, "http://www.acme.com/foo/"), '"spam"; eggs') - def test_rfc2109_handling(self): - # RFC 2109 cookies are handled as RFC 2965 or Netscape cookies, - # dependent on policy settings - for rfc2109_as_netscape, rfc2965, version in [ + @support.subTests('rfc2109_as_netscape,rfc2965,version', [ # default according to rfc2965 if not explicitly specified (None, False, 0), (None, True, 1), @@ -593,24 +580,27 @@ class CookieTests(unittest.TestCase): (False, True, 1), (True, False, 0), (True, True, 0), - ]: - policy = DefaultCookiePolicy( - rfc2109_as_netscape=rfc2109_as_netscape, - rfc2965=rfc2965) - c = CookieJar(policy) - interact_netscape(c, "http://www.example.com/", "ni=ni; Version=1") - try: - cookie = c._cookies["www.example.com"]["/"]["ni"] - except KeyError: - self.assertIsNone(version) # didn't expect a stored cookie - else: - self.assertEqual(cookie.version, version) - # 2965 cookies are unaffected - interact_2965(c, "http://www.example.com/", - "foo=bar; Version=1") - if rfc2965: - cookie2965 = c._cookies["www.example.com"]["/"]["foo"] - self.assertEqual(cookie2965.version, 1) + ]) + def test_rfc2109_handling(self, rfc2109_as_netscape, rfc2965, version): + # RFC 2109 cookies are handled as RFC 2965 or Netscape cookies, + # dependent on policy settings + policy = DefaultCookiePolicy( + rfc2109_as_netscape=rfc2109_as_netscape, + rfc2965=rfc2965) + c = CookieJar(policy) + interact_netscape(c, "http://www.example.com/", "ni=ni; Version=1") + try: + cookie = c._cookies["www.example.com"]["/"]["ni"] + except KeyError: + self.assertIsNone(version) # didn't expect a stored cookie + else: + self.assertEqual(cookie.version, version) + # 2965 cookies are unaffected + interact_2965(c, "http://www.example.com/", + "foo=bar; Version=1") + if rfc2965: + cookie2965 = c._cookies["www.example.com"]["/"]["foo"] + self.assertEqual(cookie2965.version, 1) def test_ns_parser(self): c = CookieJar() @@ -778,8 +768,7 @@ class CookieTests(unittest.TestCase): # Cookie is sent back to the same URI. self.assertEqual(interact_netscape(cj, uri), value) - def test_escape_path(self): - cases = [ + @support.subTests('arg,result', [ # quoted safe ("/foo%2f/bar", "/foo%2F/bar"), ("/foo%2F/bar", "/foo%2F/bar"), @@ -799,9 +788,9 @@ class CookieTests(unittest.TestCase): ("/foo/bar\u00fc", "/foo/bar%C3%BC"), # UTF-8 encoded # unicode ("/foo/bar\uabcd", "/foo/bar%EA%AF%8D"), # UTF-8 encoded - ] - for arg, result in cases: - self.assertEqual(escape_path(arg), result) + ]) + def test_escape_path(self, arg, result): + self.assertEqual(escape_path(arg), result) def test_request_path(self): # with parameters diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py index 165949167ce..b3c9ef8efba 100644 --- a/Lib/test/test_interpreters/test_api.py +++ b/Lib/test/test_interpreters/test_api.py @@ -1,17 +1,22 @@ +import contextlib import os import pickle +import sys from textwrap import dedent import threading import types import unittest from test import support +from test.support import os_helper +from test.support import script_helper from test.support import import_helper # Raise SkipTest if subinterpreters not supported. _interpreters = import_helper.import_module('_interpreters') from test.support import Py_GIL_DISABLED from test.support import interpreters from test.support import force_not_colorized +import test._crossinterp_definitions as defs from test.support.interpreters import ( InterpreterError, InterpreterNotFoundError, ExecutionFailed, ) @@ -29,6 +34,59 @@ WHENCE_STR_XI = 'cross-interpreter C-API' WHENCE_STR_STDLIB = '_interpreters module' +def is_pickleable(obj): + try: + pickle.dumps(obj) + except Exception: + return False + return True + + +@contextlib.contextmanager +def defined_in___main__(name, script, *, remove=False): + import __main__ as mainmod + mainns = vars(mainmod) + assert name not in mainns + exec(script, mainns, mainns) + if remove: + yield mainns.pop(name) + else: + try: + yield mainns[name] + finally: + mainns.pop(name, None) + + +def build_excinfo(exctype, msg=None, formatted=None, errdisplay=None): + if isinstance(exctype, type): + assert issubclass(exctype, BaseException), exctype + exctype = types.SimpleNamespace( + __name__=exctype.__name__, + __qualname__=exctype.__qualname__, + __module__=exctype.__module__, + ) + elif isinstance(exctype, str): + module, _, name = exctype.rpartition(exctype) + if not module and name in __builtins__: + module = 'builtins' + exctype = types.SimpleNamespace( + __name__=name, + __qualname__=exctype, + __module__=module or None, + ) + else: + assert isinstance(exctype, types.SimpleNamespace) + assert msg is None or isinstance(msg, str), msg + assert formatted is None or isinstance(formatted, str), formatted + assert errdisplay is None or isinstance(errdisplay, str), errdisplay + return types.SimpleNamespace( + type=exctype, + msg=msg, + formatted=formatted, + errdisplay=errdisplay, + ) + + class ModuleTests(TestBase): def test_queue_aliases(self): @@ -890,24 +948,26 @@ class TestInterpreterExec(TestBase): # Interpreter.exec() behavior. -def call_func_noop(): - pass +call_func_noop = defs.spam_minimal +call_func_ident = defs.spam_returns_arg +call_func_failure = defs.spam_raises def call_func_return_shareable(): return (1, None) -def call_func_return_not_shareable(): - return [1, 2, 3] +def call_func_return_stateless_func(): + return (lambda x: x) -def call_func_failure(): - raise Exception('spam!') +def call_func_return_pickleable(): + return [1, 2, 3] -def call_func_ident(value): - return value +def call_func_return_unpickleable(): + x = 42 + return (lambda: x) def get_call_func_closure(value): @@ -916,6 +976,11 @@ def get_call_func_closure(value): return call_func_closure +def call_func_exec_wrapper(script, ns): + res = exec(script, ns, ns) + return res, ns, id(ns) + + class Spam: @staticmethod @@ -1012,86 +1077,375 @@ class TestInterpreterCall(TestBase): # - preserves info (e.g. SyntaxError) # - matching error display - def test_call(self): + @contextlib.contextmanager + def assert_fails(self, expected): + with self.assertRaises(ExecutionFailed) as cm: + yield cm + uncaught = cm.exception.excinfo + self.assertEqual(uncaught.type.__name__, expected.__name__) + + def assert_fails_not_shareable(self): + return self.assert_fails(interpreters.NotShareableError) + + def assert_code_equal(self, code1, code2): + if code1 == code2: + return + self.assertEqual(code1.co_name, code2.co_name) + self.assertEqual(code1.co_flags, code2.co_flags) + self.assertEqual(code1.co_consts, code2.co_consts) + self.assertEqual(code1.co_varnames, code2.co_varnames) + self.assertEqual(code1.co_cellvars, code2.co_cellvars) + self.assertEqual(code1.co_freevars, code2.co_freevars) + self.assertEqual(code1.co_names, code2.co_names) + self.assertEqual( + _testinternalcapi.get_code_var_counts(code1), + _testinternalcapi.get_code_var_counts(code2), + ) + self.assertEqual(code1.co_code, code2.co_code) + + def assert_funcs_equal(self, func1, func2): + if func1 == func2: + return + self.assertIs(type(func1), type(func2)) + self.assertEqual(func1.__name__, func2.__name__) + self.assertEqual(func1.__defaults__, func2.__defaults__) + self.assertEqual(func1.__kwdefaults__, func2.__kwdefaults__) + self.assertEqual(func1.__closure__, func2.__closure__) + self.assert_code_equal(func1.__code__, func2.__code__) + self.assertEqual( + _testinternalcapi.get_code_var_counts(func1), + _testinternalcapi.get_code_var_counts(func2), + ) + + def assert_exceptions_equal(self, exc1, exc2): + assert isinstance(exc1, Exception) + assert isinstance(exc2, Exception) + if exc1 == exc2: + return + self.assertIs(type(exc1), type(exc2)) + self.assertEqual(exc1.args, exc2.args) + + def test_stateless_funcs(self): interp = interpreters.create() - for i, (callable, args, kwargs) in enumerate([ - (call_func_noop, (), {}), - (Spam.noop, (), {}), + func = call_func_noop + with self.subTest('no args, no return'): + res = interp.call(func) + self.assertIsNone(res) + + func = call_func_return_shareable + with self.subTest('no args, returns shareable'): + res = interp.call(func) + self.assertEqual(res, (1, None)) + + func = call_func_return_stateless_func + expected = (lambda x: x) + with self.subTest('no args, returns stateless func'): + res = interp.call(func) + self.assert_funcs_equal(res, expected) + + func = call_func_return_pickleable + with self.subTest('no args, returns pickleable'): + res = interp.call(func) + self.assertEqual(res, [1, 2, 3]) + + func = call_func_return_unpickleable + with self.subTest('no args, returns unpickleable'): + with self.assertRaises(interpreters.NotShareableError): + interp.call(func) + + def test_stateless_func_returns_arg(self): + interp = interpreters.create() + + for arg in [ + None, + 10, + 'spam!', + b'spam!', + (1, 2, 'spam!'), + memoryview(b'spam!'), + ]: + with self.subTest(f'shareable {arg!r}'): + assert _interpreters.is_shareable(arg) + res = interp.call(defs.spam_returns_arg, arg) + self.assertEqual(res, arg) + + for arg in defs.STATELESS_FUNCTIONS: + with self.subTest(f'stateless func {arg!r}'): + res = interp.call(defs.spam_returns_arg, arg) + self.assert_funcs_equal(res, arg) + + for arg in defs.TOP_FUNCTIONS: + if arg in defs.STATELESS_FUNCTIONS: + continue + with self.subTest(f'stateful func {arg!r}'): + res = interp.call(defs.spam_returns_arg, arg) + self.assert_funcs_equal(res, arg) + assert is_pickleable(arg) + + for arg in [ + Ellipsis, + NotImplemented, + object(), + 2**1000, + [1, 2, 3], + {'a': 1, 'b': 2}, + types.SimpleNamespace(x=42), + # builtin types + object, + type, + Exception, + ModuleNotFoundError, + # builtin exceptions + Exception('uh-oh!'), + ModuleNotFoundError('mymodule'), + # builtin fnctions + len, + sys.exit, + # user classes + *defs.TOP_CLASSES, + *(c(*a) for c, a in defs.TOP_CLASSES.items() + if c not in defs.CLASSES_WITHOUT_EQUALITY), + ]: + with self.subTest(f'pickleable {arg!r}'): + res = interp.call(defs.spam_returns_arg, arg) + if type(arg) is object: + self.assertIs(type(res), object) + elif isinstance(arg, BaseException): + self.assert_exceptions_equal(res, arg) + else: + self.assertEqual(res, arg) + assert is_pickleable(arg) + + for arg in [ + types.MappingProxyType({}), + *(f for f in defs.NESTED_FUNCTIONS + if f not in defs.STATELESS_FUNCTIONS), + ]: + with self.subTest(f'unpickleable {arg!r}'): + assert not _interpreters.is_shareable(arg) + assert not is_pickleable(arg) + with self.assertRaises(interpreters.NotShareableError): + interp.call(defs.spam_returns_arg, arg) + + def test_full_args(self): + interp = interpreters.create() + expected = (1, 2, 3, 4, 5, 6, ('?',), {'g': 7, 'h': 8}) + func = defs.spam_full_args + res = interp.call(func, 1, 2, 3, 4, '?', e=5, f=6, g=7, h=8) + self.assertEqual(res, expected) + + def test_full_defaults(self): + # pickleable, but not stateless + interp = interpreters.create() + expected = (-1, -2, -3, -4, -5, -6, (), {'g': 8, 'h': 9}) + res = interp.call(defs.spam_full_args_with_defaults, g=8, h=9) + self.assertEqual(res, expected) + + def test_modified_arg(self): + interp = interpreters.create() + script = dedent(""" + a = 7 + b = 2 + c = a ** b + """) + ns = {} + expected = {'a': 7, 'b': 2, 'c': 49} + res = interp.call(call_func_exec_wrapper, script, ns) + obj, resns, resid = res + del resns['__builtins__'] + self.assertIsNone(obj) + self.assertEqual(ns, {}) + self.assertEqual(resns, expected) + self.assertNotEqual(resid, id(ns)) + self.assertNotEqual(resid, id(resns)) + + def test_func_in___main___valid(self): + # pickleable, already there' + + with os_helper.temp_dir() as tempdir: + def new_mod(name, text): + script_helper.make_script(tempdir, name, dedent(text)) + + def run(text): + name = 'myscript' + text = dedent(f""" + import sys + sys.path.insert(0, {tempdir!r}) + + """) + dedent(text) + filename = script_helper.make_script(tempdir, name, text) + res = script_helper.assert_python_ok(filename) + return res.out.decode('utf-8').strip() + + # no module indirection + with self.subTest('no indirection'): + text = run(f""" + from test.support import interpreters + + def spam(): + # This a global var... + return __name__ + + if __name__ == '__main__': + interp = interpreters.create() + res = interp.call(spam) + print(res) + """) + self.assertEqual(text, '<fake __main__>') + + # indirect as func, direct interp + new_mod('mymod', f""" + def run(interp, func): + return interp.call(func) + """) + with self.subTest('indirect as func, direct interp'): + text = run(f""" + from test.support import interpreters + import mymod + + def spam(): + # This a global var... + return __name__ + + if __name__ == '__main__': + interp = interpreters.create() + res = mymod.run(interp, spam) + print(res) + """) + self.assertEqual(text, '<fake __main__>') + + # indirect as func, indirect interp + new_mod('mymod', f""" + from test.support import interpreters + def run(func): + interp = interpreters.create() + return interp.call(func) + """) + with self.subTest('indirect as func, indirect interp'): + text = run(f""" + import mymod + + def spam(): + # This a global var... + return __name__ + + if __name__ == '__main__': + res = mymod.run(spam) + print(res) + """) + self.assertEqual(text, '<fake __main__>') + + def test_func_in___main___invalid(self): + interp = interpreters.create() + + funcname = f'{__name__.replace(".", "_")}_spam_okay' + script = dedent(f""" + def {funcname}(): + # This a global var... + return __name__ + """) + + with self.subTest('pickleable, added dynamically'): + with defined_in___main__(funcname, script) as arg: + with self.assertRaises(interpreters.NotShareableError): + interp.call(defs.spam_returns_arg, arg) + + with self.subTest('lying about __main__'): + with defined_in___main__(funcname, script, remove=True) as arg: + with self.assertRaises(interpreters.NotShareableError): + interp.call(defs.spam_returns_arg, arg) + + def test_raises(self): + interp = interpreters.create() + with self.assertRaises(ExecutionFailed): + interp.call(call_func_failure) + + with self.assert_fails(ValueError): + interp.call(call_func_complex, '???', exc=ValueError('spam')) + + def test_call_valid(self): + interp = interpreters.create() + + for i, (callable, args, kwargs, expected) in enumerate([ + (call_func_noop, (), {}, None), + (call_func_ident, ('spamspamspam',), {}, 'spamspamspam'), + (call_func_return_shareable, (), {}, (1, None)), + (call_func_return_pickleable, (), {}, [1, 2, 3]), + (Spam.noop, (), {}, None), + (Spam.from_values, (), {}, Spam(())), + (Spam.from_values, (1, 2, 3), {}, Spam((1, 2, 3))), + (Spam, ('???',), {}, Spam('???')), + (Spam(101), (), {}, (101, (), {})), + (Spam(10101).run, (), {}, (10101, (), {})), + (call_func_complex, ('ident', 'spam'), {}, 'spam'), + (call_func_complex, ('full-ident', 'spam'), {}, ('spam', (), {})), + (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}, + ('spam', ('ham',), {'eggs': '!!!'})), + (call_func_complex, ('globals',), {}, __name__), + (call_func_complex, ('interpid',), {}, interp.id), + (call_func_complex, ('custom', 'spam!'), {}, Spam('spam!')), ]): with self.subTest(f'success case #{i+1}'): - res = interp.call(callable) - self.assertIs(res, None) + res = interp.call(callable, *args, **kwargs) + self.assertEqual(res, expected) + + def test_call_invalid(self): + interp = interpreters.create() + + func = get_call_func_closure + with self.subTest(func): + with self.assertRaises(interpreters.NotShareableError): + interp.call(func, 42) + + func = get_call_func_closure(42) + with self.subTest(func): + with self.assertRaises(interpreters.NotShareableError): + interp.call(func) + + func = call_func_complex + op = 'closure' + with self.subTest(f'{func} ({op})'): + with self.assertRaises(interpreters.NotShareableError): + interp.call(func, op, value='~~~') + + op = 'custom-inner' + with self.subTest(f'{func} ({op})'): + with self.assertRaises(interpreters.NotShareableError): + interp.call(func, op, 'eggs!') + + def test_call_in_thread(self): + interp = interpreters.create() for i, (callable, args, kwargs) in enumerate([ - (call_func_ident, ('spamspamspam',), {}), - (get_call_func_closure, (42,), {}), - (get_call_func_closure(42), (), {}), + (call_func_noop, (), {}), + (call_func_return_shareable, (), {}), + (call_func_return_pickleable, (), {}), (Spam.from_values, (), {}), (Spam.from_values, (1, 2, 3), {}), - (Spam, ('???'), {}), (Spam(101), (), {}), (Spam(10101).run, (), {}), + (Spam.noop, (), {}), (call_func_complex, ('ident', 'spam'), {}), (call_func_complex, ('full-ident', 'spam'), {}), (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), (call_func_complex, ('globals',), {}), (call_func_complex, ('interpid',), {}), - (call_func_complex, ('closure',), {'value': '~~~'}), (call_func_complex, ('custom', 'spam!'), {}), - (call_func_complex, ('custom-inner', 'eggs!'), {}), - (call_func_complex, ('???',), {'exc': ValueError('spam')}), - (call_func_return_shareable, (), {}), - (call_func_return_not_shareable, (), {}), - ]): - with self.subTest(f'invalid case #{i+1}'): - with self.assertRaises(Exception): - if args or kwargs: - raise Exception((args, kwargs)) - interp.call(callable) - - with self.assertRaises(ExecutionFailed): - interp.call(call_func_failure) - - def test_call_in_thread(self): - interp = interpreters.create() - - for i, (callable, args, kwargs) in enumerate([ - (call_func_noop, (), {}), - (Spam.noop, (), {}), ]): with self.subTest(f'success case #{i+1}'): with self.captured_thread_exception() as ctx: - t = interp.call_in_thread(callable) + t = interp.call_in_thread(callable, *args, **kwargs) t.join() self.assertIsNone(ctx.caught) for i, (callable, args, kwargs) in enumerate([ - (call_func_ident, ('spamspamspam',), {}), (get_call_func_closure, (42,), {}), (get_call_func_closure(42), (), {}), - (Spam.from_values, (), {}), - (Spam.from_values, (1, 2, 3), {}), - (Spam, ('???'), {}), - (Spam(101), (), {}), - (Spam(10101).run, (), {}), - (call_func_complex, ('ident', 'spam'), {}), - (call_func_complex, ('full-ident', 'spam'), {}), - (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}), - (call_func_complex, ('globals',), {}), - (call_func_complex, ('interpid',), {}), - (call_func_complex, ('closure',), {'value': '~~~'}), - (call_func_complex, ('custom', 'spam!'), {}), - (call_func_complex, ('custom-inner', 'eggs!'), {}), - (call_func_complex, ('???',), {'exc': ValueError('spam')}), - (call_func_return_shareable, (), {}), - (call_func_return_not_shareable, (), {}), ]): with self.subTest(f'invalid case #{i+1}'): - if args or kwargs: - continue with self.captured_thread_exception() as ctx: - t = interp.call_in_thread(callable) + t = interp.call_in_thread(callable, *args, **kwargs) t.join() self.assertIsNotNone(ctx.caught) @@ -1600,18 +1954,14 @@ class LowLevelTests(TestBase): with results: exc = _interpreters.exec(interpid, script) out = results.stdout() - self.assertEqual(out, '') - self.assert_ns_equal(exc, types.SimpleNamespace( - type=types.SimpleNamespace( - __name__='Exception', - __qualname__='Exception', - __module__='builtins', - ), - msg='uh-oh!', + expected = build_excinfo( + Exception, 'uh-oh!', # We check these in other tests. formatted=exc.formatted, errdisplay=exc.errdisplay, - )) + ) + self.assertEqual(out, '') + self.assert_ns_equal(exc, expected) with self.subTest('from C-API'): with self.interpreter_from_capi() as interpid: @@ -1623,25 +1973,50 @@ class LowLevelTests(TestBase): self.assertEqual(exc.msg, 'it worked!') def test_call(self): - with self.subTest('no args'): - interpid = _interpreters.create() - with self.assertRaises(ValueError): - _interpreters.call(interpid, call_func_return_shareable) + interpid = _interpreters.create() + + # Here we focus on basic args and return values. + # See TestInterpreterCall for full operational coverage, + # including supported callables. + + with self.subTest('no args, return None'): + func = defs.spam_minimal + res, exc = _interpreters.call(interpid, func) + self.assertIsNone(exc) + self.assertIsNone(res) + + with self.subTest('empty args, return None'): + func = defs.spam_minimal + res, exc = _interpreters.call(interpid, func, (), {}) + self.assertIsNone(exc) + self.assertIsNone(res) + + with self.subTest('no args, return non-None'): + func = defs.script_with_return + res, exc = _interpreters.call(interpid, func) + self.assertIsNone(exc) + self.assertIs(res, True) + + with self.subTest('full args, return non-None'): + expected = (1, 2, 3, 4, 5, 6, (7, 8), {'g': 9, 'h': 0}) + func = defs.spam_full_args + args = (1, 2, 3, 4, 7, 8) + kwargs = dict(e=5, f=6, g=9, h=0) + res, exc = _interpreters.call(interpid, func, args, kwargs) + self.assertIsNone(exc) + self.assertEqual(res, expected) with self.subTest('uncaught exception'): - interpid = _interpreters.create() - exc = _interpreters.call(interpid, call_func_failure) - self.assertEqual(exc, types.SimpleNamespace( - type=types.SimpleNamespace( - __name__='Exception', - __qualname__='Exception', - __module__='builtins', - ), - msg='spam!', + func = defs.spam_raises + res, exc = _interpreters.call(interpid, func) + expected = build_excinfo( + Exception, 'spam!', # We check these in other tests. formatted=exc.formatted, errdisplay=exc.errdisplay, - )) + ) + self.assertIsNone(res) + self.assertEqual(exc, expected) @requires_test_modules def test_set___main___attrs(self): diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 168e66c5a3f..0c921ffbc25 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -1062,6 +1062,37 @@ class IOTest(unittest.TestCase): # Silence destructor error R.flush = lambda self: None + @threading_helper.requires_working_threading() + def test_write_readline_races(self): + # gh-134908: Concurrent iteration over a file caused races + thread_count = 2 + write_count = 100 + read_count = 100 + + def writer(file, barrier): + barrier.wait() + for _ in range(write_count): + file.write("x") + + def reader(file, barrier): + barrier.wait() + for _ in range(read_count): + for line in file: + self.assertEqual(line, "") + + with self.open(os_helper.TESTFN, "w+") as f: + barrier = threading.Barrier(thread_count + 1) + reader = threading.Thread(target=reader, args=(f, barrier)) + writers = [threading.Thread(target=writer, args=(f, barrier)) + for _ in range(thread_count)] + with threading_helper.catch_threading_exception() as cm: + with threading_helper.start_threads(writers + [reader]): + pass + self.assertIsNone(cm.exc_type) + + self.assertEqual(os.stat(os_helper.TESTFN).st_size, + write_count * thread_count) + class CIOTest(IOTest): diff --git a/Lib/test/test_json/test_dump.py b/Lib/test/test_json/test_dump.py index 13b40020781..39470754003 100644 --- a/Lib/test/test_json/test_dump.py +++ b/Lib/test/test_json/test_dump.py @@ -22,6 +22,14 @@ class TestDump: self.assertIn('valid_key', o) self.assertNotIn(b'invalid_key', o) + def test_dump_skipkeys_indent_empty(self): + v = {b'invalid_key': False} + self.assertEqual(self.json.dumps(v, skipkeys=True, indent=4), '{}') + + def test_skipkeys_indent(self): + v = {b'invalid_key': False, 'valid_key': True} + self.assertEqual(self.json.dumps(v, skipkeys=True, indent=4), '{\n "valid_key": true\n}') + def test_encode_truefalse(self): self.assertEqual(self.dumps( {True: False, False: True}, sort_keys=True), diff --git a/Lib/test/test_math.py b/Lib/test/test_math.py index d14336f8bac..384ad5c828d 100644 --- a/Lib/test/test_math.py +++ b/Lib/test/test_math.py @@ -1973,6 +1973,28 @@ class MathTests(unittest.TestCase): self.assertFalse(math.isfinite(float("inf"))) self.assertFalse(math.isfinite(float("-inf"))) + def testIsnormal(self): + self.assertTrue(math.isnormal(1.25)) + self.assertTrue(math.isnormal(-1.0)) + self.assertFalse(math.isnormal(0.0)) + self.assertFalse(math.isnormal(-0.0)) + self.assertFalse(math.isnormal(INF)) + self.assertFalse(math.isnormal(NINF)) + self.assertFalse(math.isnormal(NAN)) + self.assertFalse(math.isnormal(FLOAT_MIN/2)) + self.assertFalse(math.isnormal(-FLOAT_MIN/2)) + + def testIssubnormal(self): + self.assertFalse(math.issubnormal(1.25)) + self.assertFalse(math.issubnormal(-1.0)) + self.assertFalse(math.issubnormal(0.0)) + self.assertFalse(math.issubnormal(-0.0)) + self.assertFalse(math.issubnormal(INF)) + self.assertFalse(math.issubnormal(NINF)) + self.assertFalse(math.issubnormal(NAN)) + self.assertTrue(math.issubnormal(FLOAT_MIN/2)) + self.assertTrue(math.issubnormal(-FLOAT_MIN/2)) + def testIsnan(self): self.assertTrue(math.isnan(float("nan"))) self.assertTrue(math.isnan(float("-nan"))) diff --git a/Lib/test/test_monitoring.py b/Lib/test/test_monitoring.py index 263e4e6f394..a932ac80117 100644 --- a/Lib/test/test_monitoring.py +++ b/Lib/test/test_monitoring.py @@ -2157,6 +2157,21 @@ class TestRegressions(MonitoringTestBase, unittest.TestCase): sys.monitoring.restart_events() sys.monitoring.set_events(0, 0) + def test_134879(self): + # gh-134789 + # Specialized FOR_ITER not incrementing index + def foo(): + t = 0 + for i in [1,2,3,4]: + t += i + self.assertEqual(t, 10) + + sys.monitoring.use_tool_id(0, "test") + self.addCleanup(sys.monitoring.free_tool_id, 0) + sys.monitoring.set_local_events(0, foo.__code__, E.BRANCH_LEFT | E.BRANCH_RIGHT) + foo() + sys.monitoring.set_local_events(0, foo.__code__, 0) + class TestOptimizer(MonitoringTestBase, unittest.TestCase): diff --git a/Lib/test/test_ntpath.py b/Lib/test/test_ntpath.py index c3b0bdaebc2..22f6403d482 100644 --- a/Lib/test/test_ntpath.py +++ b/Lib/test/test_ntpath.py @@ -6,6 +6,8 @@ import subprocess import sys import unittest import warnings +from ntpath import ALLOW_MISSING +from test import support from test.support import TestFailed, cpython_only, os_helper from test.support.os_helper import FakePath from test import test_genericpath @@ -76,6 +78,10 @@ def tester(fn, wantResult): %(str(fn), str(wantResult), repr(gotResult))) +def _parameterize(*parameters): + return support.subTests('kwargs', parameters, _do_cleanups=True) + + class NtpathTestCase(unittest.TestCase): def assertPathEqual(self, path1, path2): if path1 == path2 or _norm(path1) == _norm(path2): @@ -474,6 +480,27 @@ class TestNtpath(NtpathTestCase): tester("ntpath.realpath('.\\.')", expected) tester("ntpath.realpath('\\'.join(['.'] * 100))", expected) + def test_realpath_curdir_strict(self): + expected = ntpath.normpath(os.getcwd()) + tester("ntpath.realpath('.', strict=True)", expected) + tester("ntpath.realpath('./.', strict=True)", expected) + tester("ntpath.realpath('/'.join(['.'] * 100), strict=True)", expected) + tester("ntpath.realpath('.\\.', strict=True)", expected) + tester("ntpath.realpath('\\'.join(['.'] * 100), strict=True)", expected) + + def test_realpath_curdir_missing_ok(self): + expected = ntpath.normpath(os.getcwd()) + tester("ntpath.realpath('.', strict=ALLOW_MISSING)", + expected) + tester("ntpath.realpath('./.', strict=ALLOW_MISSING)", + expected) + tester("ntpath.realpath('/'.join(['.'] * 100), strict=ALLOW_MISSING)", + expected) + tester("ntpath.realpath('.\\.', strict=ALLOW_MISSING)", + expected) + tester("ntpath.realpath('\\'.join(['.'] * 100), strict=ALLOW_MISSING)", + expected) + def test_realpath_pardir(self): expected = ntpath.normpath(os.getcwd()) tester("ntpath.realpath('..')", ntpath.dirname(expected)) @@ -486,24 +513,59 @@ class TestNtpath(NtpathTestCase): tester("ntpath.realpath('\\'.join(['..'] * 50))", ntpath.splitdrive(expected)[0] + '\\') + def test_realpath_pardir_strict(self): + expected = ntpath.normpath(os.getcwd()) + tester("ntpath.realpath('..', strict=True)", ntpath.dirname(expected)) + tester("ntpath.realpath('../..', strict=True)", + ntpath.dirname(ntpath.dirname(expected))) + tester("ntpath.realpath('/'.join(['..'] * 50), strict=True)", + ntpath.splitdrive(expected)[0] + '\\') + tester("ntpath.realpath('..\\..', strict=True)", + ntpath.dirname(ntpath.dirname(expected))) + tester("ntpath.realpath('\\'.join(['..'] * 50), strict=True)", + ntpath.splitdrive(expected)[0] + '\\') + + def test_realpath_pardir_missing_ok(self): + expected = ntpath.normpath(os.getcwd()) + tester("ntpath.realpath('..', strict=ALLOW_MISSING)", + ntpath.dirname(expected)) + tester("ntpath.realpath('../..', strict=ALLOW_MISSING)", + ntpath.dirname(ntpath.dirname(expected))) + tester("ntpath.realpath('/'.join(['..'] * 50), strict=ALLOW_MISSING)", + ntpath.splitdrive(expected)[0] + '\\') + tester("ntpath.realpath('..\\..', strict=ALLOW_MISSING)", + ntpath.dirname(ntpath.dirname(expected))) + tester("ntpath.realpath('\\'.join(['..'] * 50), strict=ALLOW_MISSING)", + ntpath.splitdrive(expected)[0] + '\\') + @os_helper.skip_unless_symlink @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') - def test_realpath_basic(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_basic(self, kwargs): ABSTFN = ntpath.abspath(os_helper.TESTFN) open(ABSTFN, "wb").close() self.addCleanup(os_helper.unlink, ABSTFN) self.addCleanup(os_helper.unlink, ABSTFN + "1") os.symlink(ABSTFN, ABSTFN + "1") - self.assertPathEqual(ntpath.realpath(ABSTFN + "1"), ABSTFN) - self.assertPathEqual(ntpath.realpath(os.fsencode(ABSTFN + "1")), + self.assertPathEqual(ntpath.realpath(ABSTFN + "1", **kwargs), ABSTFN) + self.assertPathEqual(ntpath.realpath(os.fsencode(ABSTFN + "1"), **kwargs), os.fsencode(ABSTFN)) # gh-88013: call ntpath.realpath with binary drive name may raise a # TypeError. The drive should not exist to reproduce the bug. drives = {f"{c}:\\" for c in string.ascii_uppercase} - set(os.listdrives()) d = drives.pop().encode() - self.assertEqual(ntpath.realpath(d), d) + self.assertEqual(ntpath.realpath(d, strict=False), d) + + # gh-106242: Embedded nulls and non-strict fallback to abspath + if kwargs: + with self.assertRaises(OSError): + ntpath.realpath(os_helper.TESTFN + "\0spam", + **kwargs) + else: + self.assertEqual(ABSTFN + "\0spam", + ntpath.realpath(os_helper.TESTFN + "\0spam", **kwargs)) @os_helper.skip_unless_symlink @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') @@ -526,51 +588,66 @@ class TestNtpath(NtpathTestCase): self.assertEqual(realpath(path, strict=False), path) # gh-106242: Embedded nulls should raise OSError (not ValueError) self.assertRaises(OSError, realpath, path, strict=True) + self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING) path = ABSTFNb + b'\x00' self.assertEqual(realpath(path, strict=False), path) self.assertRaises(OSError, realpath, path, strict=True) + self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING) path = ABSTFN + '\\nonexistent\\x\x00' self.assertEqual(realpath(path, strict=False), path) self.assertRaises(OSError, realpath, path, strict=True) + self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING) path = ABSTFNb + b'\\nonexistent\\x\x00' self.assertEqual(realpath(path, strict=False), path) self.assertRaises(OSError, realpath, path, strict=True) + self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING) path = ABSTFN + '\x00\\..' self.assertEqual(realpath(path, strict=False), os.getcwd()) self.assertEqual(realpath(path, strict=True), os.getcwd()) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), os.getcwd()) path = ABSTFNb + b'\x00\\..' self.assertEqual(realpath(path, strict=False), os.getcwdb()) self.assertEqual(realpath(path, strict=True), os.getcwdb()) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), os.getcwdb()) path = ABSTFN + '\\nonexistent\\x\x00\\..' self.assertEqual(realpath(path, strict=False), ABSTFN + '\\nonexistent') self.assertRaises(OSError, realpath, path, strict=True) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), ABSTFN + '\\nonexistent') path = ABSTFNb + b'\\nonexistent\\x\x00\\..' self.assertEqual(realpath(path, strict=False), ABSTFNb + b'\\nonexistent') self.assertRaises(OSError, realpath, path, strict=True) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), ABSTFNb + b'\\nonexistent') + @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_invalid_unicode_paths(self, kwargs): + realpath = ntpath.realpath + ABSTFN = ntpath.abspath(os_helper.TESTFN) + ABSTFNb = os.fsencode(ABSTFN) path = ABSTFNb + b'\xff' - self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) - self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs) + self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs) path = ABSTFNb + b'\\nonexistent\\\xff' - self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) - self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs) + self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs) path = ABSTFNb + b'\xff\\..' - self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) - self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs) + self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs) path = ABSTFNb + b'\\nonexistent\\\xff\\..' - self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) - self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs) + self.assertRaises(UnicodeDecodeError, realpath, path, **kwargs) @os_helper.skip_unless_symlink @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') - def test_realpath_relative(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_relative(self, kwargs): ABSTFN = ntpath.abspath(os_helper.TESTFN) open(ABSTFN, "wb").close() self.addCleanup(os_helper.unlink, ABSTFN) self.addCleanup(os_helper.unlink, ABSTFN + "1") os.symlink(ABSTFN, ntpath.relpath(ABSTFN + "1")) - self.assertPathEqual(ntpath.realpath(ABSTFN + "1"), ABSTFN) + self.assertPathEqual(ntpath.realpath(ABSTFN + "1", **kwargs), ABSTFN) @os_helper.skip_unless_symlink @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') @@ -722,7 +799,62 @@ class TestNtpath(NtpathTestCase): @os_helper.skip_unless_symlink @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') - def test_realpath_symlink_prefix(self): + def test_realpath_symlink_loops_raise(self): + # Symlink loops raise OSError in ALLOW_MISSING mode + ABSTFN = ntpath.abspath(os_helper.TESTFN) + self.addCleanup(os_helper.unlink, ABSTFN) + self.addCleanup(os_helper.unlink, ABSTFN + "1") + self.addCleanup(os_helper.unlink, ABSTFN + "2") + self.addCleanup(os_helper.unlink, ABSTFN + "y") + self.addCleanup(os_helper.unlink, ABSTFN + "c") + self.addCleanup(os_helper.unlink, ABSTFN + "a") + self.addCleanup(os_helper.unlink, ABSTFN + "x") + + os.symlink(ABSTFN, ABSTFN) + self.assertRaises(OSError, ntpath.realpath, ABSTFN, strict=ALLOW_MISSING) + + os.symlink(ABSTFN + "1", ABSTFN + "2") + os.symlink(ABSTFN + "2", ABSTFN + "1") + self.assertRaises(OSError, ntpath.realpath, ABSTFN + "1", + strict=ALLOW_MISSING) + self.assertRaises(OSError, ntpath.realpath, ABSTFN + "2", + strict=ALLOW_MISSING) + self.assertRaises(OSError, ntpath.realpath, ABSTFN + "1\\x", + strict=ALLOW_MISSING) + + # Windows eliminates '..' components before resolving links; + # realpath is not expected to raise if this removes the loop. + self.assertPathEqual(ntpath.realpath(ABSTFN + "1\\.."), + ntpath.dirname(ABSTFN)) + self.assertPathEqual(ntpath.realpath(ABSTFN + "1\\..\\x"), + ntpath.dirname(ABSTFN) + "\\x") + + os.symlink(ABSTFN + "x", ABSTFN + "y") + self.assertPathEqual(ntpath.realpath(ABSTFN + "1\\..\\" + + ntpath.basename(ABSTFN) + "y"), + ABSTFN + "x") + self.assertRaises( + OSError, ntpath.realpath, + ABSTFN + "1\\..\\" + ntpath.basename(ABSTFN) + "1", + strict=ALLOW_MISSING) + + os.symlink(ntpath.basename(ABSTFN) + "a\\b", ABSTFN + "a") + self.assertRaises(OSError, ntpath.realpath, ABSTFN + "a", + strict=ALLOW_MISSING) + + os.symlink("..\\" + ntpath.basename(ntpath.dirname(ABSTFN)) + + "\\" + ntpath.basename(ABSTFN) + "c", ABSTFN + "c") + self.assertRaises(OSError, ntpath.realpath, ABSTFN + "c", + strict=ALLOW_MISSING) + + # Test using relative path as well. + self.assertRaises(OSError, ntpath.realpath, ntpath.basename(ABSTFN), + strict=ALLOW_MISSING) + + @os_helper.skip_unless_symlink + @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_symlink_prefix(self, kwargs): ABSTFN = ntpath.abspath(os_helper.TESTFN) self.addCleanup(os_helper.unlink, ABSTFN + "3") self.addCleanup(os_helper.unlink, "\\\\?\\" + ABSTFN + "3.") @@ -737,9 +869,9 @@ class TestNtpath(NtpathTestCase): f.write(b'1') os.symlink("\\\\?\\" + ABSTFN + "3.", ABSTFN + "3.link") - self.assertPathEqual(ntpath.realpath(ABSTFN + "3link"), + self.assertPathEqual(ntpath.realpath(ABSTFN + "3link", **kwargs), ABSTFN + "3") - self.assertPathEqual(ntpath.realpath(ABSTFN + "3.link"), + self.assertPathEqual(ntpath.realpath(ABSTFN + "3.link", **kwargs), "\\\\?\\" + ABSTFN + "3.") # Resolved paths should be usable to open target files @@ -749,14 +881,17 @@ class TestNtpath(NtpathTestCase): self.assertEqual(f.read(), b'1') # When the prefix is included, it is not stripped - self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3link"), + self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3link", **kwargs), "\\\\?\\" + ABSTFN + "3") - self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3.link"), + self.assertPathEqual(ntpath.realpath("\\\\?\\" + ABSTFN + "3.link", **kwargs), "\\\\?\\" + ABSTFN + "3.") @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') def test_realpath_nul(self): tester("ntpath.realpath('NUL')", r'\\.\NUL') + tester("ntpath.realpath('NUL', strict=False)", r'\\.\NUL') + tester("ntpath.realpath('NUL', strict=True)", r'\\.\NUL') + tester("ntpath.realpath('NUL', strict=ALLOW_MISSING)", r'\\.\NUL') @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') @unittest.skipUnless(HAVE_GETSHORTPATHNAME, 'need _getshortpathname') @@ -780,12 +915,20 @@ class TestNtpath(NtpathTestCase): self.assertPathEqual(test_file_long, ntpath.realpath(test_file_short)) - with os_helper.change_cwd(test_dir_long): - self.assertPathEqual(test_file_long, ntpath.realpath("file.txt")) - with os_helper.change_cwd(test_dir_long.lower()): - self.assertPathEqual(test_file_long, ntpath.realpath("file.txt")) - with os_helper.change_cwd(test_dir_short): - self.assertPathEqual(test_file_long, ntpath.realpath("file.txt")) + for kwargs in {}, {'strict': True}, {'strict': ALLOW_MISSING}: + with self.subTest(**kwargs): + with os_helper.change_cwd(test_dir_long): + self.assertPathEqual( + test_file_long, + ntpath.realpath("file.txt", **kwargs)) + with os_helper.change_cwd(test_dir_long.lower()): + self.assertPathEqual( + test_file_long, + ntpath.realpath("file.txt", **kwargs)) + with os_helper.change_cwd(test_dir_short): + self.assertPathEqual( + test_file_long, + ntpath.realpath("file.txt", **kwargs)) @unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname') def test_realpath_permission(self): @@ -806,12 +949,15 @@ class TestNtpath(NtpathTestCase): # Automatic generation of short names may be disabled on # NTFS volumes for the sake of performance. # They're not supported at all on ReFS and exFAT. - subprocess.run( + p = subprocess.run( # Try to set the short name manually. ['fsutil.exe', 'file', 'setShortName', test_file, 'LONGFI~1.TXT'], creationflags=subprocess.DETACHED_PROCESS ) + if p.returncode: + raise unittest.SkipTest('failed to set short name') + try: self.assertPathEqual(test_file, ntpath.realpath(test_file_short)) except AssertionError: diff --git a/Lib/test/test_peepholer.py b/Lib/test/test_peepholer.py index f33de3d420c..ef596630b93 100644 --- a/Lib/test/test_peepholer.py +++ b/Lib/test/test_peepholer.py @@ -2614,6 +2614,90 @@ class OptimizeLoadFastTestCase(DirectCfgOptimizerTests): ] self.cfg_optimization_test(insts, expected, consts=[None]) + def test_format_simple(self): + # FORMAT_SIMPLE will leave its operand on the stack if it's a unicode + # object. We treat it conservatively and assume that it always leaves + # its operand on the stack. + insts = [ + ("LOAD_FAST", 0, 1), + ("FORMAT_SIMPLE", None, 2), + ("STORE_FAST", 1, 3), + ] + self.check(insts, insts) + + insts = [ + ("LOAD_FAST", 0, 1), + ("FORMAT_SIMPLE", None, 2), + ("POP_TOP", None, 3), + ] + expected = [ + ("LOAD_FAST_BORROW", 0, 1), + ("FORMAT_SIMPLE", None, 2), + ("POP_TOP", None, 3), + ] + self.check(insts, expected) + + def test_set_function_attribute(self): + # SET_FUNCTION_ATTRIBUTE leaves the function on the stack + insts = [ + ("LOAD_CONST", 0, 1), + ("LOAD_FAST", 0, 2), + ("SET_FUNCTION_ATTRIBUTE", 2, 3), + ("STORE_FAST", 1, 4), + ("LOAD_CONST", 0, 5), + ("RETURN_VALUE", None, 6) + ] + self.cfg_optimization_test(insts, insts, consts=[None]) + + insts = [ + ("LOAD_CONST", 0, 1), + ("LOAD_FAST", 0, 2), + ("SET_FUNCTION_ATTRIBUTE", 2, 3), + ("RETURN_VALUE", None, 4) + ] + expected = [ + ("LOAD_CONST", 0, 1), + ("LOAD_FAST_BORROW", 0, 2), + ("SET_FUNCTION_ATTRIBUTE", 2, 3), + ("RETURN_VALUE", None, 4) + ] + self.cfg_optimization_test(insts, expected, consts=[None]) + + def test_get_yield_from_iter(self): + # GET_YIELD_FROM_ITER may leave its operand on the stack + insts = [ + ("LOAD_FAST", 0, 1), + ("GET_YIELD_FROM_ITER", None, 2), + ("LOAD_CONST", 0, 3), + send := self.Label(), + ("SEND", end := self.Label(), 5), + ("YIELD_VALUE", 1, 6), + ("RESUME", 2, 7), + ("JUMP", send, 8), + end, + ("END_SEND", None, 9), + ("LOAD_CONST", 0, 10), + ("RETURN_VALUE", None, 11), + ] + self.cfg_optimization_test(insts, insts, consts=[None]) + + def test_push_exc_info(self): + insts = [ + ("LOAD_FAST", 0, 1), + ("PUSH_EXC_INFO", None, 2), + ] + self.check(insts, insts) + + def test_load_special(self): + # LOAD_SPECIAL may leave self on the stack + insts = [ + ("LOAD_FAST", 0, 1), + ("LOAD_SPECIAL", 0, 2), + ("STORE_FAST", 1, 3), + ] + self.check(insts, insts) + + def test_del_in_finally(self): # This loads `obj` onto the stack, executes `del obj`, then returns the # `obj` from the stack. See gh-133371 for more details. @@ -2630,6 +2714,14 @@ class OptimizeLoadFastTestCase(DirectCfgOptimizerTests): gc.collect() self.assertEqual(obj, [42]) + def test_format_simple_unicode(self): + # Repro from gh-134889 + def f(): + var = f"{1}" + var = f"{var}" + return var + self.assertEqual(f(), "1") + if __name__ == "__main__": diff --git a/Lib/test/test_posixpath.py b/Lib/test/test_posixpath.py index f3f9895f529..21f06712548 100644 --- a/Lib/test/test_posixpath.py +++ b/Lib/test/test_posixpath.py @@ -4,7 +4,8 @@ import posixpath import random import sys import unittest -from posixpath import realpath, abspath, dirname, basename +from functools import partial +from posixpath import realpath, abspath, dirname, basename, ALLOW_MISSING from test import support from test import test_genericpath from test.support import import_helper @@ -33,6 +34,11 @@ def skip_if_ABSTFN_contains_backslash(test): msg = "ABSTFN is not a posix path - tests fail" return [test, unittest.skip(msg)(test)][found_backslash] + +def _parameterize(*parameters): + return support.subTests('kwargs', parameters) + + class PosixPathTest(unittest.TestCase): def setUp(self): @@ -442,32 +448,35 @@ class PosixPathTest(unittest.TestCase): self.assertEqual(result, expected) @skip_if_ABSTFN_contains_backslash - def test_realpath_curdir(self): - self.assertEqual(realpath('.'), os.getcwd()) - self.assertEqual(realpath('./.'), os.getcwd()) - self.assertEqual(realpath('/'.join(['.'] * 100)), os.getcwd()) + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_curdir(self, kwargs): + self.assertEqual(realpath('.', **kwargs), os.getcwd()) + self.assertEqual(realpath('./.', **kwargs), os.getcwd()) + self.assertEqual(realpath('/'.join(['.'] * 100), **kwargs), os.getcwd()) - self.assertEqual(realpath(b'.'), os.getcwdb()) - self.assertEqual(realpath(b'./.'), os.getcwdb()) - self.assertEqual(realpath(b'/'.join([b'.'] * 100)), os.getcwdb()) + self.assertEqual(realpath(b'.', **kwargs), os.getcwdb()) + self.assertEqual(realpath(b'./.', **kwargs), os.getcwdb()) + self.assertEqual(realpath(b'/'.join([b'.'] * 100), **kwargs), os.getcwdb()) @skip_if_ABSTFN_contains_backslash - def test_realpath_pardir(self): - self.assertEqual(realpath('..'), dirname(os.getcwd())) - self.assertEqual(realpath('../..'), dirname(dirname(os.getcwd()))) - self.assertEqual(realpath('/'.join(['..'] * 100)), '/') + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_pardir(self, kwargs): + self.assertEqual(realpath('..', **kwargs), dirname(os.getcwd())) + self.assertEqual(realpath('../..', **kwargs), dirname(dirname(os.getcwd()))) + self.assertEqual(realpath('/'.join(['..'] * 100), **kwargs), '/') - self.assertEqual(realpath(b'..'), dirname(os.getcwdb())) - self.assertEqual(realpath(b'../..'), dirname(dirname(os.getcwdb()))) - self.assertEqual(realpath(b'/'.join([b'..'] * 100)), b'/') + self.assertEqual(realpath(b'..', **kwargs), dirname(os.getcwdb())) + self.assertEqual(realpath(b'../..', **kwargs), dirname(dirname(os.getcwdb()))) + self.assertEqual(realpath(b'/'.join([b'..'] * 100), **kwargs), b'/') @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash - def test_realpath_basic(self): + @_parameterize({}, {'strict': ALLOW_MISSING}) + def test_realpath_basic(self, kwargs): # Basic operation. try: os.symlink(ABSTFN+"1", ABSTFN) - self.assertEqual(realpath(ABSTFN), ABSTFN+"1") + self.assertEqual(realpath(ABSTFN, **kwargs), ABSTFN+"1") finally: os_helper.unlink(ABSTFN) @@ -487,90 +496,115 @@ class PosixPathTest(unittest.TestCase): path = '/\x00' self.assertRaises(ValueError, realpath, path, strict=False) self.assertRaises(ValueError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) path = b'/\x00' self.assertRaises(ValueError, realpath, path, strict=False) self.assertRaises(ValueError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) path = '/nonexistent/x\x00' self.assertRaises(ValueError, realpath, path, strict=False) self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) path = b'/nonexistent/x\x00' self.assertRaises(ValueError, realpath, path, strict=False) self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) path = '/\x00/..' self.assertRaises(ValueError, realpath, path, strict=False) self.assertRaises(ValueError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) path = b'/\x00/..' self.assertRaises(ValueError, realpath, path, strict=False) self.assertRaises(ValueError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) + path = '/nonexistent/x\x00/..' self.assertRaises(ValueError, realpath, path, strict=False) self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) path = b'/nonexistent/x\x00/..' self.assertRaises(ValueError, realpath, path, strict=False) self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) path = '/\udfff' if sys.platform == 'win32': self.assertEqual(realpath(path, strict=False), path) self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), path) else: self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) self.assertRaises(UnicodeEncodeError, realpath, path, strict=True) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING) path = '/nonexistent/\udfff' if sys.platform == 'win32': self.assertEqual(realpath(path, strict=False), path) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), path) else: self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING) self.assertRaises(FileNotFoundError, realpath, path, strict=True) path = '/\udfff/..' if sys.platform == 'win32': self.assertEqual(realpath(path, strict=False), '/') self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), '/') else: self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) self.assertRaises(UnicodeEncodeError, realpath, path, strict=True) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING) path = '/nonexistent/\udfff/..' if sys.platform == 'win32': self.assertEqual(realpath(path, strict=False), '/nonexistent') + self.assertEqual(realpath(path, strict=ALLOW_MISSING), '/nonexistent') else: self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING) self.assertRaises(FileNotFoundError, realpath, path, strict=True) path = b'/\xff' if sys.platform == 'win32': self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=ALLOW_MISSING) else: self.assertEqual(realpath(path, strict=False), path) if support.is_wasi: self.assertRaises(OSError, realpath, path, strict=True) + self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING) else: self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), path) path = b'/nonexistent/\xff' if sys.platform == 'win32': self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=ALLOW_MISSING) else: self.assertEqual(realpath(path, strict=False), path) if support.is_wasi: self.assertRaises(OSError, realpath, path, strict=True) + self.assertRaises(OSError, realpath, path, strict=ALLOW_MISSING) else: self.assertRaises(FileNotFoundError, realpath, path, strict=True) @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash - def test_realpath_relative(self): + @_parameterize({}, {'strict': ALLOW_MISSING}) + def test_realpath_relative(self, kwargs): try: os.symlink(posixpath.relpath(ABSTFN+"1"), ABSTFN) - self.assertEqual(realpath(ABSTFN), ABSTFN+"1") + self.assertEqual(realpath(ABSTFN, **kwargs), ABSTFN+"1") finally: os_helper.unlink(ABSTFN) @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash - def test_realpath_missing_pardir(self): + @_parameterize({}, {'strict': ALLOW_MISSING}) + def test_realpath_missing_pardir(self, kwargs): try: os.symlink(TESTFN + "1", TESTFN) - self.assertEqual(realpath("nonexistent/../" + TESTFN), ABSTFN + "1") + self.assertEqual( + realpath("nonexistent/../" + TESTFN, **kwargs), ABSTFN + "1") finally: os_helper.unlink(TESTFN) @@ -617,37 +651,38 @@ class PosixPathTest(unittest.TestCase): @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash - def test_realpath_symlink_loops_strict(self): + @_parameterize({'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_symlink_loops_strict(self, kwargs): # Bug #43757, raise OSError if we get into an infinite symlink loop in - # strict mode. + # the strict modes. try: os.symlink(ABSTFN, ABSTFN) - self.assertRaises(OSError, realpath, ABSTFN, strict=True) + self.assertRaises(OSError, realpath, ABSTFN, **kwargs) os.symlink(ABSTFN+"1", ABSTFN+"2") os.symlink(ABSTFN+"2", ABSTFN+"1") - self.assertRaises(OSError, realpath, ABSTFN+"1", strict=True) - self.assertRaises(OSError, realpath, ABSTFN+"2", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"1", **kwargs) + self.assertRaises(OSError, realpath, ABSTFN+"2", **kwargs) - self.assertRaises(OSError, realpath, ABSTFN+"1/x", strict=True) - self.assertRaises(OSError, realpath, ABSTFN+"1/..", strict=True) - self.assertRaises(OSError, realpath, ABSTFN+"1/../x", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"1/x", **kwargs) + self.assertRaises(OSError, realpath, ABSTFN+"1/..", **kwargs) + self.assertRaises(OSError, realpath, ABSTFN+"1/../x", **kwargs) os.symlink(ABSTFN+"x", ABSTFN+"y") self.assertRaises(OSError, realpath, - ABSTFN+"1/../" + basename(ABSTFN) + "y", strict=True) + ABSTFN+"1/../" + basename(ABSTFN) + "y", **kwargs) self.assertRaises(OSError, realpath, - ABSTFN+"1/../" + basename(ABSTFN) + "1", strict=True) + ABSTFN+"1/../" + basename(ABSTFN) + "1", **kwargs) os.symlink(basename(ABSTFN) + "a/b", ABSTFN+"a") - self.assertRaises(OSError, realpath, ABSTFN+"a", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"a", **kwargs) os.symlink("../" + basename(dirname(ABSTFN)) + "/" + basename(ABSTFN) + "c", ABSTFN+"c") - self.assertRaises(OSError, realpath, ABSTFN+"c", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"c", **kwargs) # Test using relative path as well. with os_helper.change_cwd(dirname(ABSTFN)): - self.assertRaises(OSError, realpath, basename(ABSTFN), strict=True) + self.assertRaises(OSError, realpath, basename(ABSTFN), **kwargs) finally: os_helper.unlink(ABSTFN) os_helper.unlink(ABSTFN+"1") @@ -658,13 +693,14 @@ class PosixPathTest(unittest.TestCase): @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash - def test_realpath_repeated_indirect_symlinks(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_repeated_indirect_symlinks(self, kwargs): # Issue #6975. try: os.mkdir(ABSTFN) os.symlink('../' + basename(ABSTFN), ABSTFN + '/self') os.symlink('self/self/self', ABSTFN + '/link') - self.assertEqual(realpath(ABSTFN + '/link'), ABSTFN) + self.assertEqual(realpath(ABSTFN + '/link', **kwargs), ABSTFN) finally: os_helper.unlink(ABSTFN + '/self') os_helper.unlink(ABSTFN + '/link') @@ -672,14 +708,15 @@ class PosixPathTest(unittest.TestCase): @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash - def test_realpath_deep_recursion(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_deep_recursion(self, kwargs): depth = 10 try: os.mkdir(ABSTFN) for i in range(depth): os.symlink('/'.join(['%d' % i] * 10), ABSTFN + '/%d' % (i + 1)) os.symlink('.', ABSTFN + '/0') - self.assertEqual(realpath(ABSTFN + '/%d' % depth), ABSTFN) + self.assertEqual(realpath(ABSTFN + '/%d' % depth, **kwargs), ABSTFN) # Test using relative path as well. with os_helper.change_cwd(ABSTFN): @@ -691,7 +728,8 @@ class PosixPathTest(unittest.TestCase): @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash - def test_realpath_resolve_parents(self): + @_parameterize({}, {'strict': ALLOW_MISSING}) + def test_realpath_resolve_parents(self, kwargs): # We also need to resolve any symlinks in the parents of a relative # path passed to realpath. E.g.: current working directory is # /usr/doc with 'doc' being a symlink to /usr/share/doc. We call @@ -702,7 +740,8 @@ class PosixPathTest(unittest.TestCase): os.symlink(ABSTFN + "/y", ABSTFN + "/k") with os_helper.change_cwd(ABSTFN + "/k"): - self.assertEqual(realpath("a"), ABSTFN + "/y/a") + self.assertEqual(realpath("a", **kwargs), + ABSTFN + "/y/a") finally: os_helper.unlink(ABSTFN + "/k") os_helper.rmdir(ABSTFN + "/y") @@ -710,7 +749,8 @@ class PosixPathTest(unittest.TestCase): @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash - def test_realpath_resolve_before_normalizing(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_resolve_before_normalizing(self, kwargs): # Bug #990669: Symbolic links should be resolved before we # normalize the path. E.g.: if we have directories 'a', 'k' and 'y' # in the following hierarchy: @@ -725,10 +765,10 @@ class PosixPathTest(unittest.TestCase): os.symlink(ABSTFN + "/k/y", ABSTFN + "/link-y") # Absolute path. - self.assertEqual(realpath(ABSTFN + "/link-y/.."), ABSTFN + "/k") + self.assertEqual(realpath(ABSTFN + "/link-y/..", **kwargs), ABSTFN + "/k") # Relative path. with os_helper.change_cwd(dirname(ABSTFN)): - self.assertEqual(realpath(basename(ABSTFN) + "/link-y/.."), + self.assertEqual(realpath(basename(ABSTFN) + "/link-y/..", **kwargs), ABSTFN + "/k") finally: os_helper.unlink(ABSTFN + "/link-y") @@ -738,7 +778,8 @@ class PosixPathTest(unittest.TestCase): @os_helper.skip_unless_symlink @skip_if_ABSTFN_contains_backslash - def test_realpath_resolve_first(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_resolve_first(self, kwargs): # Bug #1213894: The first component of the path, if not absolute, # must be resolved too. @@ -748,8 +789,8 @@ class PosixPathTest(unittest.TestCase): os.symlink(ABSTFN, ABSTFN + "link") with os_helper.change_cwd(dirname(ABSTFN)): base = basename(ABSTFN) - self.assertEqual(realpath(base + "link"), ABSTFN) - self.assertEqual(realpath(base + "link/k"), ABSTFN + "/k") + self.assertEqual(realpath(base + "link", **kwargs), ABSTFN) + self.assertEqual(realpath(base + "link/k", **kwargs), ABSTFN + "/k") finally: os_helper.unlink(ABSTFN + "link") os_helper.rmdir(ABSTFN + "/k") @@ -767,12 +808,67 @@ class PosixPathTest(unittest.TestCase): self.assertEqual(realpath(ABSTFN + '/foo'), ABSTFN + '/foo') self.assertEqual(realpath(ABSTFN + '/../foo'), dirname(ABSTFN) + '/foo') self.assertEqual(realpath(ABSTFN + '/foo/..'), ABSTFN) - with self.assertRaises(PermissionError): - realpath(ABSTFN, strict=True) finally: os.chmod(ABSTFN, 0o755, follow_symlinks=False) os_helper.unlink(ABSTFN) + @os_helper.skip_unless_symlink + @skip_if_ABSTFN_contains_backslash + @unittest.skipIf(os.chmod not in os.supports_follow_symlinks, "Can't set symlink permissions") + @unittest.skipIf(sys.platform != "darwin", "only macOS requires read permission to readlink()") + @_parameterize({'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_unreadable_symlink_strict(self, kwargs): + try: + os.symlink(ABSTFN+"1", ABSTFN) + os.chmod(ABSTFN, 0o000, follow_symlinks=False) + with self.assertRaises(PermissionError): + realpath(ABSTFN, **kwargs) + with self.assertRaises(PermissionError): + realpath(ABSTFN + '/foo', **kwargs), + with self.assertRaises(PermissionError): + realpath(ABSTFN + '/../foo', **kwargs) + with self.assertRaises(PermissionError): + realpath(ABSTFN + '/foo/..', **kwargs) + finally: + os.chmod(ABSTFN, 0o755, follow_symlinks=False) + os.unlink(ABSTFN) + + @skip_if_ABSTFN_contains_backslash + @os_helper.skip_unless_symlink + def test_realpath_unreadable_directory(self): + try: + os.mkdir(ABSTFN) + os.mkdir(ABSTFN + '/k') + os.chmod(ABSTFN, 0o000) + self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN) + self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN) + self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN) + + try: + os.stat(ABSTFN) + except PermissionError: + pass + else: + self.skipTest('Cannot block permissions') + + self.assertEqual(realpath(ABSTFN + '/k', strict=False), + ABSTFN + '/k') + self.assertRaises(PermissionError, realpath, ABSTFN + '/k', + strict=True) + self.assertRaises(PermissionError, realpath, ABSTFN + '/k', + strict=ALLOW_MISSING) + + self.assertEqual(realpath(ABSTFN + '/missing', strict=False), + ABSTFN + '/missing') + self.assertRaises(PermissionError, realpath, ABSTFN + '/missing', + strict=True) + self.assertRaises(PermissionError, realpath, ABSTFN + '/missing', + strict=ALLOW_MISSING) + finally: + os.chmod(ABSTFN, 0o755) + os_helper.rmdir(ABSTFN + '/k') + os_helper.rmdir(ABSTFN) + @skip_if_ABSTFN_contains_backslash def test_realpath_nonterminal_file(self): try: @@ -780,14 +876,27 @@ class PosixPathTest(unittest.TestCase): f.write('test_posixpath wuz ere') self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN) self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN) + self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN) + self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", + strict=ALLOW_MISSING) + self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", + strict=ALLOW_MISSING) + self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN)) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", + strict=ALLOW_MISSING) + self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "/subdir") self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", + strict=ALLOW_MISSING) finally: os_helper.unlink(ABSTFN) @@ -800,14 +909,27 @@ class PosixPathTest(unittest.TestCase): os.symlink(ABSTFN + "1", ABSTFN) self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN + "1") self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "1") + self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN + "1") + self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN + "1") self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", + strict=ALLOW_MISSING) + self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN + "1") self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", + strict=ALLOW_MISSING) + self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN)) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", + strict=ALLOW_MISSING) + self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "1/subdir") self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", + strict=ALLOW_MISSING) finally: os_helper.unlink(ABSTFN) os_helper.unlink(ABSTFN + "1") @@ -822,14 +944,27 @@ class PosixPathTest(unittest.TestCase): os.symlink(ABSTFN + "1", ABSTFN) self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN + "2") self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "2") + self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN + "2") + self.assertEqual(realpath(ABSTFN + "/", strict=False), ABSTFN + "2") self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/", + strict=ALLOW_MISSING) + self.assertEqual(realpath(ABSTFN + "/.", strict=False), ABSTFN + "2") self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/.", + strict=ALLOW_MISSING) + self.assertEqual(realpath(ABSTFN + "/..", strict=False), dirname(ABSTFN)) self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/..", + strict=ALLOW_MISSING) + self.assertEqual(realpath(ABSTFN + "/subdir", strict=False), ABSTFN + "2/subdir") self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", strict=True) + self.assertRaises(NotADirectoryError, realpath, ABSTFN + "/subdir", + strict=ALLOW_MISSING) finally: os_helper.unlink(ABSTFN) os_helper.unlink(ABSTFN + "1") @@ -1017,9 +1152,12 @@ class PathLikeTests(unittest.TestCase): def test_path_abspath(self): self.assertPathEqual(self.path.abspath) - def test_path_realpath(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_path_realpath(self, kwargs): self.assertPathEqual(self.path.realpath) + self.assertPathEqual(partial(self.path.realpath, **kwargs)) + def test_path_relpath(self): self.assertPathEqual(self.path.relpath) diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py index aa3a592766d..98bae7dd703 100644 --- a/Lib/test/test_pyrepl/test_pyrepl.py +++ b/Lib/test/test_pyrepl/test_pyrepl.py @@ -1672,6 +1672,17 @@ class TestMain(ReplTestCase): self.assertEqual(exit_code, 0) self.assertNotIn("TypeError", output) + @force_not_colorized + def test_non_string_suggestion_candidates(self): + commands = ("import runpy\n" + "runpy._run_module_code('blech', {0: '', 'bluch': ''}, '')\n" + "exit()\n") + + output, exit_code = self.run_repl(commands) + self.assertEqual(exit_code, 0) + self.assertNotIn("all elements in 'candidates' must be strings", output) + self.assertIn("bluch", output) + def test_readline_history_file(self): # skip, if readline module is not available readline = import_module('readline') diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 7f4fe357034..c855fb8fe2b 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -6,7 +6,7 @@ import threading import time import unittest import weakref -from test.support import gc_collect +from test.support import gc_collect, bigmemtest from test.support import import_helper from test.support import threading_helper @@ -963,33 +963,33 @@ class BaseSimpleQueueTest: # One producer, one consumer => results appended in well-defined order self.assertEqual(results, inputs) - def test_many_threads(self): + @bigmemtest(size=50, memuse=100*2**20, dry_run=False) + def test_many_threads(self, size): # Test multiple concurrent put() and get() - N = 50 q = self.q inputs = list(range(10000)) - results = self.run_threads(N, q, inputs, self.feed, self.consume) + results = self.run_threads(size, q, inputs, self.feed, self.consume) # Multiple consumers without synchronization append the # results in random order self.assertEqual(sorted(results), inputs) - def test_many_threads_nonblock(self): + @bigmemtest(size=50, memuse=100*2**20, dry_run=False) + def test_many_threads_nonblock(self, size): # Test multiple concurrent put() and get(block=False) - N = 50 q = self.q inputs = list(range(10000)) - results = self.run_threads(N, q, inputs, + results = self.run_threads(size, q, inputs, self.feed, self.consume_nonblock) self.assertEqual(sorted(results), inputs) - def test_many_threads_timeout(self): + @bigmemtest(size=50, memuse=100*2**20, dry_run=False) + def test_many_threads_timeout(self, size): # Test multiple concurrent put() and get(timeout=...) - N = 50 q = self.q inputs = list(range(1000)) - results = self.run_threads(N, q, inputs, + results = self.run_threads(size, q, inputs, self.feed, self.consume_timeout) self.assertEqual(sorted(results), inputs) diff --git a/Lib/test/test_random.py b/Lib/test/test_random.py index bd76d636e4f..54910cd8054 100644 --- a/Lib/test/test_random.py +++ b/Lib/test/test_random.py @@ -392,6 +392,8 @@ class TestBasicOps: self.assertRaises(TypeError, self.gen.getrandbits) self.assertRaises(TypeError, self.gen.getrandbits, 1, 2) self.assertRaises(ValueError, self.gen.getrandbits, -1) + self.assertRaises(OverflowError, self.gen.getrandbits, 1<<1000) + self.assertRaises(ValueError, self.gen.getrandbits, -1<<1000) self.assertRaises(TypeError, self.gen.getrandbits, 10.1) def test_pickling(self): @@ -435,6 +437,8 @@ class TestBasicOps: self.assertRaises(TypeError, self.gen.randbytes) self.assertRaises(TypeError, self.gen.randbytes, 1, 2) self.assertRaises(ValueError, self.gen.randbytes, -1) + self.assertRaises(OverflowError, self.gen.randbytes, 1<<1000) + self.assertRaises((ValueError, OverflowError), self.gen.randbytes, -1<<1000) self.assertRaises(TypeError, self.gen.randbytes, 1.0) def test_mu_sigma_default_args(self): @@ -806,6 +810,22 @@ class MersenneTwister_TestBasicOps(TestBasicOps, unittest.TestCase): self.assertEqual(self.gen.getrandbits(100), 97904845777343510404718956115) + def test_getrandbits_2G_bits(self): + size = 2**31 + self.gen.seed(1234567) + x = self.gen.getrandbits(size) + self.assertEqual(x.bit_length(), size) + self.assertEqual(x & (2**100-1), 890186470919986886340158459475) + self.assertEqual(x >> (size-100), 1226514312032729439655761284440) + + @support.bigmemtest(size=2**32, memuse=1/8+2/15, dry_run=False) + def test_getrandbits_4G_bits(self, size): + self.gen.seed(1234568) + x = self.gen.getrandbits(size) + self.assertEqual(x.bit_length(), size) + self.assertEqual(x & (2**100-1), 287241425661104632871036099814) + self.assertEqual(x >> (size-100), 739728759900339699429794460738) + def test_randrange_uses_getrandbits(self): # Verify use of getrandbits by randrange # Use same seed as in the cross-platform repeatability test @@ -962,6 +982,14 @@ class MersenneTwister_TestBasicOps(TestBasicOps, unittest.TestCase): self.assertEqual(self.gen.randbytes(n), gen2.getrandbits(n * 8).to_bytes(n, 'little')) + @support.bigmemtest(size=2**29, memuse=1+16/15, dry_run=False) + def test_randbytes_256M(self, size): + self.gen.seed(2849427419) + x = self.gen.randbytes(size) + self.assertEqual(len(x), size) + self.assertEqual(x[:12].hex(), 'f6fd9ae63855ab91ea238b4f') + self.assertEqual(x[-12:].hex(), '0e7af69a84ee99bf4a11becc') + def test_sample_counts_equivalence(self): # Test the documented strong equivalence to a sample with repeated elements. # We run this test on random.Random() which makes deterministic selections diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py index 8f4fc09442e..a43d2678ebd 100644 --- a/Lib/test/test_regrtest.py +++ b/Lib/test/test_regrtest.py @@ -768,13 +768,16 @@ class BaseTestCase(unittest.TestCase): self.fail(msg) return proc - def run_python(self, args, **kw): + def run_python(self, args, isolated=True, **kw): extraargs = [] if 'uops' in sys._xoptions: # Pass -X uops along extraargs.extend(['-X', 'uops']) - args = [sys.executable, *extraargs, '-X', 'faulthandler', '-I', *args] - proc = self.run_command(args, **kw) + cmd = [sys.executable, *extraargs, '-X', 'faulthandler'] + if isolated: + cmd.append('-I') + cmd.extend(args) + proc = self.run_command(cmd, **kw) return proc.stdout @@ -831,8 +834,8 @@ class ProgramsTestCase(BaseTestCase): self.check_executed_tests(output, self.tests, randomize=True, stats=len(self.tests)) - def run_tests(self, args, env=None): - output = self.run_python(args, env=env) + def run_tests(self, args, env=None, isolated=True): + output = self.run_python(args, env=env, isolated=isolated) self.check_output(output) def test_script_regrtest(self): @@ -2067,7 +2070,7 @@ class ArgsTestCase(BaseTestCase): self.check_executed_tests(output, [testname], failed=[testname], parallel=True, - stats=TestStats(1, 1, 0)) + stats=TestStats(1, 2, 1)) def _check_random_seed(self, run_workers: bool): # gh-109276: When -r/--randomize is used, random.seed() is called @@ -2276,7 +2279,6 @@ class ArgsTestCase(BaseTestCase): def test_xml(self): code = textwrap.dedent(r""" import unittest - from test import support class VerboseTests(unittest.TestCase): def test_failed(self): @@ -2311,6 +2313,39 @@ class ArgsTestCase(BaseTestCase): for out in testcase.iter('system-out'): self.assertEqual(out.text, r"abc \x1b def") + def test_nonascii(self): + code = textwrap.dedent(r""" + import unittest + + class NonASCIITests(unittest.TestCase): + def test_docstring(self): + '''docstring:\u20ac''' + + def test_subtest(self): + with self.subTest(param='subtest:\u20ac'): + pass + + def test_skip(self): + self.skipTest('skipped:\u20ac') + """) + testname = self.create_test(code=code) + + env = dict(os.environ) + env['PYTHONIOENCODING'] = 'ascii' + + def check(output): + self.check_executed_tests(output, testname, stats=TestStats(3, 0, 1)) + self.assertIn(r'docstring:\u20ac', output) + self.assertIn(r'skipped:\u20ac', output) + + # Run sequentially + output = self.run_tests('-v', testname, env=env, isolated=False) + check(output) + + # Run in parallel + output = self.run_tests('-j1', '-v', testname, env=env, isolated=False) + check(output) + class TestUtils(unittest.TestCase): def test_format_duration(self): diff --git a/Lib/test/test_sqlite3/test_cli.py b/Lib/test/test_sqlite3/test_cli.py index 37e0f74f688..7f0b0f36505 100644 --- a/Lib/test/test_sqlite3/test_cli.py +++ b/Lib/test/test_sqlite3/test_cli.py @@ -1,14 +1,19 @@ """sqlite3 CLI tests.""" import sqlite3 +import sys +import textwrap import unittest from sqlite3.__main__ import main as cli +from test.support.import_helper import import_module from test.support.os_helper import TESTFN, unlink +from test.support.pty_helper import run_pty from test.support import ( captured_stdout, captured_stderr, captured_stdin, force_not_colorized_test_class, + requires_subprocess, ) @@ -200,5 +205,98 @@ class InteractiveSession(unittest.TestCase): self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: ' '\x1b[35mnear "sel": syntax error\x1b[0m', err) + +@requires_subprocess() +@force_not_colorized_test_class +class Completion(unittest.TestCase): + PS1 = "sqlite> " + + @classmethod + def setUpClass(cls): + _sqlite3 = import_module("_sqlite3") + if not hasattr(_sqlite3, "SQLITE_KEYWORDS"): + raise unittest.SkipTest("unable to determine SQLite keywords") + + readline = import_module("readline") + if readline.backend == "editline": + raise unittest.SkipTest("libedit readline is not supported") + + def write_input(self, input_, env=None): + script = textwrap.dedent(""" + import readline + from sqlite3.__main__ import main + + readline.parse_and_bind("set colored-completion-prefix off") + main() + """) + return run_pty(script, input_, env) + + def test_complete_sql_keywords(self): + # List candidates starting with 'S', there should be multiple matches. + input_ = b"S\t\tEL\t 1;\n.quit\n" + output = self.write_input(input_) + self.assertIn(b"SELECT", output) + self.assertIn(b"SET", output) + self.assertIn(b"SAVEPOINT", output) + self.assertIn(b"(1,)", output) + + # Keywords are completed in upper case for even lower case user input. + input_ = b"sel\t\t 1;\n.quit\n" + output = self.write_input(input_) + self.assertIn(b"SELECT", output) + self.assertIn(b"(1,)", output) + + @unittest.skipIf(sys.platform.startswith("freebsd"), + "Two actual tabs are inserted when there are no matching" + " completions in the pseudo-terminal opened by run_pty()" + " on FreeBSD") + def test_complete_no_match(self): + input_ = b"xyzzy\t\t\b\b\b\b\b\b\b.quit\n" + # Set NO_COLOR to disable coloring for self.PS1. + output = self.write_input(input_, env={"NO_COLOR": "1"}) + lines = output.decode().splitlines() + indices = ( + i for i, line in enumerate(lines, 1) + if line.startswith(f"{self.PS1}xyzzy") + ) + line_num = next(indices, -1) + self.assertNotEqual(line_num, -1) + # Completions occupy lines, assert no extra lines when there is nothing + # to complete. + self.assertEqual(line_num, len(lines)) + + def test_complete_no_input(self): + from _sqlite3 import SQLITE_KEYWORDS + + script = textwrap.dedent(""" + import readline + from sqlite3.__main__ import main + + # Configure readline to ...: + # - hide control sequences surrounding each candidate + # - hide "Display all xxx possibilities? (y or n)" + # - hide "--More--" + # - show candidates one per line + readline.parse_and_bind("set colored-completion-prefix off") + readline.parse_and_bind("set colored-stats off") + readline.parse_and_bind("set completion-query-items 0") + readline.parse_and_bind("set page-completions off") + readline.parse_and_bind("set completion-display-width 0") + + main() + """) + input_ = b"\t\t.quit\n" + output = run_pty(script, input_, env={"NO_COLOR": "1"}) + lines = output.decode().splitlines() + indices = [ + i for i, line in enumerate(lines) + if line.startswith(self.PS1) + ] + self.assertEqual(len(indices), 2) + start, end = indices + candidates = [l.strip() for l in lines[start+1:end]] + self.assertEqual(candidates, sorted(SQLITE_KEYWORDS)) + + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 2767a53d53c..f123f6ece40 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -31,6 +31,7 @@ import weakref import platform import sysconfig import functools +from contextlib import nullcontext try: import ctypes except ImportError: @@ -2843,6 +2844,7 @@ class ThreadedTests(unittest.TestCase): # See GH-124984: OpenSSL is not thread safe. threads = [] + warnings_filters = sys.flags.context_aware_warnings global USE_SAME_TEST_CONTEXT USE_SAME_TEST_CONTEXT = True try: @@ -2851,7 +2853,10 @@ class ThreadedTests(unittest.TestCase): self.test_alpn_protocols, self.test_getpeercert, self.test_crl_check, - self.test_check_hostname_idn, + functools.partial( + self.test_check_hostname_idn, + warnings_filters=warnings_filters, + ), self.test_wrong_cert_tls12, self.test_wrong_cert_tls13, ): @@ -3097,7 +3102,7 @@ class ThreadedTests(unittest.TestCase): cipher = s.cipher()[0].split('-') self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA')) - def test_check_hostname_idn(self): + def test_check_hostname_idn(self, warnings_filters=True): if support.verbose: sys.stdout.write("\n") @@ -3152,16 +3157,30 @@ class ThreadedTests(unittest.TestCase): server_hostname="python.example.org") as s: with self.assertRaises(ssl.CertificateError): s.connect((HOST, server.port)) - with ThreadedEchoServer(context=server_context, chatty=True) as server: - with warnings_helper.check_no_resource_warning(self): - with self.assertRaises(UnicodeError): - context.wrap_socket(socket.socket(), - server_hostname='.pythontest.net') - with ThreadedEchoServer(context=server_context, chatty=True) as server: - with warnings_helper.check_no_resource_warning(self): - with self.assertRaises(UnicodeDecodeError): - context.wrap_socket(socket.socket(), - server_hostname=b'k\xf6nig.idn.pythontest.net') + with ( + ThreadedEchoServer(context=server_context, chatty=True) as server, + ( + warnings_helper.check_no_resource_warning(self) + if warnings_filters + else nullcontext() + ), + self.assertRaises(UnicodeError), + ): + context.wrap_socket(socket.socket(), server_hostname='.pythontest.net') + + with ( + ThreadedEchoServer(context=server_context, chatty=True) as server, + ( + warnings_helper.check_no_resource_warning(self) + if warnings_filters + else nullcontext() + ), + self.assertRaises(UnicodeDecodeError), + ): + context.wrap_socket( + socket.socket(), + server_hostname=b'k\xf6nig.idn.pythontest.net', + ) def test_wrong_cert_tls12(self): """Connecting when the server rejects the client's certificate diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py index c7ac7914158..13aaba405e3 100644 --- a/Lib/test/test_syntax.py +++ b/Lib/test/test_syntax.py @@ -1436,17 +1436,17 @@ Regression tests for gh-133999: >>> try: pass ... except TypeError as name: raise from None Traceback (most recent call last): - SyntaxError: invalid syntax + SyntaxError: did you forget an expression between 'raise' and 'from'? >>> try: pass ... except* TypeError as name: raise from None Traceback (most recent call last): - SyntaxError: invalid syntax + SyntaxError: did you forget an expression between 'raise' and 'from'? >>> match 1: ... case 1 | 2 as abc: raise from None Traceback (most recent call last): - SyntaxError: invalid syntax + SyntaxError: did you forget an expression between 'raise' and 'from'? Ensure that early = are not matched by the parser as invalid comparisons >>> f(2, 4, x=34); 1 $ 2 @@ -1695,6 +1695,28 @@ Make sure that the old "raise X, Y[, Z]" form is gone: ... SyntaxError: invalid syntax +Better errors for `raise` statement: + + >>> raise ValueError from + Traceback (most recent call last): + SyntaxError: did you forget an expression after 'from'? + + >>> raise mod.ValueError() from + Traceback (most recent call last): + SyntaxError: did you forget an expression after 'from'? + + >>> raise from exc + Traceback (most recent call last): + SyntaxError: did you forget an expression between 'raise' and 'from'? + + >>> raise from None + Traceback (most recent call last): + SyntaxError: did you forget an expression between 'raise' and 'from'? + + >>> raise from + Traceback (most recent call last): + SyntaxError: did you forget an expression between 'raise' and 'from'? + Check that an multiple exception types with missing parentheses raise a custom exception only when using 'as' diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 795d1ecbb59..bf415894903 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -729,7 +729,7 @@ class SysModuleTest(unittest.TestCase): info = sys.thread_info self.assertEqual(len(info), 3) self.assertIn(info.name, ('nt', 'pthread', 'pthread-stubs', 'solaris', None)) - self.assertIn(info.lock, ('semaphore', 'mutex+cond', None)) + self.assertIn(info.lock, ('pymutex', None)) if sys.platform.startswith(("linux", "android", "freebsd")): self.assertEqual(info.name, "pthread") elif sys.platform == "win32": @@ -1135,23 +1135,12 @@ class SysModuleTest(unittest.TestCase): b = sys.getallocatedblocks() self.assertLessEqual(b, a) try: - # While we could imagine a Python session where the number of - # multiple buffer objects would exceed the sharing of references, - # it is unlikely to happen in a normal test run. - # - # In free-threaded builds each code object owns an array of - # pointers to copies of the bytecode. When the number of - # code objects is a large fraction of the total number of - # references, this can cause the total number of allocated - # blocks to exceed the total number of references. - # - # For some reason, iOS seems to trigger the "unlikely to happen" - # case reliably under CI conditions. It's not clear why; but as - # this test is checking the behavior of getallocatedblock() - # under garbage collection, we can skip this pre-condition check - # for now. See GH-130384. - if not support.Py_GIL_DISABLED and not support.is_apple_mobile: - self.assertLess(a, sys.gettotalrefcount()) + # The reported blocks will include immortalized strings, but the + # total ref count will not. This will sanity check that among all + # other objects (those eligible for garbage collection) there + # are more references being tracked than allocated blocks. + interned_immortal = sys.getunicodeinternedsize(_only_immortal=True) + self.assertLess(a - interned_immortal, sys.gettotalrefcount()) except AttributeError: # gettotalrefcount() not available pass @@ -1299,6 +1288,7 @@ class SysModuleTest(unittest.TestCase): for name in sys.stdlib_module_names: self.assertIsInstance(name, str) + @unittest.skipUnless(hasattr(sys, '_stdlib_dir'), 'need sys._stdlib_dir') def test_stdlib_dir(self): os = import_helper.import_fresh_module('os') marker = getattr(os, '__file__', None) diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index cf218a2bf14..7055e1ed147 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -2715,6 +2715,31 @@ class MiscTest(unittest.TestCase): str(excinfo.exception), ) + @unittest.skipUnless(os_helper.can_symlink(), 'requires symlink support') + @unittest.skipUnless(hasattr(os, 'chmod'), "missing os.chmod") + @unittest.mock.patch('os.chmod') + def test_deferred_directory_attributes_update(self, mock_chmod): + # Regression test for gh-127987: setting attributes on arbitrary files + tempdir = os.path.join(TEMPDIR, 'test127987') + def mock_chmod_side_effect(path, mode, **kwargs): + target_path = os.path.realpath(path) + if os.path.commonpath([target_path, tempdir]) != tempdir: + raise Exception("should not try to chmod anything outside the destination", target_path) + mock_chmod.side_effect = mock_chmod_side_effect + + outside_tree_dir = os.path.join(TEMPDIR, 'outside_tree_dir') + with ArchiveMaker() as arc: + arc.add('x', symlink_to='.') + arc.add('x', type=tarfile.DIRTYPE, mode='?rwsrwsrwt') + arc.add('x', symlink_to=outside_tree_dir) + + os.makedirs(outside_tree_dir) + try: + arc.open().extractall(path=tempdir, filter='tar') + finally: + os_helper.rmtree(outside_tree_dir) + os_helper.rmtree(tempdir) + class CommandLineTest(unittest.TestCase): @@ -3275,6 +3300,10 @@ class NoneInfoExtractTests(ReadTest): got_paths = set( p.relative_to(directory) for p in pathlib.Path(directory).glob('**/*')) + if self.extraction_filter in (None, 'data'): + # The 'data' filter is expected to reject special files + for path in 'ustar/fifotype', 'ustar/blktype', 'ustar/chrtype': + got_paths.discard(pathlib.Path(path)) self.assertEqual(self.control_paths, got_paths) @contextmanager @@ -3504,12 +3533,28 @@ class ArchiveMaker: self.bio = None def add(self, name, *, type=None, symlink_to=None, hardlink_to=None, - mode=None, size=None, **kwargs): - """Add a member to the test archive. Call within `with`.""" + mode=None, size=None, content=None, **kwargs): + """Add a member to the test archive. Call within `with`. + + Provides many shortcuts: + - default `type` is based on symlink_to, hardlink_to, and trailing `/` + in name (which is stripped) + - size & content defaults are based on each other + - content can be str or bytes + - mode should be textual ('-rwxrwxrwx') + + (add more! this is unstable internal test-only API) + """ name = str(name) tarinfo = tarfile.TarInfo(name).replace(**kwargs) + if content is not None: + if isinstance(content, str): + content = content.encode() + size = len(content) if size is not None: tarinfo.size = size + if content is None: + content = bytes(tarinfo.size) if mode: tarinfo.mode = _filemode_to_int(mode) if symlink_to is not None: @@ -3523,7 +3568,7 @@ class ArchiveMaker: if type is not None: tarinfo.type = type if tarinfo.isreg(): - fileobj = io.BytesIO(bytes(tarinfo.size)) + fileobj = io.BytesIO(content) else: fileobj = None self.tar_w.addfile(tarinfo, fileobj) @@ -3557,7 +3602,7 @@ class TestExtractionFilters(unittest.TestCase): destdir = outerdir / 'dest' @contextmanager - def check_context(self, tar, filter): + def check_context(self, tar, filter, *, check_flag=True): """Extracts `tar` to `self.destdir` and allows checking the result If an error occurs, it must be checked using `expect_exception` @@ -3566,27 +3611,40 @@ class TestExtractionFilters(unittest.TestCase): except the destination directory itself and parent directories of other files. When checking directories, do so before their contents. + + A file called 'flag' is made in outerdir (i.e. outside destdir) + before extraction; it should not be altered nor should its contents + be read/copied. """ with os_helper.temp_dir(self.outerdir): + flag_path = self.outerdir / 'flag' + flag_path.write_text('capture me') try: tar.extractall(self.destdir, filter=filter) except Exception as exc: self.raised_exception = exc + self.reraise_exception = True self.expected_paths = set() else: self.raised_exception = None + self.reraise_exception = False self.expected_paths = set(self.outerdir.glob('**/*')) self.expected_paths.discard(self.destdir) + self.expected_paths.discard(flag_path) try: - yield + yield self finally: tar.close() - if self.raised_exception: + if self.reraise_exception: raise self.raised_exception self.assertEqual(self.expected_paths, set()) + if check_flag: + self.assertEqual(flag_path.read_text(), 'capture me') + else: + assert filter == 'fully_trusted' def expect_file(self, name, type=None, symlink_to=None, mode=None, - size=None): + size=None, content=None): """Check a single file. See check_context.""" if self.raised_exception: raise self.raised_exception @@ -3605,26 +3663,45 @@ class TestExtractionFilters(unittest.TestCase): # The symlink might be the same (textually) as what we expect, # but some systems change the link to an equivalent path, so # we fall back to samefile(). - if expected != got: - self.assertTrue(got.samefile(expected)) + try: + if expected != got: + self.assertTrue(got.samefile(expected)) + except Exception as e: + # attach a note, so it's shown even if `samefile` fails + e.add_note(f'{expected=}, {got=}') + raise elif type == tarfile.REGTYPE or type is None: self.assertTrue(path.is_file()) elif type == tarfile.DIRTYPE: self.assertTrue(path.is_dir()) elif type == tarfile.FIFOTYPE: self.assertTrue(path.is_fifo()) + elif type == tarfile.SYMTYPE: + self.assertTrue(path.is_symlink()) else: raise NotImplementedError(type) if size is not None: self.assertEqual(path.stat().st_size, size) + if content is not None: + self.assertEqual(path.read_text(), content) for parent in path.parents: self.expected_paths.discard(parent) + def expect_any_tree(self, name): + """Check a directory; forget about its contents.""" + tree_path = (self.destdir / name).resolve() + self.expect_file(tree_path, type=tarfile.DIRTYPE) + self.expected_paths = { + p for p in self.expected_paths + if tree_path not in p.parents + } + def expect_exception(self, exc_type, message_re='.'): with self.assertRaisesRegex(exc_type, message_re): if self.raised_exception is not None: raise self.raised_exception - self.raised_exception = None + self.reraise_exception = False + return self.raised_exception def test_benign_file(self): with ArchiveMaker() as arc: @@ -3710,6 +3787,80 @@ class TestExtractionFilters(unittest.TestCase): self.expect_file('parent/evil') @symlink_test + @os_helper.skip_unless_symlink + def test_realpath_limit_attack(self): + # (CVE-2025-4517) + + with ArchiveMaker() as arc: + # populate the symlinks and dirs that expand in os.path.realpath() + # The component length is chosen so that in common cases, the unexpanded + # path fits in PATH_MAX, but it overflows when the final symlink + # is expanded + steps = "abcdefghijklmnop" + if sys.platform == 'win32': + component = 'd' * 25 + elif 'PC_PATH_MAX' in os.pathconf_names: + max_path_len = os.pathconf(self.outerdir.parent, "PC_PATH_MAX") + path_sep_len = 1 + dest_len = len(str(self.destdir)) + path_sep_len + component_len = (max_path_len - dest_len) // (len(steps) + path_sep_len) + component = 'd' * component_len + else: + raise NotImplementedError("Need to guess component length for {sys.platform}") + path = "" + step_path = "" + for i in steps: + arc.add(os.path.join(path, component), type=tarfile.DIRTYPE, + mode='drwxrwxrwx') + arc.add(os.path.join(path, i), symlink_to=component) + path = os.path.join(path, component) + step_path = os.path.join(step_path, i) + # create the final symlink that exceeds PATH_MAX and simply points + # to the top dir. + # this link will never be expanded by + # os.path.realpath(strict=False), nor anything after it. + linkpath = os.path.join(*steps, "l"*254) + parent_segments = [".."] * len(steps) + arc.add(linkpath, symlink_to=os.path.join(*parent_segments)) + # make a symlink outside to keep the tar command happy + arc.add("escape", symlink_to=os.path.join(linkpath, "..")) + # use the symlinks above, that are not checked, to create a hardlink + # to a file outside of the destination path + arc.add("flaglink", hardlink_to=os.path.join("escape", "flag")) + # now that we have the hardlink we can overwrite the file + arc.add("flaglink", content='overwrite') + # we can also create new files as well! + arc.add("escape/newfile", content='new') + + with (self.subTest('fully_trusted'), + self.check_context(arc.open(), filter='fully_trusted', + check_flag=False)): + if sys.platform == 'win32': + self.expect_exception((FileNotFoundError, FileExistsError)) + elif self.raised_exception: + # Cannot symlink/hardlink: tarfile falls back to getmember() + self.expect_exception(KeyError) + # Otherwise, this block should never enter. + else: + self.expect_any_tree(component) + self.expect_file('flaglink', content='overwrite') + self.expect_file('../newfile', content='new') + self.expect_file('escape', type=tarfile.SYMTYPE) + self.expect_file('a', symlink_to=component) + + for filter in 'tar', 'data': + with self.subTest(filter), self.check_context(arc.open(), filter=filter): + exc = self.expect_exception((OSError, KeyError)) + if isinstance(exc, OSError): + if sys.platform == 'win32': + # 3: ERROR_PATH_NOT_FOUND + # 5: ERROR_ACCESS_DENIED + # 206: ERROR_FILENAME_EXCED_RANGE + self.assertIn(exc.winerror, (3, 5, 206)) + else: + self.assertEqual(exc.errno, errno.ENAMETOOLONG) + + @symlink_test def test_parent_symlink2(self): # Test interplaying symlinks # Inspired by 'dirsymlink2b' in jwilk/traversal-archives @@ -3931,8 +4082,8 @@ class TestExtractionFilters(unittest.TestCase): arc.add('symlink2', symlink_to=os.path.join( 'linkdir', 'hardlink2')) arc.add('targetdir/target', size=3) - arc.add('linkdir/hardlink', hardlink_to='targetdir/target') - arc.add('linkdir/hardlink2', hardlink_to='linkdir/symlink') + arc.add('linkdir/hardlink', hardlink_to=os.path.join('targetdir', 'target')) + arc.add('linkdir/hardlink2', hardlink_to=os.path.join('linkdir', 'symlink')) for filter in 'tar', 'data', 'fully_trusted': with self.check_context(arc.open(), filter): @@ -3948,6 +4099,129 @@ class TestExtractionFilters(unittest.TestCase): self.expect_file('linkdir/symlink', size=3) self.expect_file('symlink2', size=3) + @symlink_test + def test_sneaky_hardlink_fallback(self): + # (CVE-2025-4330) + # Test that when hardlink extraction falls back to extracting members + # from the archive, the extracted member is (re-)filtered. + with ArchiveMaker() as arc: + # Create a directory structure so the c/escape symlink stays + # inside the path + arc.add("a/t/dummy") + # Create b/ directory + arc.add("b/") + # Point "c" to the bottom of the tree in "a" + arc.add("c", symlink_to=os.path.join("a", "t")) + # link to non-existant location under "a" + arc.add("c/escape", symlink_to=os.path.join("..", "..", + "link_here")) + # Move "c" to point to "b" ("c/escape" no longer exists) + arc.add("c", symlink_to="b") + # Attempt to create a hard link to "c/escape". Since it doesn't + # exist it will attempt to extract "cescape" but at "boom". + arc.add("boom", hardlink_to=os.path.join("c", "escape")) + + with self.check_context(arc.open(), 'data'): + if not os_helper.can_symlink(): + # When 'c/escape' is extracted, 'c' is a regular + # directory, and 'c/escape' *would* point outside + # the destination if symlinks were allowed. + self.expect_exception( + tarfile.LinkOutsideDestinationError) + elif sys.platform == "win32": + # On Windows, 'c/escape' points outside the destination + self.expect_exception(tarfile.LinkOutsideDestinationError) + else: + e = self.expect_exception( + tarfile.LinkFallbackError, + "link 'boom' would be extracted as a copy of " + + "'c/escape', which was rejected") + self.assertIsInstance(e.__cause__, + tarfile.LinkOutsideDestinationError) + for filter in 'tar', 'fully_trusted': + with self.subTest(filter), self.check_context(arc.open(), filter): + if not os_helper.can_symlink(): + self.expect_file("a/t/dummy") + self.expect_file("b/") + self.expect_file("c/") + else: + self.expect_file("a/t/dummy") + self.expect_file("b/") + self.expect_file("a/t/escape", symlink_to='../../link_here') + self.expect_file("boom", symlink_to='../../link_here') + self.expect_file("c", symlink_to='b') + + @symlink_test + def test_exfiltration_via_symlink(self): + # (CVE-2025-4138) + # Test changing symlinks that result in a symlink pointing outside + # the extraction directory, unless prevented by 'data' filter's + # normalization. + with ArchiveMaker() as arc: + arc.add("escape", symlink_to=os.path.join('link', 'link', '..', '..', 'link-here')) + arc.add("link", symlink_to='./') + + for filter in 'tar', 'data', 'fully_trusted': + with self.check_context(arc.open(), filter): + if os_helper.can_symlink(): + self.expect_file("link", symlink_to='./') + if filter == 'data': + self.expect_file("escape", symlink_to='link-here') + else: + self.expect_file("escape", + symlink_to='link/link/../../link-here') + else: + # Nothing is extracted. + pass + + @symlink_test + def test_chmod_outside_dir(self): + # (CVE-2024-12718) + # Test that members used for delayed updates of directory metadata + # are (re-)filtered. + with ArchiveMaker() as arc: + # "pwn" is a veeeery innocent symlink: + arc.add("a/pwn", symlink_to='.') + # But now "pwn" is also a directory, so it's scheduled to have its + # metadata updated later: + arc.add("a/pwn/", mode='drwxrwxrwx') + # Oops, "pwn" is not so innocent any more: + arc.add("a/pwn", symlink_to='x/../') + # Newly created symlink points to the dest dir, + # so it's OK for the "data" filter. + arc.add('a/x', symlink_to=('../')) + # But now "pwn" points outside the dest dir + + for filter in 'tar', 'data', 'fully_trusted': + with self.check_context(arc.open(), filter) as cc: + if not os_helper.can_symlink(): + self.expect_file("a/pwn/") + elif filter == 'data': + self.expect_file("a/x", symlink_to='../') + self.expect_file("a/pwn", symlink_to='.') + else: + self.expect_file("a/x", symlink_to='../') + self.expect_file("a/pwn", symlink_to='x/../') + if sys.platform != "win32": + st_mode = cc.outerdir.stat().st_mode + self.assertNotEqual(st_mode & 0o777, 0o777) + + def test_link_fallback_normalizes(self): + # Make sure hardlink fallbacks work for non-normalized paths for all + # filters + with ArchiveMaker() as arc: + arc.add("dir/") + arc.add("dir/../afile") + arc.add("link1", hardlink_to='dir/../afile') + arc.add("link2", hardlink_to='dir/../dir/../afile') + + for filter in 'tar', 'data', 'fully_trusted': + with self.check_context(arc.open(), filter) as cc: + self.expect_file("dir/") + self.expect_file("afile") + self.expect_file("link1") + self.expect_file("link2") + def test_modes(self): # Test how file modes are extracted # (Note that the modes are ignored on platforms without working chmod) @@ -4072,7 +4346,7 @@ class TestExtractionFilters(unittest.TestCase): # The 'tar' filter returns TarInfo objects with the same name/type. # (It can also fail for particularly "evil" input, but we don't have # that in the test archive.) - with tarfile.TarFile.open(tarname) as tar: + with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar: for tarinfo in tar.getmembers(): try: filtered = tarfile.tar_filter(tarinfo, '') @@ -4084,7 +4358,7 @@ class TestExtractionFilters(unittest.TestCase): def test_data_filter(self): # The 'data' filter either raises, or returns TarInfo with the same # name/type. - with tarfile.TarFile.open(tarname) as tar: + with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar: for tarinfo in tar.getmembers(): try: filtered = tarfile.data_filter(tarinfo, '') @@ -4242,13 +4516,13 @@ class TestExtractionFilters(unittest.TestCase): # If errorlevel is 0, errors affected by errorlevel are ignored with self.check_context(arc.open(errorlevel=0), extracterror_filter): - self.expect_file('file') + pass with self.check_context(arc.open(errorlevel=0), filtererror_filter): - self.expect_file('file') + pass with self.check_context(arc.open(errorlevel=0), oserror_filter): - self.expect_file('file') + pass with self.check_context(arc.open(errorlevel=0), tarerror_filter): self.expect_exception(tarfile.TarError) @@ -4259,7 +4533,7 @@ class TestExtractionFilters(unittest.TestCase): # If 1, all fatal errors are raised with self.check_context(arc.open(errorlevel=1), extracterror_filter): - self.expect_file('file') + pass with self.check_context(arc.open(errorlevel=1), filtererror_filter): self.expect_exception(tarfile.FilterError) diff --git a/Lib/test/test_traceback.py b/Lib/test/test_traceback.py index b9be87f357f..74b979d0096 100644 --- a/Lib/test/test_traceback.py +++ b/Lib/test/test_traceback.py @@ -4188,6 +4188,15 @@ class SuggestionFormattingTestBase: self.assertNotIn("blech", actual) self.assertNotIn("oh no!", actual) + def test_attribute_error_with_non_string_candidates(self): + class T: + bluch = 1 + + instance = T() + instance.__dict__[0] = 1 + actual = self.get_suggestion(instance, 'blich') + self.assertIn("bluch", actual) + def test_attribute_error_with_bad_name(self): def raise_attribute_error_with_bad_name(): raise AttributeError(name=12, obj=23) @@ -4223,8 +4232,8 @@ class SuggestionFormattingTestBase: return mod_name - def get_import_from_suggestion(self, mod_dict, name): - modname = self.make_module(mod_dict) + def get_import_from_suggestion(self, code, name): + modname = self.make_module(code) def callable(): try: @@ -4301,6 +4310,13 @@ class SuggestionFormattingTestBase: self.assertIn("'_bluch'", self.get_import_from_suggestion(code, '_luch')) self.assertNotIn("'_bluch'", self.get_import_from_suggestion(code, 'bluch')) + def test_import_from_suggestions_non_string(self): + modWithNonStringAttr = textwrap.dedent("""\ + globals()[0] = 1 + bluch = 1 + """) + self.assertIn("'bluch'", self.get_import_from_suggestion(modWithNonStringAttr, 'blech')) + def test_import_from_suggestions_do_not_trigger_for_long_attributes(self): code = "blech = None" @@ -4397,6 +4413,15 @@ class SuggestionFormattingTestBase: actual = self.get_suggestion(func) self.assertIn("'ZeroDivisionError'?", actual) + def test_name_error_suggestions_with_non_string_candidates(self): + def func(): + abc = 1 + custom_globals = globals().copy() + custom_globals[0] = 1 + print(eval("abv", custom_globals, locals())) + actual = self.get_suggestion(func) + self.assertIn("abc", actual) + def test_name_error_suggestions_do_not_trigger_for_long_names(self): def func(): somethingverywronghehehehehehe = None diff --git a/Lib/test/test_urlparse.py b/Lib/test/test_urlparse.py index aabc360289a..b2bde5a9b1d 100644 --- a/Lib/test/test_urlparse.py +++ b/Lib/test/test_urlparse.py @@ -2,6 +2,7 @@ import sys import unicodedata import unittest import urllib.parse +from test import support RFC1808_BASE = "http://a/b/c/d;p?q#f" RFC2396_BASE = "http://a/b/c/d;p?q" @@ -156,27 +157,25 @@ class UrlParseTestCase(unittest.TestCase): self.assertEqual(result3.hostname, result.hostname) self.assertEqual(result3.port, result.port) - def test_qsl(self): - for orig, expect in parse_qsl_test_cases: - result = urllib.parse.parse_qsl(orig, keep_blank_values=True) - self.assertEqual(result, expect, "Error parsing %r" % orig) - expect_without_blanks = [v for v in expect if len(v[1])] - result = urllib.parse.parse_qsl(orig, keep_blank_values=False) - self.assertEqual(result, expect_without_blanks, - "Error parsing %r" % orig) - - def test_qs(self): - for orig, expect in parse_qs_test_cases: - result = urllib.parse.parse_qs(orig, keep_blank_values=True) - self.assertEqual(result, expect, "Error parsing %r" % orig) - expect_without_blanks = {v: expect[v] - for v in expect if len(expect[v][0])} - result = urllib.parse.parse_qs(orig, keep_blank_values=False) - self.assertEqual(result, expect_without_blanks, - "Error parsing %r" % orig) - - def test_roundtrips(self): - str_cases = [ + @support.subTests('orig,expect', parse_qsl_test_cases) + def test_qsl(self, orig, expect): + result = urllib.parse.parse_qsl(orig, keep_blank_values=True) + self.assertEqual(result, expect) + expect_without_blanks = [v for v in expect if len(v[1])] + result = urllib.parse.parse_qsl(orig, keep_blank_values=False) + self.assertEqual(result, expect_without_blanks) + + @support.subTests('orig,expect', parse_qs_test_cases) + def test_qs(self, orig, expect): + result = urllib.parse.parse_qs(orig, keep_blank_values=True) + self.assertEqual(result, expect) + expect_without_blanks = {v: expect[v] + for v in expect if len(expect[v][0])} + result = urllib.parse.parse_qs(orig, keep_blank_values=False) + self.assertEqual(result, expect_without_blanks) + + @support.subTests('bytes', (False, True)) + @support.subTests('url,parsed,split', [ ('path/to/file', ('', '', 'path/to/file', '', '', ''), ('', '', 'path/to/file', '', '')), @@ -263,23 +262,21 @@ class UrlParseTestCase(unittest.TestCase): ('sch_me:path/to/file', ('', '', 'sch_me:path/to/file', '', '', ''), ('', '', 'sch_me:path/to/file', '', '')), - ] - def _encode(t): - return (t[0].encode('ascii'), - tuple(x.encode('ascii') for x in t[1]), - tuple(x.encode('ascii') for x in t[2])) - bytes_cases = [_encode(x) for x in str_cases] - str_cases += [ ('schème:path/to/file', ('', '', 'schème:path/to/file', '', '', ''), ('', '', 'schème:path/to/file', '', '')), - ] - for url, parsed, split in str_cases + bytes_cases: - with self.subTest(url): - self.checkRoundtrips(url, parsed, split) - - def test_roundtrips_normalization(self): - str_cases = [ + ]) + def test_roundtrips(self, bytes, url, parsed, split): + if bytes: + if not url.isascii(): + self.skipTest('non-ASCII bytes') + url = str_encode(url) + parsed = tuple_encode(parsed) + split = tuple_encode(split) + self.checkRoundtrips(url, parsed, split) + + @support.subTests('bytes', (False, True)) + @support.subTests('url,url2,parsed,split', [ ('///path/to/file', '/path/to/file', ('', '', '/path/to/file', '', '', ''), @@ -300,22 +297,18 @@ class UrlParseTestCase(unittest.TestCase): 'https:///tmp/junk.txt', ('https', '', '/tmp/junk.txt', '', '', ''), ('https', '', '/tmp/junk.txt', '', '')), - ] - def _encode(t): - return (t[0].encode('ascii'), - t[1].encode('ascii'), - tuple(x.encode('ascii') for x in t[2]), - tuple(x.encode('ascii') for x in t[3])) - bytes_cases = [_encode(x) for x in str_cases] - for url, url2, parsed, split in str_cases + bytes_cases: - with self.subTest(url): - self.checkRoundtrips(url, parsed, split, url2) - - def test_http_roundtrips(self): - # urllib.parse.urlsplit treats 'http:' as an optimized special case, - # so we test both 'http:' and 'https:' in all the following. - # Three cheers for white box knowledge! - str_cases = [ + ]) + def test_roundtrips_normalization(self, bytes, url, url2, parsed, split): + if bytes: + url = str_encode(url) + url2 = str_encode(url2) + parsed = tuple_encode(parsed) + split = tuple_encode(split) + self.checkRoundtrips(url, parsed, split, url2) + + @support.subTests('bytes', (False, True)) + @support.subTests('scheme', ('http', 'https')) + @support.subTests('url,parsed,split', [ ('://www.python.org', ('www.python.org', '', '', '', ''), ('www.python.org', '', '', '')), @@ -331,23 +324,20 @@ class UrlParseTestCase(unittest.TestCase): ('://a/b/c/d;p?q#f', ('a', '/b/c/d', 'p', 'q', 'f'), ('a', '/b/c/d;p', 'q', 'f')), - ] - def _encode(t): - return (t[0].encode('ascii'), - tuple(x.encode('ascii') for x in t[1]), - tuple(x.encode('ascii') for x in t[2])) - bytes_cases = [_encode(x) for x in str_cases] - str_schemes = ('http', 'https') - bytes_schemes = (b'http', b'https') - str_tests = str_schemes, str_cases - bytes_tests = bytes_schemes, bytes_cases - for schemes, test_cases in (str_tests, bytes_tests): - for scheme in schemes: - for url, parsed, split in test_cases: - url = scheme + url - parsed = (scheme,) + parsed - split = (scheme,) + split - self.checkRoundtrips(url, parsed, split) + ]) + def test_http_roundtrips(self, bytes, scheme, url, parsed, split): + # urllib.parse.urlsplit treats 'http:' as an optimized special case, + # so we test both 'http:' and 'https:' in all the following. + # Three cheers for white box knowledge! + if bytes: + scheme = str_encode(scheme) + url = str_encode(url) + parsed = tuple_encode(parsed) + split = tuple_encode(split) + url = scheme + url + parsed = (scheme,) + parsed + split = (scheme,) + split + self.checkRoundtrips(url, parsed, split) def checkJoin(self, base, relurl, expected, *, relroundtrip=True): with self.subTest(base=base, relurl=relurl): @@ -363,12 +353,13 @@ class UrlParseTestCase(unittest.TestCase): relurlb = urllib.parse.urlunsplit(urllib.parse.urlsplit(relurlb)) self.assertEqual(urllib.parse.urljoin(baseb, relurlb), expectedb) - def test_unparse_parse(self): - str_cases = ['Python', './Python','x-newscheme://foo.com/stuff','x://y','x:/y','x:/','/',] - bytes_cases = [x.encode('ascii') for x in str_cases] - for u in str_cases + bytes_cases: - self.assertEqual(urllib.parse.urlunsplit(urllib.parse.urlsplit(u)), u) - self.assertEqual(urllib.parse.urlunparse(urllib.parse.urlparse(u)), u) + @support.subTests('bytes', (False, True)) + @support.subTests('u', ['Python', './Python','x-newscheme://foo.com/stuff','x://y','x:/y','x:/','/',]) + def test_unparse_parse(self, bytes, u): + if bytes: + u = str_encode(u) + self.assertEqual(urllib.parse.urlunsplit(urllib.parse.urlsplit(u)), u) + self.assertEqual(urllib.parse.urlunparse(urllib.parse.urlparse(u)), u) def test_RFC1808(self): # "normal" cases from RFC 1808: @@ -695,8 +686,8 @@ class UrlParseTestCase(unittest.TestCase): self.checkJoin('///b/c', '///w', '///w') self.checkJoin('///b/c', 'w', '///b/w') - def test_RFC2732(self): - str_cases = [ + @support.subTests('bytes', (False, True)) + @support.subTests('url,hostname,port', [ ('http://Test.python.org:5432/foo/', 'test.python.org', 5432), ('http://12.34.56.78:5432/foo/', '12.34.56.78', 5432), ('http://[::1]:5432/foo/', '::1', 5432), @@ -727,26 +718,28 @@ class UrlParseTestCase(unittest.TestCase): ('http://[::12.34.56.78]:/foo/', '::12.34.56.78', None), ('http://[::ffff:12.34.56.78]:/foo/', '::ffff:12.34.56.78', None), - ] - def _encode(t): - return t[0].encode('ascii'), t[1].encode('ascii'), t[2] - bytes_cases = [_encode(x) for x in str_cases] - for url, hostname, port in str_cases + bytes_cases: - urlparsed = urllib.parse.urlparse(url) - self.assertEqual((urlparsed.hostname, urlparsed.port) , (hostname, port)) - - str_cases = [ + ]) + def test_RFC2732(self, bytes, url, hostname, port): + if bytes: + url = str_encode(url) + hostname = str_encode(hostname) + urlparsed = urllib.parse.urlparse(url) + self.assertEqual((urlparsed.hostname, urlparsed.port), (hostname, port)) + + @support.subTests('bytes', (False, True)) + @support.subTests('invalid_url', [ 'http://::12.34.56.78]/', 'http://[::1/foo/', 'ftp://[::1/foo/bad]/bad', 'http://[::1/foo/bad]/bad', - 'http://[::ffff:12.34.56.78'] - bytes_cases = [x.encode('ascii') for x in str_cases] - for invalid_url in str_cases + bytes_cases: - self.assertRaises(ValueError, urllib.parse.urlparse, invalid_url) - - def test_urldefrag(self): - str_cases = [ + 'http://[::ffff:12.34.56.78']) + def test_RFC2732_invalid(self, bytes, invalid_url): + if bytes: + invalid_url = str_encode(invalid_url) + self.assertRaises(ValueError, urllib.parse.urlparse, invalid_url) + + @support.subTests('bytes', (False, True)) + @support.subTests('url,defrag,frag', [ ('http://python.org#frag', 'http://python.org', 'frag'), ('http://python.org', 'http://python.org', ''), ('http://python.org/#frag', 'http://python.org/', 'frag'), @@ -770,18 +763,18 @@ class UrlParseTestCase(unittest.TestCase): ('http:?q#f', 'http:?q', 'f'), ('//a/b/c;p?q#f', '//a/b/c;p?q', 'f'), ('://a/b/c;p?q#f', '://a/b/c;p?q', 'f'), - ] - def _encode(t): - return type(t)(x.encode('ascii') for x in t) - bytes_cases = [_encode(x) for x in str_cases] - for url, defrag, frag in str_cases + bytes_cases: - with self.subTest(url): - result = urllib.parse.urldefrag(url) - hash = '#' if isinstance(url, str) else b'#' - self.assertEqual(result.geturl(), url.rstrip(hash)) - self.assertEqual(result, (defrag, frag)) - self.assertEqual(result.url, defrag) - self.assertEqual(result.fragment, frag) + ]) + def test_urldefrag(self, bytes, url, defrag, frag): + if bytes: + url = str_encode(url) + defrag = str_encode(defrag) + frag = str_encode(frag) + result = urllib.parse.urldefrag(url) + hash = '#' if isinstance(url, str) else b'#' + self.assertEqual(result.geturl(), url.rstrip(hash)) + self.assertEqual(result, (defrag, frag)) + self.assertEqual(result.url, defrag) + self.assertEqual(result.fragment, frag) def test_urlsplit_scoped_IPv6(self): p = urllib.parse.urlsplit('http://[FE80::822a:a8ff:fe49:470c%tESt]:1234') @@ -981,42 +974,35 @@ class UrlParseTestCase(unittest.TestCase): self.assertEqual(p.scheme, "https") self.assertEqual(p.geturl(), "https://www.python.org/") - def test_attributes_bad_port(self): + @support.subTests('bytes', (False, True)) + @support.subTests('parse', (urllib.parse.urlsplit, urllib.parse.urlparse)) + @support.subTests('port', ("foo", "1.5", "-1", "0x10", "-0", "1_1", " 1", "1 ", "६")) + def test_attributes_bad_port(self, bytes, parse, port): """Check handling of invalid ports.""" - for bytes in (False, True): - for parse in (urllib.parse.urlsplit, urllib.parse.urlparse): - for port in ("foo", "1.5", "-1", "0x10", "-0", "1_1", " 1", "1 ", "६"): - with self.subTest(bytes=bytes, parse=parse, port=port): - netloc = "www.example.net:" + port - url = "http://" + netloc + "/" - if bytes: - if netloc.isascii() and port.isascii(): - netloc = netloc.encode("ascii") - url = url.encode("ascii") - else: - continue - p = parse(url) - self.assertEqual(p.netloc, netloc) - with self.assertRaises(ValueError): - p.port + netloc = "www.example.net:" + port + url = "http://" + netloc + "/" + if bytes: + if not (netloc.isascii() and port.isascii()): + self.skipTest('non-ASCII bytes') + netloc = str_encode(netloc) + url = str_encode(url) + p = parse(url) + self.assertEqual(p.netloc, netloc) + with self.assertRaises(ValueError): + p.port - def test_attributes_bad_scheme(self): + @support.subTests('bytes', (False, True)) + @support.subTests('parse', (urllib.parse.urlsplit, urllib.parse.urlparse)) + @support.subTests('scheme', (".", "+", "-", "0", "http&", "६http")) + def test_attributes_bad_scheme(self, bytes, parse, scheme): """Check handling of invalid schemes.""" - for bytes in (False, True): - for parse in (urllib.parse.urlsplit, urllib.parse.urlparse): - for scheme in (".", "+", "-", "0", "http&", "६http"): - with self.subTest(bytes=bytes, parse=parse, scheme=scheme): - url = scheme + "://www.example.net" - if bytes: - if url.isascii(): - url = url.encode("ascii") - else: - continue - p = parse(url) - if bytes: - self.assertEqual(p.scheme, b"") - else: - self.assertEqual(p.scheme, "") + url = scheme + "://www.example.net" + if bytes: + if not url.isascii(): + self.skipTest('non-ASCII bytes') + url = url.encode("ascii") + p = parse(url) + self.assertEqual(p.scheme, b"" if bytes else "") def test_attributes_without_netloc(self): # This example is straight from RFC 3261. It looks like it @@ -1128,24 +1114,21 @@ class UrlParseTestCase(unittest.TestCase): self.assertEqual(urllib.parse.urlparse(b"x-newscheme://foo.com/stuff?query"), (b'x-newscheme', b'foo.com', b'/stuff', b'', b'query', b'')) - def test_default_scheme(self): + @support.subTests('func', (urllib.parse.urlparse, urllib.parse.urlsplit)) + def test_default_scheme(self, func): # Exercise the scheme parameter of urlparse() and urlsplit() - for func in (urllib.parse.urlparse, urllib.parse.urlsplit): - with self.subTest(function=func): - result = func("http://example.net/", "ftp") - self.assertEqual(result.scheme, "http") - result = func(b"http://example.net/", b"ftp") - self.assertEqual(result.scheme, b"http") - self.assertEqual(func("path", "ftp").scheme, "ftp") - self.assertEqual(func("path", scheme="ftp").scheme, "ftp") - self.assertEqual(func(b"path", scheme=b"ftp").scheme, b"ftp") - self.assertEqual(func("path").scheme, "") - self.assertEqual(func(b"path").scheme, b"") - self.assertEqual(func(b"path", "").scheme, b"") - - def test_parse_fragments(self): - # Exercise the allow_fragments parameter of urlparse() and urlsplit() - tests = ( + result = func("http://example.net/", "ftp") + self.assertEqual(result.scheme, "http") + result = func(b"http://example.net/", b"ftp") + self.assertEqual(result.scheme, b"http") + self.assertEqual(func("path", "ftp").scheme, "ftp") + self.assertEqual(func("path", scheme="ftp").scheme, "ftp") + self.assertEqual(func(b"path", scheme=b"ftp").scheme, b"ftp") + self.assertEqual(func("path").scheme, "") + self.assertEqual(func(b"path").scheme, b"") + self.assertEqual(func(b"path", "").scheme, b"") + + @support.subTests('url,attr,expected_frag', ( ("http:#frag", "path", "frag"), ("//example.net#frag", "path", "frag"), ("index.html#frag", "path", "frag"), @@ -1156,24 +1139,24 @@ class UrlParseTestCase(unittest.TestCase): ("//abc#@frag", "path", "@frag"), ("//abc:80#@frag", "path", "@frag"), ("//abc#@frag:80", "path", "@frag:80"), - ) - for url, attr, expected_frag in tests: - for func in (urllib.parse.urlparse, urllib.parse.urlsplit): - if attr == "params" and func is urllib.parse.urlsplit: - attr = "path" - with self.subTest(url=url, function=func): - result = func(url, allow_fragments=False) - self.assertEqual(result.fragment, "") - self.assertEndsWith(getattr(result, attr), - "#" + expected_frag) - self.assertEqual(func(url, "", False).fragment, "") - - result = func(url, allow_fragments=True) - self.assertEqual(result.fragment, expected_frag) - self.assertNotEndsWith(getattr(result, attr), expected_frag) - self.assertEqual(func(url, "", True).fragment, - expected_frag) - self.assertEqual(func(url).fragment, expected_frag) + )) + @support.subTests('func', (urllib.parse.urlparse, urllib.parse.urlsplit)) + def test_parse_fragments(self, url, attr, expected_frag, func): + # Exercise the allow_fragments parameter of urlparse() and urlsplit() + if attr == "params" and func is urllib.parse.urlsplit: + attr = "path" + result = func(url, allow_fragments=False) + self.assertEqual(result.fragment, "") + self.assertEndsWith(getattr(result, attr), + "#" + expected_frag) + self.assertEqual(func(url, "", False).fragment, "") + + result = func(url, allow_fragments=True) + self.assertEqual(result.fragment, expected_frag) + self.assertNotEndsWith(getattr(result, attr), expected_frag) + self.assertEqual(func(url, "", True).fragment, + expected_frag) + self.assertEqual(func(url).fragment, expected_frag) def test_mixed_types_rejected(self): # Several functions that process either strings or ASCII encoded bytes @@ -1199,7 +1182,14 @@ class UrlParseTestCase(unittest.TestCase): with self.assertRaisesRegex(TypeError, "Cannot mix str"): urllib.parse.urljoin(b"http://python.org", "http://python.org") - def _check_result_type(self, str_type): + @support.subTests('result_type', [ + urllib.parse.DefragResult, + urllib.parse.SplitResult, + urllib.parse.ParseResult, + ]) + def test_result_pairs(self, result_type): + # Check encoding and decoding between result pairs + str_type = result_type num_args = len(str_type._fields) bytes_type = str_type._encoded_counterpart self.assertIs(bytes_type._decoded_counterpart, str_type) @@ -1224,16 +1214,6 @@ class UrlParseTestCase(unittest.TestCase): self.assertEqual(str_result.encode(encoding, errors), bytes_args) self.assertEqual(str_result.encode(encoding, errors), bytes_result) - def test_result_pairs(self): - # Check encoding and decoding between result pairs - result_types = [ - urllib.parse.DefragResult, - urllib.parse.SplitResult, - urllib.parse.ParseResult, - ] - for result_type in result_types: - self._check_result_type(result_type) - def test_parse_qs_encoding(self): result = urllib.parse.parse_qs("key=\u0141%E9", encoding="latin-1") self.assertEqual(result, {'key': ['\u0141\xE9']}) @@ -1265,8 +1245,7 @@ class UrlParseTestCase(unittest.TestCase): urllib.parse.parse_qsl('&'.join(['a=a']*11), max_num_fields=10) urllib.parse.parse_qsl('&'.join(['a=a']*10), max_num_fields=10) - def test_parse_qs_separator(self): - parse_qs_semicolon_cases = [ + @support.subTests('orig,expect', [ (";", {}), (";;", {}), (";a=b", {'a': ['b']}), @@ -1277,17 +1256,14 @@ class UrlParseTestCase(unittest.TestCase): (b";a=b", {b'a': [b'b']}), (b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}), (b"a=1;a=2", {b'a': [b'1', b'2']}), - ] - for orig, expect in parse_qs_semicolon_cases: - with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"): - result = urllib.parse.parse_qs(orig, separator=';') - self.assertEqual(result, expect, "Error parsing %r" % orig) - result_bytes = urllib.parse.parse_qs(orig, separator=b';') - self.assertEqual(result_bytes, expect, "Error parsing %r" % orig) - - - def test_parse_qsl_separator(self): - parse_qsl_semicolon_cases = [ + ]) + def test_parse_qs_separator(self, orig, expect): + result = urllib.parse.parse_qs(orig, separator=';') + self.assertEqual(result, expect) + result_bytes = urllib.parse.parse_qs(orig, separator=b';') + self.assertEqual(result_bytes, expect) + + @support.subTests('orig,expect', [ (";", []), (";;", []), (";a=b", [('a', 'b')]), @@ -1298,13 +1274,12 @@ class UrlParseTestCase(unittest.TestCase): (b";a=b", [(b'a', b'b')]), (b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]), (b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]), - ] - for orig, expect in parse_qsl_semicolon_cases: - with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"): - result = urllib.parse.parse_qsl(orig, separator=';') - self.assertEqual(result, expect, "Error parsing %r" % orig) - result_bytes = urllib.parse.parse_qsl(orig, separator=b';') - self.assertEqual(result_bytes, expect, "Error parsing %r" % orig) + ]) + def test_parse_qsl_separator(self, orig, expect): + result = urllib.parse.parse_qsl(orig, separator=';') + self.assertEqual(result, expect) + result_bytes = urllib.parse.parse_qsl(orig, separator=b';') + self.assertEqual(result_bytes, expect) def test_parse_qsl_bytes(self): self.assertEqual(urllib.parse.parse_qsl(b'a=b'), [(b'a', b'b')]) @@ -1695,11 +1670,12 @@ class Utility_Tests(unittest.TestCase): self.assertRaises(UnicodeError, urllib.parse._to_bytes, 'http://www.python.org/medi\u00e6val') - def test_unwrap(self): - for wrapped_url in ('<URL:scheme://host/path>', '<scheme://host/path>', - 'URL:scheme://host/path', 'scheme://host/path'): - url = urllib.parse.unwrap(wrapped_url) - self.assertEqual(url, 'scheme://host/path') + @support.subTests('wrapped_url', + ('<URL:scheme://host/path>', '<scheme://host/path>', + 'URL:scheme://host/path', 'scheme://host/path')) + def test_unwrap(self, wrapped_url): + url = urllib.parse.unwrap(wrapped_url) + self.assertEqual(url, 'scheme://host/path') class DeprecationTest(unittest.TestCase): @@ -1780,5 +1756,11 @@ class DeprecationTest(unittest.TestCase): 'urllib.parse.to_bytes() is deprecated as of 3.8') +def str_encode(s): + return s.encode('ascii') + +def tuple_encode(t): + return tuple(str_encode(x) for x in t) + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py index 014634e450e..d4c28aed38e 100644 --- a/Lib/test/test_zstd.py +++ b/Lib/test/test_zstd.py @@ -293,11 +293,11 @@ class CompressorTestCase(unittest.TestCase): # zstd lib doesn't support MT compression if not SUPPORT_MULTITHREADING: - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.nb_workers:4}) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.job_size:4}) - with self.assertRaises(ZstdError): + with self.assertRaises(ValueError): ZstdCompressor(options={CompressionParameter.overlap_log:4}) # out of bounds error msg @@ -395,6 +395,115 @@ class CompressorTestCase(unittest.TestCase): c = ZstdCompressor() self.assertNotEqual(c.compress(b'', c.FLUSH_FRAME), b'') + def test_set_pledged_input_size(self): + DAT = DECOMPRESSED_100_PLUS_32KB + CHUNK_SIZE = len(DAT) // 3 + + # wrong value + c = ZstdCompressor() + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(-300) + # overflow + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(2**64) + # ZSTD_CONTENTSIZE_ERROR is invalid + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(2**64-2) + # ZSTD_CONTENTSIZE_UNKNOWN should use None + with self.assertRaisesRegex(ValueError, + r'should be a positive int less than \d+'): + c.set_pledged_input_size(2**64-1) + + # check valid values are settable + c.set_pledged_input_size(2**63) + c.set_pledged_input_size(2**64-3) + + # check that zero means empty frame + c = ZstdCompressor(level=1) + c.set_pledged_input_size(0) + c.compress(b'') + dat = c.flush() + ret = get_frame_info(dat) + self.assertEqual(ret.decompressed_size, 0) + + + # wrong mode + c = ZstdCompressor(level=1) + c.compress(b'123456') + self.assertEqual(c.last_mode, c.CONTINUE) + with self.assertRaisesRegex(ValueError, + r'last_mode == FLUSH_FRAME'): + c.set_pledged_input_size(300) + + # None value + c = ZstdCompressor(level=1) + c.set_pledged_input_size(None) + dat = c.compress(DAT) + c.flush() + + ret = get_frame_info(dat) + self.assertEqual(ret.decompressed_size, None) + + # correct value + c = ZstdCompressor(level=1) + c.set_pledged_input_size(len(DAT)) + + chunks = [] + posi = 0 + while posi < len(DAT): + dat = c.compress(DAT[posi:posi+CHUNK_SIZE]) + posi += CHUNK_SIZE + chunks.append(dat) + + dat = c.flush() + chunks.append(dat) + chunks = b''.join(chunks) + + ret = get_frame_info(chunks) + self.assertEqual(ret.decompressed_size, len(DAT)) + self.assertEqual(decompress(chunks), DAT) + + c.set_pledged_input_size(len(DAT)) # the second frame + dat = c.compress(DAT) + c.flush() + + ret = get_frame_info(dat) + self.assertEqual(ret.decompressed_size, len(DAT)) + self.assertEqual(decompress(dat), DAT) + + # not enough data + c = ZstdCompressor(level=1) + c.set_pledged_input_size(len(DAT)+1) + + for start in range(0, len(DAT), CHUNK_SIZE): + end = min(start+CHUNK_SIZE, len(DAT)) + _dat = c.compress(DAT[start:end]) + + with self.assertRaises(ZstdError): + c.flush() + + # too much data + c = ZstdCompressor(level=1) + c.set_pledged_input_size(len(DAT)) + + for start in range(0, len(DAT), CHUNK_SIZE): + end = min(start+CHUNK_SIZE, len(DAT)) + _dat = c.compress(DAT[start:end]) + + with self.assertRaises(ZstdError): + c.compress(b'extra', ZstdCompressor.FLUSH_FRAME) + + # content size not set if content_size_flag == 0 + c = ZstdCompressor(options={CompressionParameter.content_size_flag: 0}) + c.set_pledged_input_size(10) + dat1 = c.compress(b"hello") + dat2 = c.compress(b"world") + dat3 = c.flush() + frame_data = get_frame_info(dat1 + dat2 + dat3) + self.assertIsNone(frame_data.decompressed_size) + + class DecompressorTestCase(unittest.TestCase): def test_simple_decompress_bad_args(self): @@ -1138,27 +1247,41 @@ class ZstdDictTestCase(unittest.TestCase): ZstdDecompressor(zd) # wrong type - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): - ZstdCompressor(zstd_dict=(zd, b'123')) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdCompressor(zstd_dict=[zd, 1]) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdCompressor(zstd_dict=(zd, 1.0)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdCompressor(zstd_dict=(zd,)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdCompressor(zstd_dict=(zd, 1, 2)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdCompressor(zstd_dict=(zd, -1)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdCompressor(zstd_dict=(zd, 3)) - - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): - ZstdDecompressor(zstd_dict=(zd, b'123')) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaises(OverflowError): + ZstdCompressor(zstd_dict=(zd, 2**1000)) + with self.assertRaises(OverflowError): + ZstdCompressor(zstd_dict=(zd, -2**1000)) + + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdDecompressor(zstd_dict=[zd, 1]) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdDecompressor(zstd_dict=(zd, 1.0)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): + ZstdDecompressor((zd,)) + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdDecompressor((zd, 1, 2)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdDecompressor((zd, -1)) - with self.assertRaisesRegex(TypeError, r'should be ZstdDict object'): + with self.assertRaisesRegex(TypeError, r'should be a ZstdDict object'): ZstdDecompressor((zd, 3)) + with self.assertRaises(OverflowError): + ZstdDecompressor((zd, 2**1000)) + with self.assertRaises(OverflowError): + ZstdDecompressor((zd, -2**1000)) def test_train_dict(self): - - TRAINED_DICT = train_dict(SAMPLES, DICT_SIZE1) ZstdDict(TRAINED_DICT.dict_content, is_raw=False) @@ -1240,17 +1363,36 @@ class ZstdDictTestCase(unittest.TestCase): with self.assertRaises(TypeError): _zstd.train_dict({}, (), 100) with self.assertRaises(TypeError): + _zstd.train_dict(bytearray(), (), 100) + with self.assertRaises(TypeError): _zstd.train_dict(b'', 99, 100) with self.assertRaises(TypeError): + _zstd.train_dict(b'', [], 100) + with self.assertRaises(TypeError): _zstd.train_dict(b'', (), 100.1) + with self.assertRaises(TypeError): + _zstd.train_dict(b'', (99.1,), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'abc', (4, -1), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'abc', (2,), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (99,), 100) # size > size_t with self.assertRaises(ValueError): - _zstd.train_dict(b'', (2**64+1,), 100) + _zstd.train_dict(b'', (2**1000,), 100) + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (-2**1000,), 100) # dict_size <= 0 with self.assertRaises(ValueError): _zstd.train_dict(b'', (), 0) + with self.assertRaises(ValueError): + _zstd.train_dict(b'', (), -1) + + with self.assertRaises(ZstdError): + _zstd.train_dict(b'', (), 1) def test_finalize_dict_c(self): with self.assertRaises(TypeError): @@ -1260,21 +1402,50 @@ class ZstdDictTestCase(unittest.TestCase): with self.assertRaises(TypeError): _zstd.finalize_dict({}, b'', (), 100, 5) with self.assertRaises(TypeError): + _zstd.finalize_dict(bytearray(TRAINED_DICT.dict_content), b'', (), 100, 5) + with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, {}, (), 100, 5) with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, bytearray(), (), 100, 5) + with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', 99, 100, 5) with self.assertRaises(TypeError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', [], 100, 5) + with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100.1, 5) with self.assertRaises(TypeError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5.1) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'abc', (4, -1), 100, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'abc', (2,), 100, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (99,), 100, 5) + # size > size_t with self.assertRaises(ValueError): - _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**64+1,), 100, 5) + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**1000,), 100, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (-2**1000,), 100, 5) # dict_size <= 0 with self.assertRaises(ValueError): _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 0, 5) + with self.assertRaises(ValueError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), -1, 5) + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 2**1000, 5) + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), -2**1000, 5) + + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 2**1000) + with self.assertRaises(OverflowError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, -2**1000) + + with self.assertRaises(ZstdError): + _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5) def test_train_buffer_protocol_samples(self): def _nbytes(dat): |