aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/.ruff.toml3
-rw-r--r--Lib/test/_code_definitions.py79
-rw-r--r--Lib/test/_test_gc_fast_cycles.py48
-rw-r--r--Lib/test/_test_multiprocessing.py12
-rw-r--r--Lib/test/datetimetester.py3
-rw-r--r--Lib/test/libregrtest/utils.py2
-rw-r--r--Lib/test/pickletester.py29
-rw-r--r--Lib/test/support/strace_helper.py5
-rw-r--r--Lib/test/test_annotationlib.py42
-rw-r--r--Lib/test/test_ast/test_ast.py9
-rw-r--r--Lib/test/test_asyncio/test_ssl.py10
-rw-r--r--Lib/test/test_capi/test_import.py2
-rw-r--r--Lib/test/test_capi/test_opt.py17
-rw-r--r--Lib/test/test_clinic.py4
-rw-r--r--Lib/test/test_code.py81
-rw-r--r--Lib/test/test_codeop.py2
-rw-r--r--Lib/test/test_crossinterp.py120
-rw-r--r--Lib/test/test_dict.py23
-rw-r--r--Lib/test/test_embed.py3
-rw-r--r--Lib/test/test_free_threading/test_io.py109
-rw-r--r--Lib/test/test_functools.py19
-rw-r--r--Lib/test/test_future_stmt/test_future.py5
-rw-r--r--Lib/test/test_gc.py62
-rw-r--r--Lib/test/test_generated_cases.py183
-rw-r--r--Lib/test/test_htmlparser.py91
-rw-r--r--Lib/test/test_importlib/test_threaded_import.py15
-rw-r--r--Lib/test/test_inspect/test_inspect.py37
-rw-r--r--Lib/test/test_linecache.py37
-rw-r--r--Lib/test/test_logging.py83
-rw-r--r--Lib/test/test_pdb.py4
-rw-r--r--Lib/test/test_positional_only_arg.py12
-rw-r--r--Lib/test/test_pyrepl/test_interact.py2
-rw-r--r--Lib/test/test_pyrepl/test_pyrepl.py5
-rw-r--r--Lib/test/test_pyrepl/test_reader.py20
-rw-r--r--Lib/test/test_repl.py6
-rw-r--r--Lib/test/test_sqlite3/test_cli.py45
-rw-r--r--Lib/test/test_sqlite3/test_dbapi.py14
-rw-r--r--Lib/test/test_sqlite3/test_factory.py15
-rw-r--r--Lib/test/test_sqlite3/test_hooks.py22
-rw-r--r--Lib/test/test_sqlite3/test_userfunctions.py55
-rw-r--r--Lib/test/test_ssl.py1
-rw-r--r--Lib/test/test_syntax.py56
-rw-r--r--Lib/test/test_threadedtempfile.py4
-rw-r--r--Lib/test/test_threading.py6
-rw-r--r--Lib/test/test_unparse.py9
-rw-r--r--Lib/test/test_xml_etree.py44
-rw-r--r--Lib/test/test_zstd.py42
47 files changed, 1217 insertions, 280 deletions
diff --git a/Lib/test/.ruff.toml b/Lib/test/.ruff.toml
index a1eac32a83a..7aa8a4785d6 100644
--- a/Lib/test/.ruff.toml
+++ b/Lib/test/.ruff.toml
@@ -9,8 +9,9 @@ extend-exclude = [
"encoded_modules/module_iso_8859_1.py",
"encoded_modules/module_koi8_r.py",
# SyntaxError because of t-strings
- "test_tstring.py",
+ "test_annotationlib.py",
"test_string/test_templatelib.py",
+ "test_tstring.py",
# New grammar constructions may not yet be recognized by Ruff,
# and tests re-use the same names as only the grammar is being checked.
"test_grammar.py",
diff --git a/Lib/test/_code_definitions.py b/Lib/test/_code_definitions.py
index c3daa0dccf5..733a15b25f6 100644
--- a/Lib/test/_code_definitions.py
+++ b/Lib/test/_code_definitions.py
@@ -1,4 +1,32 @@
+def simple_script():
+ assert True
+
+
+def complex_script():
+ obj = 'a string'
+ pickle = __import__('pickle')
+ def spam_minimal():
+ pass
+ spam_minimal()
+ data = pickle.dumps(obj)
+ res = pickle.loads(data)
+ assert res == obj, (res, obj)
+
+
+def script_with_globals():
+ obj1, obj2 = spam(42)
+ assert obj1 == 42
+ assert obj2 is None
+
+
+def script_with_explicit_empty_return():
+ return None
+
+
+def script_with_return():
+ return True
+
def spam_minimal():
# no arg defaults or kwarg defaults
@@ -141,6 +169,11 @@ ham_C_closure, *_ = eggs_closure_C(2)
TOP_FUNCTIONS = [
# shallow
+ simple_script,
+ complex_script,
+ script_with_globals,
+ script_with_explicit_empty_return,
+ script_with_return,
spam_minimal,
spam_with_builtins,
spam_with_globals_and_builtins,
@@ -178,6 +211,52 @@ FUNCTIONS = [
*NESTED_FUNCTIONS,
]
+STATELESS_FUNCTIONS = [
+ simple_script,
+ complex_script,
+ script_with_explicit_empty_return,
+ script_with_return,
+ spam,
+ spam_minimal,
+ spam_with_builtins,
+ spam_args_attrs_and_builtins,
+ spam_returns_arg,
+ spam_annotated,
+ spam_with_inner_not_closure,
+ spam_with_inner_closure,
+ spam_N,
+ spam_C,
+ spam_NN,
+ spam_NC,
+ spam_CN,
+ spam_CC,
+ eggs_nested,
+ eggs_nested_N,
+ ham_nested,
+ ham_C_nested
+]
+STATELESS_CODE = [
+ *STATELESS_FUNCTIONS,
+ script_with_globals,
+ spam_with_globals_and_builtins,
+ spam_full,
+]
+
+PURE_SCRIPT_FUNCTIONS = [
+ simple_script,
+ complex_script,
+ script_with_explicit_empty_return,
+ spam_minimal,
+ spam_with_builtins,
+ spam_with_inner_not_closure,
+ spam_with_inner_closure,
+]
+SCRIPT_FUNCTIONS = [
+ *PURE_SCRIPT_FUNCTIONS,
+ script_with_globals,
+ spam_with_globals_and_builtins,
+]
+
# generators
diff --git a/Lib/test/_test_gc_fast_cycles.py b/Lib/test/_test_gc_fast_cycles.py
new file mode 100644
index 00000000000..4e2c7d72a02
--- /dev/null
+++ b/Lib/test/_test_gc_fast_cycles.py
@@ -0,0 +1,48 @@
+# Run by test_gc.
+from test import support
+import _testinternalcapi
+import gc
+import unittest
+
+class IncrementalGCTests(unittest.TestCase):
+
+ # Use small increments to emulate longer running process in a shorter time
+ @support.gc_threshold(200, 10)
+ def test_incremental_gc_handles_fast_cycle_creation(self):
+
+ class LinkedList:
+
+ #Use slots to reduce number of implicit objects
+ __slots__ = "next", "prev", "surprise"
+
+ def __init__(self, next=None, prev=None):
+ self.next = next
+ if next is not None:
+ next.prev = self
+ self.prev = prev
+ if prev is not None:
+ prev.next = self
+
+ def make_ll(depth):
+ head = LinkedList()
+ for i in range(depth):
+ head = LinkedList(head, head.prev)
+ return head
+
+ head = make_ll(1000)
+
+ assert(gc.isenabled())
+ olds = []
+ initial_heap_size = _testinternalcapi.get_tracked_heap_size()
+ for i in range(20_000):
+ newhead = make_ll(20)
+ newhead.surprise = head
+ olds.append(newhead)
+ if len(olds) == 20:
+ new_objects = _testinternalcapi.get_tracked_heap_size() - initial_heap_size
+ self.assertLess(new_objects, 27_000, f"Heap growing. Reached limit after {i} iterations")
+ del olds[:]
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 4dc9a31d22f..1b690cb88bf 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -2463,6 +2463,12 @@ class _TestValue(BaseTestCase):
self.assertNotHasAttr(arr5, 'get_lock')
self.assertNotHasAttr(arr5, 'get_obj')
+ @unittest.skipIf(c_int is None, "requires _ctypes")
+ def test_invalid_typecode(self):
+ with self.assertRaisesRegex(TypeError, 'bad typecode'):
+ self.Value('x', None)
+ with self.assertRaisesRegex(TypeError, 'bad typecode'):
+ self.RawValue('x', None)
class _TestArray(BaseTestCase):
@@ -2543,6 +2549,12 @@ class _TestArray(BaseTestCase):
self.assertNotHasAttr(arr5, 'get_lock')
self.assertNotHasAttr(arr5, 'get_obj')
+ @unittest.skipIf(c_int is None, "requires _ctypes")
+ def test_invalid_typecode(self):
+ with self.assertRaisesRegex(TypeError, 'bad typecode'):
+ self.Array('x', [])
+ with self.assertRaisesRegex(TypeError, 'bad typecode'):
+ self.RawArray('x', [])
#
#
#
diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py
index 55844ec35a9..d1882a310bb 100644
--- a/Lib/test/datetimetester.py
+++ b/Lib/test/datetimetester.py
@@ -773,6 +773,9 @@ class TestTimeDelta(HarmlessMixedComparison, unittest.TestCase):
microseconds=999999)),
"999999999 days, 23:59:59.999999")
+ # test the Doc/library/datetime.rst recipe
+ eq(f'-({-td(hours=-1)!s})', "-(1:00:00)")
+
def test_repr(self):
name = 'datetime.' + self.theclass.__name__
self.assertEqual(repr(self.theclass(1)),
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
index 63a2e427d18..72b8ea89e62 100644
--- a/Lib/test/libregrtest/utils.py
+++ b/Lib/test/libregrtest/utils.py
@@ -31,7 +31,7 @@ WORKER_WORK_DIR_PREFIX = WORK_DIR_PREFIX + 'worker_'
EXIT_TIMEOUT = 120.0
-ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
+ALL_RESOURCES = ('audio', 'console', 'curses', 'largefile', 'network',
'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')
# Other resources excluded from --use=all:
diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py
index bdc7ef62943..dcba6369541 100644
--- a/Lib/test/pickletester.py
+++ b/Lib/test/pickletester.py
@@ -2272,7 +2272,11 @@ class AbstractPicklingErrorTests:
def test_nested_lookup_error(self):
# Nested name does not exist
- obj = REX('AbstractPickleTests.spam')
+ global TestGlobal
+ class TestGlobal:
+ class A:
+ pass
+ obj = REX('TestGlobal.A.B.C')
obj.__module__ = __name__
for proto in protocols:
with self.subTest(proto=proto):
@@ -2280,9 +2284,9 @@ class AbstractPicklingErrorTests:
self.dumps(obj, proto)
self.assertEqual(str(cm.exception),
f"Can't pickle {obj!r}: "
- f"it's not found as {__name__}.AbstractPickleTests.spam")
+ f"it's not found as {__name__}.TestGlobal.A.B.C")
self.assertEqual(str(cm.exception.__context__),
- "type object 'AbstractPickleTests' has no attribute 'spam'")
+ "type object 'A' has no attribute 'B'")
obj.__module__ = None
for proto in protocols:
@@ -2290,21 +2294,25 @@ class AbstractPicklingErrorTests:
with self.assertRaises(pickle.PicklingError) as cm:
self.dumps(obj, proto)
self.assertEqual(str(cm.exception),
- f"Can't pickle {obj!r}: it's not found as __main__.AbstractPickleTests.spam")
+ f"Can't pickle {obj!r}: "
+ f"it's not found as __main__.TestGlobal.A.B.C")
self.assertEqual(str(cm.exception.__context__),
- "module '__main__' has no attribute 'AbstractPickleTests'")
+ "module '__main__' has no attribute 'TestGlobal'")
def test_wrong_object_lookup_error(self):
# Name is bound to different object
- obj = REX('AbstractPickleTests')
+ global TestGlobal
+ class TestGlobal:
+ pass
+ obj = REX('TestGlobal')
obj.__module__ = __name__
- AbstractPickleTests.ham = []
for proto in protocols:
with self.subTest(proto=proto):
with self.assertRaises(pickle.PicklingError) as cm:
self.dumps(obj, proto)
self.assertEqual(str(cm.exception),
- f"Can't pickle {obj!r}: it's not the same object as {__name__}.AbstractPickleTests")
+ f"Can't pickle {obj!r}: "
+ f"it's not the same object as {__name__}.TestGlobal")
self.assertIsNone(cm.exception.__context__)
obj.__module__ = None
@@ -2313,9 +2321,10 @@ class AbstractPicklingErrorTests:
with self.assertRaises(pickle.PicklingError) as cm:
self.dumps(obj, proto)
self.assertEqual(str(cm.exception),
- f"Can't pickle {obj!r}: it's not found as __main__.AbstractPickleTests")
+ f"Can't pickle {obj!r}: "
+ f"it's not found as __main__.TestGlobal")
self.assertEqual(str(cm.exception.__context__),
- "module '__main__' has no attribute 'AbstractPickleTests'")
+ "module '__main__' has no attribute 'TestGlobal'")
def test_local_lookup_error(self):
# Test that whichmodule() errors out cleanly when looking up
diff --git a/Lib/test/support/strace_helper.py b/Lib/test/support/strace_helper.py
index 798d6c68869..1a9d2b520b7 100644
--- a/Lib/test/support/strace_helper.py
+++ b/Lib/test/support/strace_helper.py
@@ -178,7 +178,10 @@ def get_syscalls(code, strace_flags, prelude="", cleanup="",
# Moderately expensive (spawns a subprocess), so share results when possible.
@cache
def _can_strace():
- res = strace_python("import sys; sys.exit(0)", [], check=False)
+ res = strace_python("import sys; sys.exit(0)",
+ # --trace option needs strace 5.5 (gh-133741)
+ ["--trace=%process"],
+ check=False)
if res.strace_returncode == 0 and res.python_returncode == 0:
assert res.events(), "Should have parsed multiple calls"
return True
diff --git a/Lib/test/test_annotationlib.py b/Lib/test/test_annotationlib.py
index c3c245ddaf8..73a821d15e3 100644
--- a/Lib/test/test_annotationlib.py
+++ b/Lib/test/test_annotationlib.py
@@ -7,6 +7,7 @@ import collections
import functools
import itertools
import pickle
+from string.templatelib import Interpolation, Template
import typing
import unittest
from annotationlib import (
@@ -273,6 +274,43 @@ class TestStringFormat(unittest.TestCase):
},
)
+ def test_template_str(self):
+ def f(
+ x: t"{a}",
+ y: list[t"{a}"],
+ z: t"{a:b} {c!r} {d!s:t}",
+ a: t"a{b}c{d}e{f}g",
+ b: t"{a:{1}}",
+ c: t"{a | b * c}",
+ ): pass
+
+ annos = get_annotations(f, format=Format.STRING)
+ self.assertEqual(annos, {
+ "x": "t'{a}'",
+ "y": "list[t'{a}']",
+ "z": "t'{a:b} {c!r} {d!s:t}'",
+ "a": "t'a{b}c{d}e{f}g'",
+ # interpolations in the format spec are eagerly evaluated so we can't recover the source
+ "b": "t'{a:1}'",
+ "c": "t'{a | b * c}'",
+ })
+
+ def g(
+ x: t"{a}",
+ ): ...
+
+ annos = get_annotations(g, format=Format.FORWARDREF)
+ templ = annos["x"]
+ # Template and Interpolation don't have __eq__ so we have to compare manually
+ self.assertIsInstance(templ, Template)
+ self.assertEqual(templ.strings, ("", ""))
+ self.assertEqual(len(templ.interpolations), 1)
+ interp = templ.interpolations[0]
+ self.assertEqual(interp.value, support.EqualToForwardRef("a", owner=g))
+ self.assertEqual(interp.expression, "a")
+ self.assertIsNone(interp.conversion)
+ self.assertEqual(interp.format_spec, "")
+
def test_getitem(self):
def f(x: undef1[str, undef2]):
pass
@@ -340,7 +378,7 @@ class TestStringFormat(unittest.TestCase):
def g(
w: a[[int, str], float],
- x: a[{int, str}, 3],
+ x: a[{int}, 3],
y: a[{int: str}, 4],
z: a[(int, str), 5],
):
@@ -350,7 +388,7 @@ class TestStringFormat(unittest.TestCase):
anno,
{
"w": "a[[int, str], float]",
- "x": "a[{int, str}, 3]",
+ "x": "a[{int}, 3]",
"y": "a[{int: str}, 4]",
"z": "a[(int, str), 5]",
},
diff --git a/Lib/test/test_ast/test_ast.py b/Lib/test/test_ast/test_ast.py
index 02628868db0..0776559b900 100644
--- a/Lib/test/test_ast/test_ast.py
+++ b/Lib/test/test_ast/test_ast.py
@@ -1315,6 +1315,15 @@ class CopyTests(unittest.TestCase):
self.assertIs(repl.id, 'y')
self.assertIs(repl.ctx, context)
+ def test_replace_accept_missing_field_with_default(self):
+ node = ast.FunctionDef(name="foo", args=ast.arguments())
+ self.assertIs(node.returns, None)
+ self.assertEqual(node.decorator_list, [])
+ node2 = copy.replace(node, name="bar")
+ self.assertEqual(node2.name, "bar")
+ self.assertIs(node2.returns, None)
+ self.assertEqual(node2.decorator_list, [])
+
def test_replace_reject_known_custom_instance_fields_commits(self):
node = ast.parse('x').body[0].value
node.extra = extra = object() # add instance 'extra' field
diff --git a/Lib/test/test_asyncio/test_ssl.py b/Lib/test/test_asyncio/test_ssl.py
index 986ecc2c5a9..3a7185cd897 100644
--- a/Lib/test/test_asyncio/test_ssl.py
+++ b/Lib/test/test_asyncio/test_ssl.py
@@ -195,9 +195,10 @@ class TestSSL(test_utils.TestCase):
except (BrokenPipeError, ConnectionError):
pass
- def test_create_server_ssl_1(self):
+ @support.bigmemtest(size=25, memuse=90*2**20, dry_run=False)
+ def test_create_server_ssl_1(self, size):
CNT = 0 # number of clients that were successful
- TOTAL_CNT = 25 # total number of clients that test will create
+ TOTAL_CNT = size # total number of clients that test will create
TIMEOUT = support.LONG_TIMEOUT # timeout for this test
A_DATA = b'A' * 1024 * BUF_MULTIPLIER
@@ -1038,9 +1039,10 @@ class TestSSL(test_utils.TestCase):
self.loop.run_until_complete(run_main())
- def test_create_server_ssl_over_ssl(self):
+ @support.bigmemtest(size=25, memuse=90*2**20, dry_run=False)
+ def test_create_server_ssl_over_ssl(self, size):
CNT = 0 # number of clients that were successful
- TOTAL_CNT = 25 # total number of clients that test will create
+ TOTAL_CNT = size # total number of clients that test will create
TIMEOUT = support.LONG_TIMEOUT # timeout for this test
A_DATA = b'A' * 1024 * BUF_MULTIPLIER
diff --git a/Lib/test/test_capi/test_import.py b/Lib/test/test_capi/test_import.py
index 25136624ca4..57e0316fda8 100644
--- a/Lib/test/test_capi/test_import.py
+++ b/Lib/test/test_capi/test_import.py
@@ -134,7 +134,7 @@ class ImportTests(unittest.TestCase):
# CRASHES importmodule(NULL)
def test_importmodulenoblock(self):
- # Test deprecated PyImport_ImportModuleNoBlock()
+ # Test deprecated (stable ABI only) PyImport_ImportModuleNoBlock()
importmodulenoblock = _testlimitedcapi.PyImport_ImportModuleNoBlock
with check_warnings(('', DeprecationWarning)):
self.check_import_func(importmodulenoblock)
diff --git a/Lib/test/test_capi/test_opt.py b/Lib/test/test_capi/test_opt.py
index ba7bcb4540a..651148336f7 100644
--- a/Lib/test/test_capi/test_opt.py
+++ b/Lib/test/test_capi/test_opt.py
@@ -1942,6 +1942,23 @@ class TestUopsOptimization(unittest.TestCase):
self.assertNotIn("_COMPARE_OP_INT", uops)
self.assertNotIn("_GUARD_IS_TRUE_POP", uops)
+ def test_call_isinstance_guards_removed(self):
+ def testfunc(n):
+ x = 0
+ for _ in range(n):
+ y = isinstance(42, int)
+ if y:
+ x += 1
+ return x
+
+ res, ex = self._run_with_optimizer(testfunc, TIER2_THRESHOLD)
+ self.assertEqual(res, TIER2_THRESHOLD)
+ self.assertIsNotNone(ex)
+ uops = get_opnames(ex)
+ self.assertIn("_CALL_ISINSTANCE", uops)
+ self.assertNotIn("_GUARD_THIRD_NULL", uops)
+ self.assertNotIn("_GUARD_CALLABLE_ISINSTANCE", uops)
+
def global_identity(x):
return x
diff --git a/Lib/test/test_clinic.py b/Lib/test/test_clinic.py
index 6461b647925..f7fc3b38733 100644
--- a/Lib/test/test_clinic.py
+++ b/Lib/test/test_clinic.py
@@ -2835,6 +2835,10 @@ class ClinicExternalTest(TestCase):
"size_t",
"slice_index",
"str",
+ "uint16",
+ "uint32",
+ "uint64",
+ "uint8",
"unicode",
"unsigned_char",
"unsigned_int",
diff --git a/Lib/test/test_code.py b/Lib/test/test_code.py
index b646042a3b8..32cf8aacaf6 100644
--- a/Lib/test/test_code.py
+++ b/Lib/test/test_code.py
@@ -220,6 +220,7 @@ try:
import _testinternalcapi
except ModuleNotFoundError:
_testinternalcapi = None
+import test._code_definitions as defs
COPY_FREE_VARS = opmap['COPY_FREE_VARS']
@@ -671,8 +672,21 @@ class CodeTest(unittest.TestCase):
VARARGS = CO_FAST_LOCAL | CO_FAST_ARG_VAR | CO_FAST_ARG_POS
VARKWARGS = CO_FAST_LOCAL | CO_FAST_ARG_VAR | CO_FAST_ARG_KW
- import test._code_definitions as defs
funcs = {
+ defs.simple_script: {},
+ defs.complex_script: {
+ 'obj': CO_FAST_LOCAL,
+ 'pickle': CO_FAST_LOCAL,
+ 'spam_minimal': CO_FAST_LOCAL,
+ 'data': CO_FAST_LOCAL,
+ 'res': CO_FAST_LOCAL,
+ },
+ defs.script_with_globals: {
+ 'obj1': CO_FAST_LOCAL,
+ 'obj2': CO_FAST_LOCAL,
+ },
+ defs.script_with_explicit_empty_return: {},
+ defs.script_with_return: {},
defs.spam_minimal: {},
defs.spam_with_builtins: {
'x': CO_FAST_LOCAL,
@@ -897,8 +911,20 @@ class CodeTest(unittest.TestCase):
},
}
- import test._code_definitions as defs
funcs = {
+ defs.simple_script: new_var_counts(),
+ defs.complex_script: new_var_counts(
+ purelocals=5,
+ globalvars=1,
+ attrs=2,
+ ),
+ defs.script_with_globals: new_var_counts(
+ purelocals=2,
+ globalvars=1,
+ ),
+ defs.script_with_explicit_empty_return: new_var_counts(),
+ defs.script_with_return: new_var_counts(),
+ defs.spam_minimal: new_var_counts(),
defs.spam_minimal: new_var_counts(),
defs.spam_with_builtins: new_var_counts(
purelocals=4,
@@ -1025,42 +1051,35 @@ class CodeTest(unittest.TestCase):
counts = _testinternalcapi.get_code_var_counts(func.__code__)
self.assertEqual(counts, expected)
- def func_with_globals_and_builtins():
- mod1 = _testinternalcapi
- mod2 = dis
- mods = (mod1, mod2)
- checks = tuple(callable(m) for m in mods)
- return callable(mod2), tuple(mods), list(mods), checks
-
- func = func_with_globals_and_builtins
+ func = defs.spam_with_globals_and_builtins
with self.subTest(f'{func} code'):
expected = new_var_counts(
- purelocals=4,
- globalvars=5,
+ purelocals=5,
+ globalvars=6,
)
counts = _testinternalcapi.get_code_var_counts(func.__code__)
self.assertEqual(counts, expected)
with self.subTest(f'{func} with own globals and builtins'):
expected = new_var_counts(
- purelocals=4,
- globalvars=(2, 3),
+ purelocals=5,
+ globalvars=(2, 4),
)
counts = _testinternalcapi.get_code_var_counts(func)
self.assertEqual(counts, expected)
with self.subTest(f'{func} without globals'):
expected = new_var_counts(
- purelocals=4,
- globalvars=(0, 3, 2),
+ purelocals=5,
+ globalvars=(0, 4, 2),
)
counts = _testinternalcapi.get_code_var_counts(func, globalsns={})
self.assertEqual(counts, expected)
with self.subTest(f'{func} without both'):
expected = new_var_counts(
- purelocals=4,
- globalvars=5,
+ purelocals=5,
+ globalvars=6,
)
counts = _testinternalcapi.get_code_var_counts(func, globalsns={},
builtinsns={})
@@ -1068,12 +1087,34 @@ class CodeTest(unittest.TestCase):
with self.subTest(f'{func} without builtins'):
expected = new_var_counts(
- purelocals=4,
- globalvars=(2, 0, 3),
+ purelocals=5,
+ globalvars=(2, 0, 4),
)
counts = _testinternalcapi.get_code_var_counts(func, builtinsns={})
self.assertEqual(counts, expected)
+ @unittest.skipIf(_testinternalcapi is None, "missing _testinternalcapi")
+ def test_stateless(self):
+ self.maxDiff = None
+
+ for func in defs.STATELESS_CODE:
+ with self.subTest((func, '(code)')):
+ _testinternalcapi.verify_stateless_code(func.__code__)
+ for func in defs.STATELESS_FUNCTIONS:
+ with self.subTest((func, '(func)')):
+ _testinternalcapi.verify_stateless_code(func)
+
+ for func in defs.FUNCTIONS:
+ if func not in defs.STATELESS_CODE:
+ with self.subTest((func, '(code)')):
+ with self.assertRaises(Exception):
+ _testinternalcapi.verify_stateless_code(func.__code__)
+
+ if func not in defs.STATELESS_FUNCTIONS:
+ with self.subTest((func, '(func)')):
+ with self.assertRaises(Exception):
+ _testinternalcapi.verify_stateless_code(func)
+
def isinterned(s):
return s is sys.intern(('_' + s + '_')[1:-1])
diff --git a/Lib/test/test_codeop.py b/Lib/test/test_codeop.py
index 0eefc22d11b..ed10bd3dcb6 100644
--- a/Lib/test/test_codeop.py
+++ b/Lib/test/test_codeop.py
@@ -322,7 +322,7 @@ class CodeopTests(unittest.TestCase):
dedent("""\
def foo(x,x):
pass
- """), "duplicate argument 'x' in function definition")
+ """), "duplicate parameter 'x' in function definition")
diff --git a/Lib/test/test_crossinterp.py b/Lib/test/test_crossinterp.py
index 5ac0080db43..b366a29645e 100644
--- a/Lib/test/test_crossinterp.py
+++ b/Lib/test/test_crossinterp.py
@@ -758,6 +758,126 @@ class CodeTests(_GetXIDataTests):
])
+class PureShareableScriptTests(_GetXIDataTests):
+
+ MODE = 'script-pure'
+
+ VALID_SCRIPTS = [
+ '',
+ 'spam',
+ '# a comment',
+ 'print("spam")',
+ 'raise Exception("spam")',
+ """if True:
+ do_something()
+ """,
+ """if True:
+ def spam(x):
+ return x
+ class Spam:
+ def eggs(self):
+ return 42
+ x = Spam().eggs()
+ raise ValueError(spam(x))
+ """,
+ ]
+ INVALID_SCRIPTS = [
+ ' pass', # IndentationError
+ '----', # SyntaxError
+ """if True:
+ def spam():
+ # no body
+ spam()
+ """, # IndentationError
+ ]
+
+ def test_valid_str(self):
+ self.assert_roundtrip_not_equal([
+ *self.VALID_SCRIPTS,
+ ], expecttype=types.CodeType)
+
+ def test_invalid_str(self):
+ self.assert_not_shareable([
+ *self.INVALID_SCRIPTS,
+ ])
+
+ def test_valid_bytes(self):
+ self.assert_roundtrip_not_equal([
+ *(s.encode('utf8') for s in self.VALID_SCRIPTS),
+ ], expecttype=types.CodeType)
+
+ def test_invalid_bytes(self):
+ self.assert_not_shareable([
+ *(s.encode('utf8') for s in self.INVALID_SCRIPTS),
+ ])
+
+ def test_pure_script_code(self):
+ self.assert_roundtrip_equal_not_identical([
+ *(f.__code__ for f in defs.PURE_SCRIPT_FUNCTIONS),
+ ])
+
+ def test_impure_script_code(self):
+ self.assert_not_shareable([
+ *(f.__code__ for f in defs.SCRIPT_FUNCTIONS
+ if f not in defs.PURE_SCRIPT_FUNCTIONS),
+ ])
+
+ def test_other_code(self):
+ self.assert_not_shareable([
+ *(f.__code__ for f in defs.FUNCTIONS
+ if f not in defs.SCRIPT_FUNCTIONS),
+ *(f.__code__ for f in defs.FUNCTION_LIKE),
+ ])
+
+ def test_pure_script_function(self):
+ self.assert_roundtrip_not_equal([
+ *defs.PURE_SCRIPT_FUNCTIONS,
+ ], expecttype=types.CodeType)
+
+ def test_impure_script_function(self):
+ self.assert_not_shareable([
+ *(f for f in defs.SCRIPT_FUNCTIONS
+ if f not in defs.PURE_SCRIPT_FUNCTIONS),
+ ])
+
+ def test_other_function(self):
+ self.assert_not_shareable([
+ *(f for f in defs.FUNCTIONS
+ if f not in defs.SCRIPT_FUNCTIONS),
+ *defs.FUNCTION_LIKE,
+ ])
+
+ def test_other_objects(self):
+ self.assert_not_shareable([
+ None,
+ True,
+ False,
+ Ellipsis,
+ NotImplemented,
+ (),
+ [],
+ {},
+ object(),
+ ])
+
+
+class ShareableScriptTests(PureShareableScriptTests):
+
+ MODE = 'script'
+
+ def test_impure_script_code(self):
+ self.assert_roundtrip_equal_not_identical([
+ *(f.__code__ for f in defs.SCRIPT_FUNCTIONS
+ if f not in defs.PURE_SCRIPT_FUNCTIONS),
+ ])
+
+ def test_impure_script_function(self):
+ self.assert_roundtrip_not_equal([
+ *(f for f in defs.SCRIPT_FUNCTIONS
+ if f not in defs.PURE_SCRIPT_FUNCTIONS),
+ ], expecttype=types.CodeType)
+
+
class ShareableTypeTests(_GetXIDataTests):
MODE = 'xidata'
diff --git a/Lib/test/test_dict.py b/Lib/test/test_dict.py
index 3104cbc66cb..69f1a098920 100644
--- a/Lib/test/test_dict.py
+++ b/Lib/test/test_dict.py
@@ -338,17 +338,34 @@ class DictTest(unittest.TestCase):
self.assertRaises(Exc, baddict2.fromkeys, [1])
# test fast path for dictionary inputs
+ res = dict(zip(range(6), [0]*6))
d = dict(zip(range(6), range(6)))
- self.assertEqual(dict.fromkeys(d, 0), dict(zip(range(6), [0]*6)))
-
+ self.assertEqual(dict.fromkeys(d, 0), res)
+ # test fast path for set inputs
+ d = set(range(6))
+ self.assertEqual(dict.fromkeys(d, 0), res)
+ # test slow path for other iterable inputs
+ d = list(range(6))
+ self.assertEqual(dict.fromkeys(d, 0), res)
+
+ # test fast path when object's constructor returns large non-empty dict
class baddict3(dict):
def __new__(cls):
return d
- d = {i : i for i in range(10)}
+ d = {i : i for i in range(1000)}
res = d.copy()
res.update(a=None, b=None, c=None)
self.assertEqual(baddict3.fromkeys({"a", "b", "c"}), res)
+ # test slow path when object is a proper subclass of dict
+ class baddict4(dict):
+ def __init__(self):
+ dict.__init__(self, d)
+ d = {i : i for i in range(1000)}
+ res = d.copy()
+ res.update(a=None, b=None, c=None)
+ self.assertEqual(baddict4.fromkeys({"a", "b", "c"}), res)
+
def test_copy(self):
d = {1: 1, 2: 2, 3: 3}
self.assertIsNot(d.copy(), d)
diff --git a/Lib/test/test_embed.py b/Lib/test/test_embed.py
index 95b2d80464c..46222e521ae 100644
--- a/Lib/test/test_embed.py
+++ b/Lib/test/test_embed.py
@@ -296,7 +296,7 @@ class EmbeddingTests(EmbeddingTestsMixin, unittest.TestCase):
if MS_WINDOWS:
expected_path = self.test_exe
else:
- expected_path = os.path.join(os.getcwd(), "spam")
+ expected_path = os.path.join(os.getcwd(), "_testembed")
expected_output = f"sys.executable: {expected_path}\n"
self.assertIn(expected_output, out)
self.assertEqual(err, '')
@@ -969,7 +969,6 @@ class InitConfigTests(EmbeddingTestsMixin, unittest.TestCase):
'utf8_mode': True,
}
config = {
- 'program_name': './globalvar',
'site_import': False,
'bytes_warning': True,
'warnoptions': ['default::BytesWarning'],
diff --git a/Lib/test/test_free_threading/test_io.py b/Lib/test/test_free_threading/test_io.py
new file mode 100644
index 00000000000..f9bec740ddf
--- /dev/null
+++ b/Lib/test/test_free_threading/test_io.py
@@ -0,0 +1,109 @@
+import threading
+from unittest import TestCase
+from test.support import threading_helper
+from random import randint
+from io import BytesIO
+from sys import getsizeof
+
+
+class TestBytesIO(TestCase):
+ # Test pretty much everything that can break under free-threading.
+ # Non-deterministic, but at least one of these things will fail if
+ # BytesIO object is not free-thread safe.
+
+ def check(self, funcs, *args):
+ barrier = threading.Barrier(len(funcs))
+ threads = []
+
+ for func in funcs:
+ thread = threading.Thread(target=func, args=(barrier, *args))
+
+ threads.append(thread)
+
+ with threading_helper.start_threads(threads):
+ pass
+
+ @threading_helper.requires_working_threading()
+ @threading_helper.reap_threads
+ def test_free_threading(self):
+ """Test for segfaults and aborts."""
+
+ def write(barrier, b, *ignore):
+ barrier.wait()
+ try: b.write(b'0' * randint(100, 1000))
+ except ValueError: pass # ignore write fail to closed file
+
+ def writelines(barrier, b, *ignore):
+ barrier.wait()
+ b.write(b'0\n' * randint(100, 1000))
+
+ def truncate(barrier, b, *ignore):
+ barrier.wait()
+ try: b.truncate(0)
+ except: BufferError # ignore exported buffer
+
+ def read(barrier, b, *ignore):
+ barrier.wait()
+ b.read()
+
+ def read1(barrier, b, *ignore):
+ barrier.wait()
+ b.read1()
+
+ def readline(barrier, b, *ignore):
+ barrier.wait()
+ b.readline()
+
+ def readlines(barrier, b, *ignore):
+ barrier.wait()
+ b.readlines()
+
+ def readinto(barrier, b, into, *ignore):
+ barrier.wait()
+ b.readinto(into)
+
+ def close(barrier, b, *ignore):
+ barrier.wait()
+ b.close()
+
+ def getvalue(barrier, b, *ignore):
+ barrier.wait()
+ b.getvalue()
+
+ def getbuffer(barrier, b, *ignore):
+ barrier.wait()
+ b.getbuffer()
+
+ def iter(barrier, b, *ignore):
+ barrier.wait()
+ list(b)
+
+ def getstate(barrier, b, *ignore):
+ barrier.wait()
+ b.__getstate__()
+
+ def setstate(barrier, b, st, *ignore):
+ barrier.wait()
+ b.__setstate__(st)
+
+ def sizeof(barrier, b, *ignore):
+ barrier.wait()
+ getsizeof(b)
+
+ self.check([write] * 10, BytesIO())
+ self.check([writelines] * 10, BytesIO())
+ self.check([write] * 10 + [truncate] * 10, BytesIO())
+ self.check([truncate] + [read] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [read1] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [readline] * 10, BytesIO(b'0\n'*20480))
+ self.check([truncate] + [readlines] * 10, BytesIO(b'0\n'*20480))
+ self.check([truncate] + [readinto] * 10, BytesIO(b'0\n'*204800), bytearray(b'0\n'*204800))
+ self.check([close] + [write] * 10, BytesIO())
+ self.check([truncate] + [getvalue] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [getbuffer] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [iter] * 10, BytesIO(b'0\n'*20480))
+ self.check([truncate] + [getstate] * 10, BytesIO(b'0\n'*204800))
+ self.check([truncate] + [setstate] * 10, BytesIO(b'0\n'*204800), (b'123', 0, None))
+ self.check([truncate] + [sizeof] * 10, BytesIO(b'0\n'*204800))
+
+ # no tests for seek or tell because they don't break anything
diff --git a/Lib/test/test_functools.py b/Lib/test/test_functools.py
index 2e794b0fc95..f7e09fd771e 100644
--- a/Lib/test/test_functools.py
+++ b/Lib/test/test_functools.py
@@ -21,6 +21,7 @@ from weakref import proxy
import contextlib
from inspect import Signature
+from test.support import ALWAYS_EQ
from test.support import import_helper
from test.support import threading_helper
from test.support import cpython_only
@@ -244,6 +245,13 @@ class TestPartial:
actual_args, actual_kwds = p('x', 'y')
self.assertEqual(actual_args, ('x', 0, 'y', 1))
self.assertEqual(actual_kwds, {})
+ # Checks via `is` and not `eq`
+ # thus ALWAYS_EQ isn't treated as Placeholder
+ p = self.partial(capture, ALWAYS_EQ)
+ actual_args, actual_kwds = p()
+ self.assertEqual(len(actual_args), 1)
+ self.assertIs(actual_args[0], ALWAYS_EQ)
+ self.assertEqual(actual_kwds, {})
def test_placeholders_optimization(self):
PH = self.module.Placeholder
@@ -260,6 +268,17 @@ class TestPartial:
self.assertEqual(p2.args, (PH, 0))
self.assertEqual(p2(1), ((1, 0), {}))
+ def test_placeholders_kw_restriction(self):
+ PH = self.module.Placeholder
+ with self.assertRaisesRegex(TypeError, "Placeholder"):
+ self.partial(capture, a=PH)
+ # Passes, as checks via `is` and not `eq`
+ p = self.partial(capture, a=ALWAYS_EQ)
+ actual_args, actual_kwds = p()
+ self.assertEqual(actual_args, ())
+ self.assertEqual(len(actual_kwds), 1)
+ self.assertIs(actual_kwds['a'], ALWAYS_EQ)
+
def test_construct_placeholder_singleton(self):
PH = self.module.Placeholder
tp = type(PH)
diff --git a/Lib/test/test_future_stmt/test_future.py b/Lib/test/test_future_stmt/test_future.py
index 42c6cb3fefa..71f1e616116 100644
--- a/Lib/test/test_future_stmt/test_future.py
+++ b/Lib/test/test_future_stmt/test_future.py
@@ -422,6 +422,11 @@ class AnnotationsFutureTestCase(unittest.TestCase):
eq('(((a)))', 'a')
eq('(((a, b)))', '(a, b)')
eq("1 + 2 + 3")
+ eq("t''")
+ eq("t'{a + b}'")
+ eq("t'{a!s}'")
+ eq("t'{a:b}'")
+ eq("t'{a:b=}'")
def test_fstring_debug_annotations(self):
# f-strings with '=' don't round trip very well, so set the expected
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py
index 8fae12c478c..95c98c6ac63 100644
--- a/Lib/test/test_gc.py
+++ b/Lib/test/test_gc.py
@@ -7,7 +7,7 @@ from test.support import (verbose, refcount_test,
Py_GIL_DISABLED)
from test.support.import_helper import import_module
from test.support.os_helper import temp_dir, TESTFN, unlink
-from test.support.script_helper import assert_python_ok, make_script
+from test.support.script_helper import assert_python_ok, make_script, run_test_script
from test.support import threading_helper, gc_threshold
import gc
@@ -1127,64 +1127,14 @@ class GCTests(unittest.TestCase):
class IncrementalGCTests(unittest.TestCase):
-
- def setUp(self):
- # Reenable GC as it is disabled module-wide
- gc.enable()
-
- def tearDown(self):
- gc.disable()
-
@unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi")
@requires_gil_enabled("Free threading does not support incremental GC")
- # Use small increments to emulate longer running process in a shorter time
- @gc_threshold(200, 10)
def test_incremental_gc_handles_fast_cycle_creation(self):
-
- class LinkedList:
-
- #Use slots to reduce number of implicit objects
- __slots__ = "next", "prev", "surprise"
-
- def __init__(self, next=None, prev=None):
- self.next = next
- if next is not None:
- next.prev = self
- self.prev = prev
- if prev is not None:
- prev.next = self
-
- def make_ll(depth):
- head = LinkedList()
- for i in range(depth):
- head = LinkedList(head, head.prev)
- return head
-
- head = make_ll(1000)
- count = 1000
-
- # There will be some objects we aren't counting,
- # e.g. the gc stats dicts. This test checks
- # that the counts don't grow, so we try to
- # correct for the uncounted objects
- # This is just an estimate.
- CORRECTION = 20
-
- enabled = gc.isenabled()
- gc.enable()
- olds = []
- initial_heap_size = _testinternalcapi.get_tracked_heap_size()
- for i in range(20_000):
- newhead = make_ll(20)
- count += 20
- newhead.surprise = head
- olds.append(newhead)
- if len(olds) == 20:
- new_objects = _testinternalcapi.get_tracked_heap_size() - initial_heap_size
- self.assertLess(new_objects, 27_000, f"Heap growing. Reached limit after {i} iterations")
- del olds[:]
- if not enabled:
- gc.disable()
+ # Run this test in a fresh process. The number of alive objects (which can
+ # be from unit tests run before this one) can influence how quickly cyclic
+ # garbage is found.
+ script = support.findfile("_test_gc_fast_cycles.py")
+ run_test_script(script)
class GCCallbackTests(unittest.TestCase):
diff --git a/Lib/test/test_generated_cases.py b/Lib/test/test_generated_cases.py
index d481cb07f75..a71ddc01d1c 100644
--- a/Lib/test/test_generated_cases.py
+++ b/Lib/test/test_generated_cases.py
@@ -2069,6 +2069,189 @@ class TestGeneratedAbstractCases(unittest.TestCase):
with self.assertRaisesRegex(AssertionError, "All abstract uops"):
self.run_cases_test(input, input2, output)
+ def test_validate_uop_input_length_mismatch(self):
+ input = """
+ op(OP, (arg1 -- out)) {
+ SPAM();
+ }
+ """
+ input2 = """
+ op(OP, (arg1, arg2 -- out)) {
+ }
+ """
+ output = """
+ """
+ with self.assertRaisesRegex(SyntaxError,
+ "Must have the same number of inputs"):
+ self.run_cases_test(input, input2, output)
+
+ def test_validate_uop_output_length_mismatch(self):
+ input = """
+ op(OP, (arg1 -- out)) {
+ SPAM();
+ }
+ """
+ input2 = """
+ op(OP, (arg1 -- out1, out2)) {
+ }
+ """
+ output = """
+ """
+ with self.assertRaisesRegex(SyntaxError,
+ "Must have the same number of outputs"):
+ self.run_cases_test(input, input2, output)
+
+ def test_validate_uop_input_name_mismatch(self):
+ input = """
+ op(OP, (foo -- out)) {
+ SPAM();
+ }
+ """
+ input2 = """
+ op(OP, (bar -- out)) {
+ }
+ """
+ output = """
+ """
+ with self.assertRaisesRegex(SyntaxError,
+ "Inputs must have equal names"):
+ self.run_cases_test(input, input2, output)
+
+ def test_validate_uop_output_name_mismatch(self):
+ input = """
+ op(OP, (arg1 -- foo)) {
+ SPAM();
+ }
+ """
+ input2 = """
+ op(OP, (arg1 -- bar)) {
+ }
+ """
+ output = """
+ """
+ with self.assertRaisesRegex(SyntaxError,
+ "Outputs must have equal names"):
+ self.run_cases_test(input, input2, output)
+
+ def test_validate_uop_unused_input(self):
+ input = """
+ op(OP, (unused -- )) {
+ }
+ """
+ input2 = """
+ op(OP, (foo -- )) {
+ }
+ """
+ output = """
+ case OP: {
+ stack_pointer += -1;
+ assert(WITHIN_STACK_BOUNDS());
+ break;
+ }
+ """
+ self.run_cases_test(input, input2, output)
+
+ input = """
+ op(OP, (foo -- )) {
+ }
+ """
+ input2 = """
+ op(OP, (unused -- )) {
+ }
+ """
+ output = """
+ case OP: {
+ stack_pointer += -1;
+ assert(WITHIN_STACK_BOUNDS());
+ break;
+ }
+ """
+ self.run_cases_test(input, input2, output)
+
+ def test_validate_uop_unused_output(self):
+ input = """
+ op(OP, ( -- unused)) {
+ }
+ """
+ input2 = """
+ op(OP, ( -- foo)) {
+ foo = NULL;
+ }
+ """
+ output = """
+ case OP: {
+ JitOptSymbol *foo;
+ foo = NULL;
+ stack_pointer[0] = foo;
+ stack_pointer += 1;
+ assert(WITHIN_STACK_BOUNDS());
+ break;
+ }
+ """
+ self.run_cases_test(input, input2, output)
+
+ input = """
+ op(OP, ( -- foo)) {
+ foo = NULL;
+ }
+ """
+ input2 = """
+ op(OP, ( -- unused)) {
+ }
+ """
+ output = """
+ case OP: {
+ stack_pointer += 1;
+ assert(WITHIN_STACK_BOUNDS());
+ break;
+ }
+ """
+ self.run_cases_test(input, input2, output)
+
+ def test_validate_uop_input_size_mismatch(self):
+ input = """
+ op(OP, (arg1[2] -- )) {
+ }
+ """
+ input2 = """
+ op(OP, (arg1[4] -- )) {
+ }
+ """
+ output = """
+ """
+ with self.assertRaisesRegex(SyntaxError,
+ "Inputs must have equal sizes"):
+ self.run_cases_test(input, input2, output)
+
+ def test_validate_uop_output_size_mismatch(self):
+ input = """
+ op(OP, ( -- out[2])) {
+ }
+ """
+ input2 = """
+ op(OP, ( -- out[4])) {
+ }
+ """
+ output = """
+ """
+ with self.assertRaisesRegex(SyntaxError,
+ "Outputs must have equal sizes"):
+ self.run_cases_test(input, input2, output)
+
+ def test_validate_uop_unused_size_mismatch(self):
+ input = """
+ op(OP, (foo[2] -- )) {
+ }
+ """
+ input2 = """
+ op(OP, (unused[4] -- )) {
+ }
+ """
+ output = """
+ """
+ with self.assertRaisesRegex(SyntaxError,
+ "Inputs must have equal sizes"):
+ self.run_cases_test(input, input2, output)
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_htmlparser.py b/Lib/test/test_htmlparser.py
index 4fdba06cf4c..61fa24fab57 100644
--- a/Lib/test/test_htmlparser.py
+++ b/Lib/test/test_htmlparser.py
@@ -317,6 +317,16 @@ text
("endtag", element_lower)],
collector=Collector(convert_charrefs=False))
+ def test_EOF_in_cdata(self):
+ content = """<!-- not a comment --> &not-an-entity-ref;
+ <a href="" /> </p><p> <span></span></style>
+ '</script' + '>'"""
+ s = f'<script>{content}'
+ self._run_check(s, [
+ ("starttag", 'script', []),
+ ("data", content)
+ ])
+
def test_comments(self):
html = ("<!-- I'm a valid comment -->"
'<!--me too!-->'
@@ -566,12 +576,33 @@ text
for html, expected in data:
self._run_check(html, expected)
- def test_broken_comments(self):
+ def test_EOF_in_comments_or_decls(self):
+ data = [
+ ('<!', [('data', '<!')]),
+ ('<!-', [('data', '<!-')]),
+ ('<!--', [('data', '<!--')]),
+ ('<![', [('data', '<![')]),
+ ('<![CDATA[', [('data', '<![CDATA[')]),
+ ('<![CDATA[x', [('data', '<![CDATA[x')]),
+ ('<!DOCTYPE', [('data', '<!DOCTYPE')]),
+ ('<!DOCTYPE HTML', [('data', '<!DOCTYPE HTML')]),
+ ]
+ for html, expected in data:
+ self._run_check(html, expected)
+ def test_bogus_comments(self):
html = ('<! not really a comment >'
'<! not a comment either -->'
'<! -- close enough -->'
'<!><!<-- this was an empty comment>'
- '<!!! another bogus comment !!!>')
+ '<!!! another bogus comment !!!>'
+ # see #32876
+ '<![with square brackets]!>'
+ '<![\nmultiline\nbogusness\n]!>'
+ '<![more brackets]-[and a hyphen]!>'
+ '<![cdata[should be uppercase]]>'
+ '<![CDATA [whitespaces are not ignored]]>'
+ '<![CDATA]]>' # required '[' after CDATA
+ )
expected = [
('comment', ' not really a comment '),
('comment', ' not a comment either --'),
@@ -579,39 +610,65 @@ text
('comment', ''),
('comment', '<-- this was an empty comment'),
('comment', '!! another bogus comment !!!'),
+ ('comment', '[with square brackets]!'),
+ ('comment', '[\nmultiline\nbogusness\n]!'),
+ ('comment', '[more brackets]-[and a hyphen]!'),
+ ('comment', '[cdata[should be uppercase]]'),
+ ('comment', '[CDATA [whitespaces are not ignored]]'),
+ ('comment', '[CDATA]]'),
]
self._run_check(html, expected)
def test_broken_condcoms(self):
# these condcoms are missing the '--' after '<!' and before the '>'
+ # and they are considered bogus comments according to
+ # "8.2.4.42. Markup declaration open state"
html = ('<![if !(IE)]>broken condcom<![endif]>'
'<![if ! IE]><link href="favicon.tiff"/><![endif]>'
'<![if !IE 6]><img src="firefox.png" /><![endif]>'
'<![if !ie 6]><b>foo</b><![endif]>'
'<![if (!IE)|(lt IE 9)]><img src="mammoth.bmp" /><![endif]>')
- # According to the HTML5 specs sections "8.2.4.44 Bogus comment state"
- # and "8.2.4.45 Markup declaration open state", comment tokens should
- # be emitted instead of 'unknown decl', but calling unknown_decl
- # provides more flexibility.
- # See also Lib/_markupbase.py:parse_declaration
expected = [
- ('unknown decl', 'if !(IE)'),
+ ('comment', '[if !(IE)]'),
('data', 'broken condcom'),
- ('unknown decl', 'endif'),
- ('unknown decl', 'if ! IE'),
+ ('comment', '[endif]'),
+ ('comment', '[if ! IE]'),
('startendtag', 'link', [('href', 'favicon.tiff')]),
- ('unknown decl', 'endif'),
- ('unknown decl', 'if !IE 6'),
+ ('comment', '[endif]'),
+ ('comment', '[if !IE 6]'),
('startendtag', 'img', [('src', 'firefox.png')]),
- ('unknown decl', 'endif'),
- ('unknown decl', 'if !ie 6'),
+ ('comment', '[endif]'),
+ ('comment', '[if !ie 6]'),
('starttag', 'b', []),
('data', 'foo'),
('endtag', 'b'),
- ('unknown decl', 'endif'),
- ('unknown decl', 'if (!IE)|(lt IE 9)'),
+ ('comment', '[endif]'),
+ ('comment', '[if (!IE)|(lt IE 9)]'),
('startendtag', 'img', [('src', 'mammoth.bmp')]),
- ('unknown decl', 'endif')
+ ('comment', '[endif]')
+ ]
+ self._run_check(html, expected)
+
+ def test_cdata_declarations(self):
+ # More tests should be added. See also "8.2.4.42. Markup
+ # declaration open state", "8.2.4.69. CDATA section state",
+ # and issue 32876
+ html = ('<![CDATA[just some plain text]]>')
+ expected = [('unknown decl', 'CDATA[just some plain text')]
+ self._run_check(html, expected)
+
+ def test_cdata_declarations_multiline(self):
+ html = ('<code><![CDATA['
+ ' if (a < b && a > b) {'
+ ' printf("[<marquee>How?</marquee>]");'
+ ' }'
+ ']]></code>')
+ expected = [
+ ('starttag', 'code', []),
+ ('unknown decl',
+ 'CDATA[ if (a < b && a > b) { '
+ 'printf("[<marquee>How?</marquee>]"); }'),
+ ('endtag', 'code')
]
self._run_check(html, expected)
diff --git a/Lib/test/test_importlib/test_threaded_import.py b/Lib/test/test_importlib/test_threaded_import.py
index 9af1e4d505c..f78dc399720 100644
--- a/Lib/test/test_importlib/test_threaded_import.py
+++ b/Lib/test/test_importlib/test_threaded_import.py
@@ -135,10 +135,12 @@ class ThreadedImportTests(unittest.TestCase):
if verbose:
print("OK.")
- def test_parallel_module_init(self):
+ @support.bigmemtest(size=50, memuse=76*2**20, dry_run=False)
+ def test_parallel_module_init(self, size):
self.check_parallel_module_init()
- def test_parallel_meta_path(self):
+ @support.bigmemtest(size=50, memuse=76*2**20, dry_run=False)
+ def test_parallel_meta_path(self, size):
finder = Finder()
sys.meta_path.insert(0, finder)
try:
@@ -148,7 +150,8 @@ class ThreadedImportTests(unittest.TestCase):
finally:
sys.meta_path.remove(finder)
- def test_parallel_path_hooks(self):
+ @support.bigmemtest(size=50, memuse=76*2**20, dry_run=False)
+ def test_parallel_path_hooks(self, size):
# Here the Finder instance is only used to check concurrent calls
# to path_hook().
finder = Finder()
@@ -242,13 +245,15 @@ class ThreadedImportTests(unittest.TestCase):
__import__(TESTFN)
del sys.modules[TESTFN]
- def test_concurrent_futures_circular_import(self):
+ @support.bigmemtest(size=1, memuse=1.8*2**30, dry_run=False)
+ def test_concurrent_futures_circular_import(self, size):
# Regression test for bpo-43515
fn = os.path.join(os.path.dirname(__file__),
'partial', 'cfimport.py')
script_helper.assert_python_ok(fn)
- def test_multiprocessing_pool_circular_import(self):
+ @support.bigmemtest(size=1, memuse=1.8*2**30, dry_run=False)
+ def test_multiprocessing_pool_circular_import(self, size):
# Regression test for bpo-41567
fn = os.path.join(os.path.dirname(__file__),
'partial', 'pool_in_threads.py')
diff --git a/Lib/test/test_inspect/test_inspect.py b/Lib/test/test_inspect/test_inspect.py
index c9b37fcd8f6..1c325c0d507 100644
--- a/Lib/test/test_inspect/test_inspect.py
+++ b/Lib/test/test_inspect/test_inspect.py
@@ -4997,6 +4997,37 @@ class TestSignatureObject(unittest.TestCase):
with self.assertRaisesRegex(NameError, "undefined"):
signature_func(ida.f)
+ def test_signature_deferred_annotations(self):
+ def f(x: undef):
+ pass
+
+ class C:
+ x: undef
+
+ def __init__(self, x: undef):
+ self.x = x
+
+ sig = inspect.signature(f, annotation_format=Format.FORWARDREF)
+ self.assertEqual(list(sig.parameters), ['x'])
+ sig = inspect.signature(C, annotation_format=Format.FORWARDREF)
+ self.assertEqual(list(sig.parameters), ['x'])
+
+ class CallableWrapper:
+ def __init__(self, func):
+ self.func = func
+ self.__annotate__ = func.__annotate__
+
+ def __call__(self, *args, **kwargs):
+ return self.func(*args, **kwargs)
+
+ @property
+ def __annotations__(self):
+ return self.__annotate__(Format.VALUE)
+
+ cw = CallableWrapper(f)
+ sig = inspect.signature(cw, annotation_format=Format.FORWARDREF)
+ self.assertEqual(list(sig.parameters), ['args', 'kwargs'])
+
def test_signature_none_annotation(self):
class funclike:
# Has to be callable, and have correct
@@ -6146,12 +6177,14 @@ class TestRepl(unittest.TestCase):
object.
"""
+ # TODO(picnixz): refactor this as it's used by test_repl.py
+
# To run the REPL without using a terminal, spawn python with the command
# line option '-i' and the process name set to '<stdin>'.
# The directory of argv[0] must match the directory of the Python
# executable for the Popen() call to python to succeed as the directory
- # path may be used by Py_GetPath() to build the default module search
- # path.
+ # path may be used by PyConfig_Get("module_search_paths") to build the
+ # default module search path.
stdin_fname = os.path.join(os.path.dirname(sys.executable), "<stdin>")
cmd_line = [stdin_fname, '-E', '-i']
cmd_line.extend(args)
diff --git a/Lib/test/test_linecache.py b/Lib/test/test_linecache.py
index e4aa41ebb43..02f65338428 100644
--- a/Lib/test/test_linecache.py
+++ b/Lib/test/test_linecache.py
@@ -4,10 +4,12 @@ import linecache
import unittest
import os.path
import tempfile
+import threading
import tokenize
from importlib.machinery import ModuleSpec
from test import support
from test.support import os_helper
+from test.support import threading_helper
from test.support.script_helper import assert_python_ok
@@ -374,5 +376,40 @@ class LineCacheInvalidationTests(unittest.TestCase):
self.assertIn(self.unchanged_file, linecache.cache)
+class MultiThreadingTest(unittest.TestCase):
+ @threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
+ def test_read_write_safety(self):
+
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ filenames = []
+ for i in range(10):
+ name = os.path.join(tmpdirname, f"test_{i}.py")
+ with open(name, "w") as h:
+ h.write("import time\n")
+ h.write("import system\n")
+ filenames.append(name)
+
+ def linecache_get_line(b):
+ b.wait()
+ for _ in range(100):
+ for name in filenames:
+ linecache.getline(name, 1)
+
+ def check(funcs):
+ barrier = threading.Barrier(len(funcs))
+ threads = []
+
+ for func in funcs:
+ thread = threading.Thread(target=func, args=(barrier,))
+
+ threads.append(thread)
+
+ with threading_helper.start_threads(threads):
+ pass
+
+ check([linecache_get_line] * 20)
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index 3f113ec1be4..1e5adcc8db1 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -4214,6 +4214,89 @@ class ConfigDictTest(BaseTest):
handler = logging.getHandlerByName('custom')
self.assertEqual(handler.custom_kwargs, custom_kwargs)
+ # See gh-91555 and gh-90321
+ @support.requires_subprocess()
+ def test_deadlock_in_queue(self):
+ queue = multiprocessing.Queue()
+ handler = logging.handlers.QueueHandler(queue)
+ logger = multiprocessing.get_logger()
+ level = logger.level
+ try:
+ logger.setLevel(logging.DEBUG)
+ logger.addHandler(handler)
+ logger.debug("deadlock")
+ finally:
+ logger.setLevel(level)
+ logger.removeHandler(handler)
+
+ def test_recursion_in_custom_handler(self):
+ class BadHandler(logging.Handler):
+ def __init__(self):
+ super().__init__()
+ def emit(self, record):
+ logger.debug("recurse")
+ logger = logging.getLogger("test_recursion_in_custom_handler")
+ logger.addHandler(BadHandler())
+ logger.setLevel(logging.DEBUG)
+ logger.debug("boom")
+
+ @threading_helper.requires_working_threading()
+ def test_thread_supression_noninterference(self):
+ lock = threading.Lock()
+ logger = logging.getLogger("test_thread_supression_noninterference")
+
+ # Block on the first call, allow others through
+ #
+ # NOTE: We need to bypass the base class's lock, otherwise that will
+ # block multiple calls to the same handler itself.
+ class BlockOnceHandler(TestHandler):
+ def __init__(self, barrier):
+ super().__init__(support.Matcher())
+ self.barrier = barrier
+
+ def createLock(self):
+ self.lock = None
+
+ def handle(self, record):
+ self.emit(record)
+
+ def emit(self, record):
+ if self.barrier:
+ barrier = self.barrier
+ self.barrier = None
+ barrier.wait()
+ with lock:
+ pass
+ super().emit(record)
+ logger.info("blow up if not supressed")
+
+ barrier = threading.Barrier(2)
+ handler = BlockOnceHandler(barrier)
+ logger.addHandler(handler)
+ logger.setLevel(logging.DEBUG)
+
+ t1 = threading.Thread(target=logger.debug, args=("1",))
+ with lock:
+
+ # Ensure first thread is blocked in the handler, hence supressing logging...
+ t1.start()
+ barrier.wait()
+
+ # ...but the second thread should still be able to log...
+ t2 = threading.Thread(target=logger.debug, args=("2",))
+ t2.start()
+ t2.join(timeout=3)
+
+ self.assertEqual(len(handler.buffer), 1)
+ self.assertTrue(handler.matches(levelno=logging.DEBUG, message='2'))
+
+ # The first thread should still be blocked here
+ self.assertTrue(t1.is_alive())
+
+ # Now the lock has been released the first thread should complete
+ t1.join()
+ self.assertEqual(len(handler.buffer), 2)
+ self.assertTrue(handler.matches(levelno=logging.DEBUG, message='1'))
class ManagerTest(BaseTest):
def test_manager_loggerclass(self):
diff --git a/Lib/test/test_pdb.py b/Lib/test/test_pdb.py
index 68c2b508fa6..6b74e21ad73 100644
--- a/Lib/test/test_pdb.py
+++ b/Lib/test/test_pdb.py
@@ -4749,7 +4749,9 @@ class TestREPLSession(unittest.TestCase):
@support.force_not_colorized_test_class
@support.requires_subprocess()
class PdbTestReadline(unittest.TestCase):
- def setUpClass():
+
+ @classmethod
+ def setUpClass(cls):
# Ensure that the readline module is loaded
# If this fails, the test is skipped because SkipTest will be raised
readline = import_module('readline')
diff --git a/Lib/test/test_positional_only_arg.py b/Lib/test/test_positional_only_arg.py
index eea0625012d..e412cb1d58d 100644
--- a/Lib/test/test_positional_only_arg.py
+++ b/Lib/test/test_positional_only_arg.py
@@ -37,8 +37,8 @@ class PositionalOnlyTestCase(unittest.TestCase):
check_syntax_error(self, "def f(/): pass")
check_syntax_error(self, "def f(*, a, /): pass")
check_syntax_error(self, "def f(*, /, a): pass")
- check_syntax_error(self, "def f(a, /, a): pass", "duplicate argument 'a' in function definition")
- check_syntax_error(self, "def f(a, /, *, a): pass", "duplicate argument 'a' in function definition")
+ check_syntax_error(self, "def f(a, /, a): pass", "duplicate parameter 'a' in function definition")
+ check_syntax_error(self, "def f(a, /, *, a): pass", "duplicate parameter 'a' in function definition")
check_syntax_error(self, "def f(a, b/2, c): pass")
check_syntax_error(self, "def f(a, /, c, /): pass")
check_syntax_error(self, "def f(a, /, c, /, d): pass")
@@ -59,8 +59,8 @@ class PositionalOnlyTestCase(unittest.TestCase):
check_syntax_error(self, "async def f(/): pass")
check_syntax_error(self, "async def f(*, a, /): pass")
check_syntax_error(self, "async def f(*, /, a): pass")
- check_syntax_error(self, "async def f(a, /, a): pass", "duplicate argument 'a' in function definition")
- check_syntax_error(self, "async def f(a, /, *, a): pass", "duplicate argument 'a' in function definition")
+ check_syntax_error(self, "async def f(a, /, a): pass", "duplicate parameter 'a' in function definition")
+ check_syntax_error(self, "async def f(a, /, *, a): pass", "duplicate parameter 'a' in function definition")
check_syntax_error(self, "async def f(a, b/2, c): pass")
check_syntax_error(self, "async def f(a, /, c, /): pass")
check_syntax_error(self, "async def f(a, /, c, /, d): pass")
@@ -247,8 +247,8 @@ class PositionalOnlyTestCase(unittest.TestCase):
check_syntax_error(self, "lambda /: None")
check_syntax_error(self, "lambda *, a, /: None")
check_syntax_error(self, "lambda *, /, a: None")
- check_syntax_error(self, "lambda a, /, a: None", "duplicate argument 'a' in function definition")
- check_syntax_error(self, "lambda a, /, *, a: None", "duplicate argument 'a' in function definition")
+ check_syntax_error(self, "lambda a, /, a: None", "duplicate parameter 'a' in function definition")
+ check_syntax_error(self, "lambda a, /, *, a: None", "duplicate parameter 'a' in function definition")
check_syntax_error(self, "lambda a, /, b, /: None")
check_syntax_error(self, "lambda a, /, b, /, c: None")
check_syntax_error(self, "lambda a, /, b, /, c, *, d: None")
diff --git a/Lib/test/test_pyrepl/test_interact.py b/Lib/test/test_pyrepl/test_interact.py
index a20719033fc..8c0eeab6dca 100644
--- a/Lib/test/test_pyrepl/test_interact.py
+++ b/Lib/test/test_pyrepl/test_interact.py
@@ -113,7 +113,7 @@ class TestSimpleInteract(unittest.TestCase):
r = """
def f(x, x): ...
^
-SyntaxError: duplicate argument 'x' in function definition"""
+SyntaxError: duplicate parameter 'x' in function definition"""
self.assertIn(r, f.getvalue())
def test_runsource_shows_syntax_error_for_failed_compilation(self):
diff --git a/Lib/test/test_pyrepl/test_pyrepl.py b/Lib/test/test_pyrepl/test_pyrepl.py
index 93029ab6e08..fc8114891d1 100644
--- a/Lib/test/test_pyrepl/test_pyrepl.py
+++ b/Lib/test/test_pyrepl/test_pyrepl.py
@@ -452,6 +452,11 @@ class TestPyReplAutoindent(TestCase):
)
# fmt: on
+ events = code_to_events(input_code)
+ reader = self.prepare_reader(events)
+ output = multiline_input(reader)
+ self.assertEqual(output, output_code)
+
def test_auto_indent_continuation(self):
# auto indenting according to previous user indentation
# fmt: off
diff --git a/Lib/test/test_pyrepl/test_reader.py b/Lib/test/test_pyrepl/test_reader.py
index 4ee320a5a4d..57526f88f93 100644
--- a/Lib/test/test_pyrepl/test_reader.py
+++ b/Lib/test/test_pyrepl/test_reader.py
@@ -497,6 +497,26 @@ class TestReaderInColor(ScreenEqualMixin, TestCase):
self.assert_screen_equal(reader, code, clean=True)
self.assert_screen_equal(reader, expected)
+ def test_syntax_highlighting_indentation_error(self):
+ code = dedent(
+ """\
+ def unfinished_function():
+ var = 1
+ oops
+ """
+ )
+ expected = dedent(
+ """\
+ {k}def{z} {d}unfinished_function{z}{o}({z}{o}){z}{o}:{z}
+ var {o}={z} {n}1{z}
+ oops
+ """
+ ).format(**colors)
+ events = code_to_events(code)
+ reader, _ = handle_all_events(events)
+ self.assert_screen_equal(reader, code, clean=True)
+ self.assert_screen_equal(reader, expected)
+
def test_control_characters(self):
code = 'flag = "🏳️‍🌈"'
events = code_to_events(code)
diff --git a/Lib/test/test_repl.py b/Lib/test/test_repl.py
index 228b326699e..f4a4634fc62 100644
--- a/Lib/test/test_repl.py
+++ b/Lib/test/test_repl.py
@@ -38,8 +38,8 @@ def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
# line option '-i' and the process name set to '<stdin>'.
# The directory of argv[0] must match the directory of the Python
# executable for the Popen() call to python to succeed as the directory
- # path may be used by Py_GetPath() to build the default module search
- # path.
+ # path may be used by PyConfig_Get("module_search_paths") to build the
+ # default module search path.
stdin_fname = os.path.join(os.path.dirname(sys.executable), "<stdin>")
cmd_line = [stdin_fname, '-I', '-i']
cmd_line.extend(args)
@@ -197,7 +197,7 @@ class TestInteractiveInterpreter(unittest.TestCase):
expected_lines = [
' def f(x, x): ...',
' ^',
- "SyntaxError: duplicate argument 'x' in function definition"
+ "SyntaxError: duplicate parameter 'x' in function definition"
]
self.assertEqual(output.splitlines()[4:-1], expected_lines)
diff --git a/Lib/test/test_sqlite3/test_cli.py b/Lib/test/test_sqlite3/test_cli.py
index ad0dcb3cccb..37e0f74f688 100644
--- a/Lib/test/test_sqlite3/test_cli.py
+++ b/Lib/test/test_sqlite3/test_cli.py
@@ -8,10 +8,11 @@ from test.support import (
captured_stdout,
captured_stderr,
captured_stdin,
- force_not_colorized,
+ force_not_colorized_test_class,
)
+@force_not_colorized_test_class
class CommandLineInterface(unittest.TestCase):
def _do_test(self, *args, expect_success=True):
@@ -37,7 +38,6 @@ class CommandLineInterface(unittest.TestCase):
self.assertEqual(out, "")
return err
- @force_not_colorized
def test_cli_help(self):
out = self.expect_success("-h")
self.assertIn("usage: ", out)
@@ -69,6 +69,7 @@ class CommandLineInterface(unittest.TestCase):
self.assertIn("(0,)", out)
+@force_not_colorized_test_class
class InteractiveSession(unittest.TestCase):
MEMORY_DB_MSG = "Connected to a transient in-memory database"
PS1 = "sqlite> "
@@ -116,6 +117,38 @@ class InteractiveSession(unittest.TestCase):
self.assertEqual(out.count(self.PS2), 0)
self.assertIn(sqlite3.sqlite_version, out)
+ def test_interact_empty_source(self):
+ out, err = self.run_cli(commands=("", " "))
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertEndsWith(out, self.PS1)
+ self.assertEqual(out.count(self.PS1), 3)
+ self.assertEqual(out.count(self.PS2), 0)
+
+ def test_interact_dot_commands_unknown(self):
+ out, err = self.run_cli(commands=(".unknown_command", ))
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertEndsWith(out, self.PS1)
+ self.assertEqual(out.count(self.PS1), 2)
+ self.assertEqual(out.count(self.PS2), 0)
+ self.assertIn("Error", err)
+ # test "unknown_command" is pointed out in the error message
+ self.assertIn("unknown_command", err)
+
+ def test_interact_dot_commands_empty(self):
+ out, err = self.run_cli(commands=("."))
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertEndsWith(out, self.PS1)
+ self.assertEqual(out.count(self.PS1), 2)
+ self.assertEqual(out.count(self.PS2), 0)
+
+ def test_interact_dot_commands_with_whitespaces(self):
+ out, err = self.run_cli(commands=(".version ", ". version"))
+ self.assertIn(self.MEMORY_DB_MSG, err)
+ self.assertEqual(out.count(sqlite3.sqlite_version + "\n"), 2)
+ self.assertEndsWith(out, self.PS1)
+ self.assertEqual(out.count(self.PS1), 3)
+ self.assertEqual(out.count(self.PS2), 0)
+
def test_interact_valid_sql(self):
out, err = self.run_cli(commands=("SELECT 1;",))
self.assertIn(self.MEMORY_DB_MSG, err)
@@ -158,6 +191,14 @@ class InteractiveSession(unittest.TestCase):
out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",))
self.assertIn("(0,)\n", out)
+ def test_color(self):
+ with unittest.mock.patch("_colorize.can_colorize", return_value=True):
+ out, err = self.run_cli(commands="TEXT\n")
+ self.assertIn("\x1b[1;35msqlite> \x1b[0m", out)
+ self.assertIn("\x1b[1;35m ... \x1b[0m\x1b", out)
+ out, err = self.run_cli(commands=("sel;",))
+ self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: '
+ '\x1b[35mnear "sel": syntax error\x1b[0m', err)
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_sqlite3/test_dbapi.py b/Lib/test/test_sqlite3/test_dbapi.py
index c3aa3bf2d7b..291e0356253 100644
--- a/Lib/test/test_sqlite3/test_dbapi.py
+++ b/Lib/test/test_sqlite3/test_dbapi.py
@@ -550,17 +550,9 @@ class ConnectionTests(unittest.TestCase):
cx.execute("insert into u values(0)")
def test_connect_positional_arguments(self):
- regex = (
- r"Passing more than 1 positional argument to sqlite3.connect\(\)"
- " is deprecated. Parameters 'timeout', 'detect_types', "
- "'isolation_level', 'check_same_thread', 'factory', "
- "'cached_statements' and 'uri' will become keyword-only "
- "parameters in Python 3.15."
- )
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
- cx = sqlite.connect(":memory:", 1.0)
- cx.close()
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(TypeError,
+ r'connect\(\) takes at most 1 positional arguments'):
+ sqlite.connect(":memory:", 1.0)
def test_connection_resource_warning(self):
with self.assertWarns(ResourceWarning):
diff --git a/Lib/test/test_sqlite3/test_factory.py b/Lib/test/test_sqlite3/test_factory.py
index cc9f1ec5c4b..776659e3b16 100644
--- a/Lib/test/test_sqlite3/test_factory.py
+++ b/Lib/test/test_sqlite3/test_factory.py
@@ -71,18 +71,9 @@ class ConnectionFactoryTests(unittest.TestCase):
def __init__(self, *args, **kwargs):
super(Factory, self).__init__(*args, **kwargs)
- regex = (
- r"Passing more than 1 positional argument to _sqlite3.Connection\(\) "
- r"is deprecated. Parameters 'timeout', 'detect_types', "
- r"'isolation_level', 'check_same_thread', 'factory', "
- r"'cached_statements' and 'uri' will become keyword-only "
- r"parameters in Python 3.15."
- )
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
- with memory_database(5.0, 0, None, True, Factory) as con:
- self.assertIsNone(con.isolation_level)
- self.assertIsInstance(con, Factory)
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(TypeError,
+ r'connect\(\) takes at most 1 positional arguments'):
+ memory_database(5.0, 0, None, True, Factory)
class CursorFactoryTests(MemoryDatabaseMixin, unittest.TestCase):
diff --git a/Lib/test/test_sqlite3/test_hooks.py b/Lib/test/test_sqlite3/test_hooks.py
index 53b8a39bf29..2b907e35131 100644
--- a/Lib/test/test_sqlite3/test_hooks.py
+++ b/Lib/test/test_sqlite3/test_hooks.py
@@ -220,16 +220,9 @@ class ProgressTests(MemoryDatabaseMixin, unittest.TestCase):
""")
def test_progress_handler_keyword_args(self):
- regex = (
- r"Passing keyword argument 'progress_handler' to "
- r"_sqlite3.Connection.set_progress_handler\(\) is deprecated. "
- r"Parameter 'progress_handler' will become positional-only in "
- r"Python 3.15."
- )
-
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
+ with self.assertRaisesRegex(TypeError,
+ 'takes at least 1 positional argument'):
self.con.set_progress_handler(progress_handler=lambda: None, n=1)
- self.assertEqual(cm.filename, __file__)
class TraceCallbackTests(MemoryDatabaseMixin, unittest.TestCase):
@@ -353,16 +346,9 @@ class TraceCallbackTests(MemoryDatabaseMixin, unittest.TestCase):
cx.execute("select 1")
def test_trace_keyword_args(self):
- regex = (
- r"Passing keyword argument 'trace_callback' to "
- r"_sqlite3.Connection.set_trace_callback\(\) is deprecated. "
- r"Parameter 'trace_callback' will become positional-only in "
- r"Python 3.15."
- )
-
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
+ with self.assertRaisesRegex(TypeError,
+ 'takes exactly 1 positional argument'):
self.con.set_trace_callback(trace_callback=lambda: None)
- self.assertEqual(cm.filename, __file__)
if __name__ == "__main__":
diff --git a/Lib/test/test_sqlite3/test_userfunctions.py b/Lib/test/test_sqlite3/test_userfunctions.py
index 3abc43a3b1a..11cf877a011 100644
--- a/Lib/test/test_sqlite3/test_userfunctions.py
+++ b/Lib/test/test_sqlite3/test_userfunctions.py
@@ -422,27 +422,9 @@ class FunctionTests(unittest.TestCase):
self.con.execute, "select badreturn()")
def test_func_keyword_args(self):
- regex = (
- r"Passing keyword arguments 'name', 'narg' and 'func' to "
- r"_sqlite3.Connection.create_function\(\) is deprecated. "
- r"Parameters 'name', 'narg' and 'func' will become "
- r"positional-only in Python 3.15."
- )
-
- def noop():
- return None
-
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
- self.con.create_function("noop", 0, func=noop)
- self.assertEqual(cm.filename, __file__)
-
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
- self.con.create_function("noop", narg=0, func=noop)
- self.assertEqual(cm.filename, __file__)
-
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
- self.con.create_function(name="noop", narg=0, func=noop)
- self.assertEqual(cm.filename, __file__)
+ with self.assertRaisesRegex(TypeError,
+ 'takes exactly 3 positional arguments'):
+ self.con.create_function("noop", 0, func=lambda: None)
class WindowSumInt:
@@ -737,25 +719,9 @@ class AggregateTests(unittest.TestCase):
self.assertEqual(val, txt)
def test_agg_keyword_args(self):
- regex = (
- r"Passing keyword arguments 'name', 'n_arg' and 'aggregate_class' to "
- r"_sqlite3.Connection.create_aggregate\(\) is deprecated. "
- r"Parameters 'name', 'n_arg' and 'aggregate_class' will become "
- r"positional-only in Python 3.15."
- )
-
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
+ with self.assertRaisesRegex(TypeError,
+ 'takes exactly 3 positional arguments'):
self.con.create_aggregate("test", 1, aggregate_class=AggrText)
- self.assertEqual(cm.filename, __file__)
-
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
- self.con.create_aggregate("test", n_arg=1, aggregate_class=AggrText)
- self.assertEqual(cm.filename, __file__)
-
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
- self.con.create_aggregate(name="test", n_arg=0,
- aggregate_class=AggrText)
- self.assertEqual(cm.filename, __file__)
class AuthorizerTests(unittest.TestCase):
@@ -800,16 +766,9 @@ class AuthorizerTests(unittest.TestCase):
self.con.execute("select c2 from t1")
def test_authorizer_keyword_args(self):
- regex = (
- r"Passing keyword argument 'authorizer_callback' to "
- r"_sqlite3.Connection.set_authorizer\(\) is deprecated. "
- r"Parameter 'authorizer_callback' will become positional-only in "
- r"Python 3.15."
- )
-
- with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
+ with self.assertRaisesRegex(TypeError,
+ 'takes exactly 1 positional argument'):
self.con.set_authorizer(authorizer_callback=lambda: None)
- self.assertEqual(cm.filename, __file__)
class AuthorizerRaiseExceptionTests(AuthorizerTests):
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 395b2ef88ab..06460d6047c 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -4488,6 +4488,7 @@ class ThreadedTests(unittest.TestCase):
@requires_tls_version('TLSv1_3')
@unittest.skipUnless(ssl.HAS_PSK, 'TLS-PSK disabled on this OpenSSL build')
+ @unittest.skipUnless(ssl.HAS_PSK_TLS13, 'TLS 1.3 PSK disabled on this OpenSSL build')
def test_psk_tls1_3(self):
psk = bytes.fromhex('deadbeef')
identity_hint = 'identity-hint'
diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py
index 0ee17849e28..0eccf03a1a9 100644
--- a/Lib/test/test_syntax.py
+++ b/Lib/test/test_syntax.py
@@ -419,7 +419,7 @@ SyntaxError: invalid syntax
>>> def foo(/,a,b=,c):
... pass
Traceback (most recent call last):
-SyntaxError: at least one argument must precede /
+SyntaxError: at least one parameter must precede /
>>> def foo(a,/,/,b,c):
... pass
@@ -454,67 +454,67 @@ SyntaxError: / must be ahead of *
>>> def foo(a,*b=3,c):
... pass
Traceback (most recent call last):
-SyntaxError: var-positional argument cannot have default value
+SyntaxError: var-positional parameter cannot have default value
>>> def foo(a,*b: int=,c):
... pass
Traceback (most recent call last):
-SyntaxError: var-positional argument cannot have default value
+SyntaxError: var-positional parameter cannot have default value
>>> def foo(a,**b=3):
... pass
Traceback (most recent call last):
-SyntaxError: var-keyword argument cannot have default value
+SyntaxError: var-keyword parameter cannot have default value
>>> def foo(a,**b: int=3):
... pass
Traceback (most recent call last):
-SyntaxError: var-keyword argument cannot have default value
+SyntaxError: var-keyword parameter cannot have default value
>>> def foo(a,*a, b, **c, d):
... pass
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> def foo(a,*a, b, **c, d=4):
... pass
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> def foo(a,*a, b, **c, *d):
... pass
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> def foo(a,*a, b, **c, **d):
... pass
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> def foo(a=1,/,**b,/,c):
... pass
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> def foo(*b,*d):
... pass
Traceback (most recent call last):
-SyntaxError: * argument may appear only once
+SyntaxError: * may appear only once
>>> def foo(a,*b,c,*d,*e,c):
... pass
Traceback (most recent call last):
-SyntaxError: * argument may appear only once
+SyntaxError: * may appear only once
>>> def foo(a,b,/,c,*b,c,*d,*e,c):
... pass
Traceback (most recent call last):
-SyntaxError: * argument may appear only once
+SyntaxError: * may appear only once
>>> def foo(a,b,/,c,*b,c,*d,**e):
... pass
Traceback (most recent call last):
-SyntaxError: * argument may appear only once
+SyntaxError: * may appear only once
>>> def foo(a=1,/*,b,c):
... pass
@@ -538,7 +538,7 @@ SyntaxError: expected default value expression
>>> lambda /,a,b,c: None
Traceback (most recent call last):
-SyntaxError: at least one argument must precede /
+SyntaxError: at least one parameter must precede /
>>> lambda a,/,/,b,c: None
Traceback (most recent call last):
@@ -570,47 +570,47 @@ SyntaxError: expected comma between / and *
>>> lambda a,*b=3,c: None
Traceback (most recent call last):
-SyntaxError: var-positional argument cannot have default value
+SyntaxError: var-positional parameter cannot have default value
>>> lambda a,**b=3: None
Traceback (most recent call last):
-SyntaxError: var-keyword argument cannot have default value
+SyntaxError: var-keyword parameter cannot have default value
>>> lambda a, *a, b, **c, d: None
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> lambda a,*a, b, **c, d=4: None
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> lambda a,*a, b, **c, *d: None
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> lambda a,*a, b, **c, **d: None
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> lambda a=1,/,**b,/,c: None
Traceback (most recent call last):
-SyntaxError: arguments cannot follow var-keyword argument
+SyntaxError: parameters cannot follow var-keyword parameter
>>> lambda *b,*d: None
Traceback (most recent call last):
-SyntaxError: * argument may appear only once
+SyntaxError: * may appear only once
>>> lambda a,*b,c,*d,*e,c: None
Traceback (most recent call last):
-SyntaxError: * argument may appear only once
+SyntaxError: * may appear only once
>>> lambda a,b,/,c,*b,c,*d,*e,c: None
Traceback (most recent call last):
-SyntaxError: * argument may appear only once
+SyntaxError: * may appear only once
>>> lambda a,b,/,c,*b,c,*d,**e: None
Traceback (most recent call last):
-SyntaxError: * argument may appear only once
+SyntaxError: * may appear only once
>>> lambda a=1,d=,c: None
Traceback (most recent call last):
@@ -1304,7 +1304,7 @@ Missing parens after function definition
Traceback (most recent call last):
SyntaxError: expected '('
-Parenthesized arguments in function definitions
+Parenthesized parameters in function definitions
>>> def f(x, (y, z), w):
... pass
@@ -2178,7 +2178,7 @@ Corner-cases that used to fail to raise the correct error:
>>> with (lambda *:0): pass
Traceback (most recent call last):
- SyntaxError: named arguments must follow bare *
+ SyntaxError: named parameters must follow bare *
Corner-cases that used to crash:
diff --git a/Lib/test/test_threadedtempfile.py b/Lib/test/test_threadedtempfile.py
index 420fc6ec8be..acb427b0c78 100644
--- a/Lib/test/test_threadedtempfile.py
+++ b/Lib/test/test_threadedtempfile.py
@@ -15,6 +15,7 @@ provoking a 2.0 failure under Linux.
import tempfile
+from test import support
from test.support import threading_helper
import unittest
import io
@@ -49,7 +50,8 @@ class TempFileGreedy(threading.Thread):
class ThreadedTempFileTest(unittest.TestCase):
- def test_main(self):
+ @support.bigmemtest(size=NUM_THREADS, memuse=60*2**20, dry_run=False)
+ def test_main(self, size):
threads = [TempFileGreedy() for i in range(NUM_THREADS)]
with threading_helper.start_threads(threads, startEvent.set):
pass
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 4ab38c2598b..abe63c10c0a 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -530,7 +530,8 @@ class ThreadTests(BaseTestCase):
finally:
sys.setswitchinterval(old_interval)
- def test_join_from_multiple_threads(self):
+ @support.bigmemtest(size=20, memuse=72*2**20, dry_run=False)
+ def test_join_from_multiple_threads(self, size):
# Thread.join() should be thread-safe
errors = []
@@ -1431,7 +1432,8 @@ class ThreadJoinOnShutdown(BaseTestCase):
self._run_and_join(script)
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_4_daemon_threads(self):
+ @support.bigmemtest(size=40, memuse=70*2**20, dry_run=False)
+ def test_4_daemon_threads(self, size):
# Check that a daemon thread cannot crash the interpreter on shutdown
# by manipulating internal structures that are being disposed of in
# the main thread.
diff --git a/Lib/test/test_unparse.py b/Lib/test/test_unparse.py
index d3af7a8489e..d4db5e60af7 100644
--- a/Lib/test/test_unparse.py
+++ b/Lib/test/test_unparse.py
@@ -817,6 +817,15 @@ class CosmeticTestCase(ASTTestCase):
self.check_ast_roundtrip("def f[T: int = int, **P = int, *Ts = *int]():\n pass")
self.check_ast_roundtrip("class C[T: int = int, **P = int, *Ts = *int]():\n pass")
+ def test_tstr(self):
+ self.check_ast_roundtrip("t'{a + b}'")
+ self.check_ast_roundtrip("t'{a + b:x}'")
+ self.check_ast_roundtrip("t'{a + b!s}'")
+ self.check_ast_roundtrip("t'{ {a}}'")
+ self.check_ast_roundtrip("t'{ {a}=}'")
+ self.check_ast_roundtrip("t'{{a}}'")
+ self.check_ast_roundtrip("t''")
+
class ManualASTCreationTestCase(unittest.TestCase):
"""Test that AST nodes created without a type_params field unparse correctly."""
diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py
index 5fe9d688410..8f277952007 100644
--- a/Lib/test/test_xml_etree.py
+++ b/Lib/test/test_xml_etree.py
@@ -2960,6 +2960,50 @@ class BadElementTest(ElementTestCase, unittest.TestCase):
del b
gc_collect()
+ def test_deepcopy_clear(self):
+ # Prevent crashes when __deepcopy__() clears the children list.
+ # See https://github.com/python/cpython/issues/133009.
+ class X(ET.Element):
+ def __deepcopy__(self, memo):
+ root.clear()
+ return self
+
+ root = ET.Element('a')
+ evil = X('x')
+ root.extend([evil, ET.Element('y')])
+ if is_python_implementation():
+ # Mutating a list over which we iterate raises an error.
+ self.assertRaises(RuntimeError, copy.deepcopy, root)
+ else:
+ c = copy.deepcopy(root)
+ # In the C implementation, we can still copy the evil element.
+ self.assertListEqual(list(c), [evil])
+
+ def test_deepcopy_grow(self):
+ # Prevent crashes when __deepcopy__() mutates the children list.
+ # See https://github.com/python/cpython/issues/133009.
+ a = ET.Element('a')
+ b = ET.Element('b')
+ c = ET.Element('c')
+
+ class X(ET.Element):
+ def __deepcopy__(self, memo):
+ root.append(a)
+ root.append(b)
+ return self
+
+ root = ET.Element('top')
+ evil1, evil2 = X('1'), X('2')
+ root.extend([evil1, c, evil2])
+ children = list(copy.deepcopy(root))
+ # mock deep copies
+ self.assertIs(children[0], evil1)
+ self.assertIs(children[2], evil2)
+ # true deep copies
+ self.assertEqual(children[1].tag, c.tag)
+ self.assertEqual([c.tag for c in children[3:]],
+ [a.tag, b.tag, a.tag, b.tag])
+
class MutationDeleteElementPath(str):
def __new__(cls, elem, *args):
diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py
index f4a25376e52..713294c4c27 100644
--- a/Lib/test/test_zstd.py
+++ b/Lib/test/test_zstd.py
@@ -281,7 +281,7 @@ class CompressorTestCase(unittest.TestCase):
with self.assertRaisesRegex(ZstdError,
(r'Error when setting zstd compression parameter "window_log", '
r'it should \d+ <= value <= \d+, provided value is 100\. '
- r'\(zstd v\d\.\d\.\d, (?:32|64)-bit build\)')):
+ r'\((?:32|64)-bit build\)')):
compress(b'', options=option)
def test_unknown_compression_parameter(self):
@@ -413,7 +413,7 @@ class DecompressorTestCase(unittest.TestCase):
with self.assertRaisesRegex(ZstdError,
(r'Error when setting zstd decompression parameter "window_log_max", '
r'it should \d+ <= value <= \d+, provided value is 100\. '
- r'\(zstd v\d\.\d\.\d, (?:32|64)-bit build\)')):
+ r'\((?:32|64)-bit build\)')):
decompress(b'', options=options)
def test_unknown_decompression_parameter(self):
@@ -1174,43 +1174,43 @@ class ZstdDictTestCase(unittest.TestCase):
def test_train_dict_c(self):
# argument wrong type
with self.assertRaises(TypeError):
- _zstd._train_dict({}, (), 100)
+ _zstd.train_dict({}, (), 100)
with self.assertRaises(TypeError):
- _zstd._train_dict(b'', 99, 100)
+ _zstd.train_dict(b'', 99, 100)
with self.assertRaises(TypeError):
- _zstd._train_dict(b'', (), 100.1)
+ _zstd.train_dict(b'', (), 100.1)
# size > size_t
with self.assertRaises(ValueError):
- _zstd._train_dict(b'', (2**64+1,), 100)
+ _zstd.train_dict(b'', (2**64+1,), 100)
# dict_size <= 0
with self.assertRaises(ValueError):
- _zstd._train_dict(b'', (), 0)
+ _zstd.train_dict(b'', (), 0)
def test_finalize_dict_c(self):
with self.assertRaises(TypeError):
- _zstd._finalize_dict(1, 2, 3, 4, 5)
+ _zstd.finalize_dict(1, 2, 3, 4, 5)
# argument wrong type
with self.assertRaises(TypeError):
- _zstd._finalize_dict({}, b'', (), 100, 5)
+ _zstd.finalize_dict({}, b'', (), 100, 5)
with self.assertRaises(TypeError):
- _zstd._finalize_dict(TRAINED_DICT.dict_content, {}, (), 100, 5)
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, {}, (), 100, 5)
with self.assertRaises(TypeError):
- _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', 99, 100, 5)
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', 99, 100, 5)
with self.assertRaises(TypeError):
- _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', (), 100.1, 5)
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100.1, 5)
with self.assertRaises(TypeError):
- _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5.1)
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 100, 5.1)
# size > size_t
with self.assertRaises(ValueError):
- _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', (2**64+1,), 100, 5)
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (2**64+1,), 100, 5)
# dict_size <= 0
with self.assertRaises(ValueError):
- _zstd._finalize_dict(TRAINED_DICT.dict_content, b'', (), 0, 5)
+ _zstd.finalize_dict(TRAINED_DICT.dict_content, b'', (), 0, 5)
def test_train_buffer_protocol_samples(self):
def _nbytes(dat):
@@ -1232,19 +1232,19 @@ class ZstdDictTestCase(unittest.TestCase):
# wrong size list
with self.assertRaisesRegex(ValueError,
"The samples size tuple doesn't match the concatenation's size"):
- _zstd._train_dict(concatenation, tuple(wrong_size_lst), 100*_1K)
+ _zstd.train_dict(concatenation, tuple(wrong_size_lst), 100*_1K)
# correct size list
- _zstd._train_dict(concatenation, tuple(correct_size_lst), 3*_1K)
+ _zstd.train_dict(concatenation, tuple(correct_size_lst), 3*_1K)
# wrong size list
with self.assertRaisesRegex(ValueError,
"The samples size tuple doesn't match the concatenation's size"):
- _zstd._finalize_dict(TRAINED_DICT.dict_content,
+ _zstd.finalize_dict(TRAINED_DICT.dict_content,
concatenation, tuple(wrong_size_lst), 300*_1K, 5)
# correct size list
- _zstd._finalize_dict(TRAINED_DICT.dict_content,
+ _zstd.finalize_dict(TRAINED_DICT.dict_content,
concatenation, tuple(correct_size_lst), 300*_1K, 5)
def test_as_prefix(self):
@@ -1682,10 +1682,10 @@ class FileTestCase(unittest.TestCase):
# Trailing data isn't a valid compressed stream
with ZstdFile(io.BytesIO(self.FRAME_42 + b'12345')) as f:
- self.assertEqual(f.read(), self.DECOMPRESSED_42)
+ self.assertRaises(ZstdError, f.read)
with ZstdFile(io.BytesIO(SKIPPABLE_FRAME + b'12345')) as f:
- self.assertEqual(f.read(), b'')
+ self.assertRaises(ZstdError, f.read)
def test_read_truncated(self):
# Drop stream epilogue: 4 bytes checksum