about summary refs log tree commit diff stats homepage
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/libregrtest/save_env.py2
-rw-r--r--Lib/test/libregrtest/tsan.py4
-rw-r--r--Lib/test/support/channels.py8
-rw-r--r--Lib/test/test_argparse.py2
-rw-r--r--Lib/test/test_asyncgen.py2
-rw-r--r--Lib/test/test_asyncio/test_base_events.py90
-rw-r--r--Lib/test/test_asyncio/test_buffered_proto.py2
-rw-r--r--Lib/test/test_asyncio/test_context.py2
-rw-r--r--Lib/test/test_asyncio/test_eager_task_factory.py2
-rw-r--r--Lib/test/test_asyncio/test_events.py34
-rw-r--r--Lib/test/test_asyncio/test_free_threading.py2
-rw-r--r--Lib/test/test_asyncio/test_futures.py2
-rw-r--r--Lib/test/test_asyncio/test_futures2.py2
-rw-r--r--Lib/test/test_asyncio/test_graph.py2
-rw-r--r--Lib/test/test_asyncio/test_locks.py2
-rw-r--r--Lib/test/test_asyncio/test_pep492.py2
-rw-r--r--Lib/test/test_asyncio/test_proactor_events.py2
-rw-r--r--Lib/test/test_asyncio/test_protocols.py2
-rw-r--r--Lib/test/test_asyncio/test_queues.py2
-rw-r--r--Lib/test/test_asyncio/test_runners.py16
-rw-r--r--Lib/test/test_asyncio/test_selector_events.py2
-rw-r--r--Lib/test/test_asyncio/test_sendfile.py2
-rw-r--r--Lib/test/test_asyncio/test_server.py2
-rw-r--r--Lib/test/test_asyncio/test_sock_lowlevel.py2
-rw-r--r--Lib/test/test_asyncio/test_ssl.py2
-rw-r--r--Lib/test/test_asyncio/test_sslproto.py2
-rw-r--r--Lib/test/test_asyncio/test_staggered.py2
-rw-r--r--Lib/test/test_asyncio/test_streams.py2
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py2
-rw-r--r--Lib/test/test_asyncio/test_taskgroups.py2
-rw-r--r--Lib/test/test_asyncio/test_tasks.py2
-rw-r--r--Lib/test/test_asyncio/test_threads.py2
-rw-r--r--Lib/test/test_asyncio/test_timeouts.py2
-rw-r--r--Lib/test/test_asyncio/test_transports.py2
-rw-r--r--Lib/test/test_asyncio/test_unix_events.py2
-rw-r--r--Lib/test/test_asyncio/test_waitfor.py2
-rw-r--r--Lib/test/test_asyncio/test_windows_events.py14
-rw-r--r--Lib/test/test_asyncio/test_windows_utils.py2
-rw-r--r--Lib/test/test_asyncio/utils.py6
-rw-r--r--Lib/test/test_builtin.py4
-rw-r--r--Lib/test/test_cext/__init__.py47
-rw-r--r--Lib/test/test_cext/extension.c20
-rw-r--r--Lib/test/test_cext/setup.py33
-rw-r--r--Lib/test/test_concurrent_futures/test_interpreter_pool.py43
-rw-r--r--Lib/test/test_coroutines.py2
-rw-r--r--Lib/test/test_cppext/__init__.py24
-rw-r--r--Lib/test/test_cppext/extension.cpp9
-rw-r--r--Lib/test/test_cppext/setup.py4
-rw-r--r--Lib/test/test_dbm.py3
-rw-r--r--Lib/test/test_decimal.py13
-rw-r--r--Lib/test/test_dictcomps.py2
-rw-r--r--Lib/test/test_enum.py37
-rw-r--r--Lib/test/test_external_inspection.py464
-rw-r--r--Lib/test/test_fractions.py13
-rw-r--r--Lib/test/test_gc.py25
-rw-r--r--Lib/test/test_importlib/test_util.py2
-rw-r--r--Lib/test/test_inspect/test_inspect.py2
-rw-r--r--Lib/test/test_interpreters/test_api.py8
-rw-r--r--Lib/test/test_interpreters/test_channels.py16
-rw-r--r--Lib/test/test_interpreters/test_queues.py8
-rw-r--r--Lib/test/test_logging.py6
-rw-r--r--Lib/test/test_os.py2
-rw-r--r--Lib/test/test_sample_profiler.py1877
-rw-r--r--Lib/test/test_setcomps.py2
-rw-r--r--Lib/test/test_sqlite3/test_dbapi.py26
-rw-r--r--Lib/test/test_string/_support.py44
-rw-r--r--Lib/test/test_string/test_templatelib.py13
-rw-r--r--Lib/test/test_typing.py31
-rw-r--r--Lib/test/test_unittest/test_async_case.py6
-rw-r--r--Lib/test/test_unittest/testmock/testasync.py2
-rw-r--r--Lib/test/test_zipfile/_path/test_path.py2
-rw-r--r--Lib/test/test_zoneinfo/test_zoneinfo_property.py16
-rw-r--r--Lib/test/test_zstd.py8
-rw-r--r--Lib/test/typinganndata/fwdref_module.py6
74 files changed, 2753 insertions, 303 deletions
diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py
index ffc29fa8dc6..4cf1a075b30 100644
--- a/Lib/test/libregrtest/save_env.py
+++ b/Lib/test/libregrtest/save_env.py
@@ -97,7 +97,7 @@ class saved_test_environment:
return support.maybe_get_event_loop_policy()
def restore_asyncio_events__event_loop_policy(self, policy):
asyncio = self.get_module('asyncio')
- asyncio._set_event_loop_policy(policy)
+ asyncio.events._set_event_loop_policy(policy)
def get_sys_argv(self):
return id(sys.argv), sys.argv, sys.argv[:]
diff --git a/Lib/test/libregrtest/tsan.py b/Lib/test/libregrtest/tsan.py
index 3545c5f999f..f1f8c8bde92 100644
--- a/Lib/test/libregrtest/tsan.py
+++ b/Lib/test/libregrtest/tsan.py
@@ -3,9 +3,7 @@
TSAN_TESTS = [
'test_asyncio',
- # TODO: enable more of test_capi once bugs are fixed (GH-116908, GH-116909).
- 'test_capi.test_mem',
- 'test_capi.test_pyatomic',
+ 'test_capi',
'test_code',
'test_ctypes',
'test_concurrent_futures',
diff --git a/Lib/test/support/channels.py b/Lib/test/support/channels.py
index b2de24d9d3e..fab1797659b 100644
--- a/Lib/test/support/channels.py
+++ b/Lib/test/support/channels.py
@@ -105,12 +105,8 @@ class _ChannelEnd:
return other._id == self._id
# for pickling:
- def __getnewargs__(self):
- return (int(self._id),)
-
- # for pickling:
- def __getstate__(self):
- return None
+ def __reduce__(self):
+ return (type(self), (int(self._id),))
@property
def id(self):
diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py
index 08ff41368d9..ddd48b1bc0c 100644
--- a/Lib/test/test_argparse.py
+++ b/Lib/test/test_argparse.py
@@ -1829,7 +1829,7 @@ BIN_STDERR_SENTINEL = object()
class StdStreamComparer:
def __init__(self, attr):
# We try to use the actual stdXXX.buffer attribute as our
- # marker, but but under some test environments,
+ # marker, but under some test environments,
# sys.stdout/err are replaced by io.StringIO which won't have .buffer,
# so we use a sentinel simply to show that the tests do the right thing
# for any buffer supporting object
diff --git a/Lib/test/test_asyncgen.py b/Lib/test/test_asyncgen.py
index 636cb33dd98..cd33878d6c7 100644
--- a/Lib/test/test_asyncgen.py
+++ b/Lib/test/test_asyncgen.py
@@ -629,7 +629,7 @@ class AsyncGenAsyncioTest(unittest.TestCase):
def tearDown(self):
self.loop.close()
self.loop = None
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
def check_async_iterator_anext(self, ait_class):
with self.subTest(anext="pure-Python"):
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
index 12179eb0c9e..8c02de77c24 100644
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -29,7 +29,7 @@ class CustomError(Exception):
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
def mock_socket_module():
@@ -150,6 +150,29 @@ class BaseEventTests(test_utils.TestCase):
socket.SOCK_STREAM,
socket.IPPROTO_TCP))
+ def test_interleave_addrinfos(self):
+ self.maxDiff = None
+ SIX_A = (socket.AF_INET6, 0, 0, '', ('2001:db8::1', 1))
+ SIX_B = (socket.AF_INET6, 0, 0, '', ('2001:db8::2', 2))
+ SIX_C = (socket.AF_INET6, 0, 0, '', ('2001:db8::3', 3))
+ SIX_D = (socket.AF_INET6, 0, 0, '', ('2001:db8::4', 4))
+ FOUR_A = (socket.AF_INET, 0, 0, '', ('192.0.2.1', 5))
+ FOUR_B = (socket.AF_INET, 0, 0, '', ('192.0.2.2', 6))
+ FOUR_C = (socket.AF_INET, 0, 0, '', ('192.0.2.3', 7))
+ FOUR_D = (socket.AF_INET, 0, 0, '', ('192.0.2.4', 8))
+
+ addrinfos = [SIX_A, SIX_B, SIX_C, FOUR_A, FOUR_B, FOUR_C, FOUR_D, SIX_D]
+ expected = [SIX_A, FOUR_A, SIX_B, FOUR_B, SIX_C, FOUR_C, SIX_D, FOUR_D]
+
+ self.assertEqual(expected, base_events._interleave_addrinfos(addrinfos))
+
+ expected_fafc_2 = [SIX_A, SIX_B, FOUR_A, SIX_C, FOUR_B, SIX_D, FOUR_C, FOUR_D]
+ self.assertEqual(
+ expected_fafc_2,
+ base_events._interleave_addrinfos(addrinfos, first_address_family_count=2),
+ )
+
+
class BaseEventLoopTests(test_utils.TestCase):
@@ -1053,6 +1076,71 @@ class BaseEventLoopTests(test_utils.TestCase):
test_utils.run_briefly(self.loop)
self.assertTrue(status['finalized'])
+ @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
+ @patch_socket
+ def test_create_connection_happy_eyeballs(self, m_socket):
+
+ class MyProto(asyncio.Protocol):
+ pass
+
+ async def getaddrinfo(*args, **kw):
+ return [(socket.AF_INET6, 0, 0, '', ('2001:db8::1', 1)),
+ (socket.AF_INET, 0, 0, '', ('192.0.2.1', 5))]
+
+ async def sock_connect(sock, address):
+ if address[0] == '2001:db8::1':
+ await asyncio.sleep(1)
+ sock.connect(address)
+
+ loop = asyncio.new_event_loop()
+ loop._add_writer = mock.Mock()
+ loop._add_writer = mock.Mock()
+ loop._add_reader = mock.Mock()
+ loop.getaddrinfo = getaddrinfo
+ loop.sock_connect = sock_connect
+
+ coro = loop.create_connection(MyProto, 'example.com', 80, happy_eyeballs_delay=0.3)
+ transport, protocol = loop.run_until_complete(coro)
+ try:
+ sock = transport._sock
+ sock.connect.assert_called_with(('192.0.2.1', 5))
+ finally:
+ transport.close()
+ test_utils.run_briefly(loop) # allow transport to close
+ loop.close()
+
+ @patch_socket
+ def test_create_connection_happy_eyeballs_ipv4_only(self, m_socket):
+
+ class MyProto(asyncio.Protocol):
+ pass
+
+ async def getaddrinfo(*args, **kw):
+ return [(socket.AF_INET, 0, 0, '', ('192.0.2.1', 5)),
+ (socket.AF_INET, 0, 0, '', ('192.0.2.2', 6))]
+
+ async def sock_connect(sock, address):
+ if address[0] == '192.0.2.1':
+ await asyncio.sleep(1)
+ sock.connect(address)
+
+ loop = asyncio.new_event_loop()
+ loop._add_writer = mock.Mock()
+ loop._add_writer = mock.Mock()
+ loop._add_reader = mock.Mock()
+ loop.getaddrinfo = getaddrinfo
+ loop.sock_connect = sock_connect
+
+ coro = loop.create_connection(MyProto, 'example.com', 80, happy_eyeballs_delay=0.3)
+ transport, protocol = loop.run_until_complete(coro)
+ try:
+ sock = transport._sock
+ sock.connect.assert_called_with(('192.0.2.2', 6))
+ finally:
+ transport.close()
+ test_utils.run_briefly(loop) # allow transport to close
+ loop.close()
+
class MyProto(asyncio.Protocol):
done = None
diff --git a/Lib/test/test_asyncio/test_buffered_proto.py b/Lib/test/test_asyncio/test_buffered_proto.py
index 9c386dd2e63..6d3edcc36f5 100644
--- a/Lib/test/test_asyncio/test_buffered_proto.py
+++ b/Lib/test/test_asyncio/test_buffered_proto.py
@@ -5,7 +5,7 @@ from test.test_asyncio import functional as func_tests
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class ReceiveStuffProto(asyncio.BufferedProtocol):
diff --git a/Lib/test/test_asyncio/test_context.py b/Lib/test/test_asyncio/test_context.py
index ad394f44e7e..f85f39839cb 100644
--- a/Lib/test/test_asyncio/test_context.py
+++ b/Lib/test/test_asyncio/test_context.py
@@ -4,7 +4,7 @@ import unittest
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
@unittest.skipUnless(decimal.HAVE_CONTEXTVAR, "decimal is built with a thread-local context")
diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py
index 9f3b6f9acef..da79ee9260a 100644
--- a/Lib/test/test_asyncio/test_eager_task_factory.py
+++ b/Lib/test/test_asyncio/test_eager_task_factory.py
@@ -13,7 +13,7 @@ MOCK_ANY = mock.ANY
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class EagerTaskFactoryLoopTests:
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 873c503fa02..919d543b032 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -38,7 +38,7 @@ from test.support import threading_helper
from test.support import ALWAYS_EQ, LARGEST, SMALLEST
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
def broken_unix_getsockname():
@@ -2843,13 +2843,13 @@ class PolicyTests(unittest.TestCase):
self.assertIsInstance(policy, asyncio.DefaultEventLoopPolicy)
def test_event_loop_policy(self):
- policy = asyncio._AbstractEventLoopPolicy()
+ policy = asyncio.events._AbstractEventLoopPolicy()
self.assertRaises(NotImplementedError, policy.get_event_loop)
self.assertRaises(NotImplementedError, policy.set_event_loop, object())
self.assertRaises(NotImplementedError, policy.new_event_loop)
def test_get_event_loop(self):
- policy = asyncio._DefaultEventLoopPolicy()
+ policy = test_utils.DefaultEventLoopPolicy()
self.assertIsNone(policy._local._loop)
with self.assertRaises(RuntimeError):
@@ -2857,7 +2857,7 @@ class PolicyTests(unittest.TestCase):
self.assertIsNone(policy._local._loop)
def test_get_event_loop_does_not_call_set_event_loop(self):
- policy = asyncio._DefaultEventLoopPolicy()
+ policy = test_utils.DefaultEventLoopPolicy()
with mock.patch.object(
policy, "set_event_loop",
@@ -2869,7 +2869,7 @@ class PolicyTests(unittest.TestCase):
m_set_event_loop.assert_not_called()
def test_get_event_loop_after_set_none(self):
- policy = asyncio._DefaultEventLoopPolicy()
+ policy = test_utils.DefaultEventLoopPolicy()
policy.set_event_loop(None)
self.assertRaises(RuntimeError, policy.get_event_loop)
@@ -2877,7 +2877,7 @@ class PolicyTests(unittest.TestCase):
def test_get_event_loop_thread(self, m_current_thread):
def f():
- policy = asyncio._DefaultEventLoopPolicy()
+ policy = test_utils.DefaultEventLoopPolicy()
self.assertRaises(RuntimeError, policy.get_event_loop)
th = threading.Thread(target=f)
@@ -2885,14 +2885,14 @@ class PolicyTests(unittest.TestCase):
th.join()
def test_new_event_loop(self):
- policy = asyncio._DefaultEventLoopPolicy()
+ policy = test_utils.DefaultEventLoopPolicy()
loop = policy.new_event_loop()
self.assertIsInstance(loop, asyncio.AbstractEventLoop)
loop.close()
def test_set_event_loop(self):
- policy = asyncio._DefaultEventLoopPolicy()
+ policy = test_utils.DefaultEventLoopPolicy()
old_loop = policy.new_event_loop()
policy.set_event_loop(old_loop)
@@ -2909,7 +2909,7 @@ class PolicyTests(unittest.TestCase):
with self.assertWarnsRegex(
DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
policy = asyncio.get_event_loop_policy()
- self.assertIsInstance(policy, asyncio._AbstractEventLoopPolicy)
+ self.assertIsInstance(policy, asyncio.events._AbstractEventLoopPolicy)
self.assertIs(policy, asyncio.get_event_loop_policy())
def test_set_event_loop_policy(self):
@@ -2922,7 +2922,7 @@ class PolicyTests(unittest.TestCase):
DeprecationWarning, "'asyncio.get_event_loop_policy' is deprecated"):
old_policy = asyncio.get_event_loop_policy()
- policy = asyncio._DefaultEventLoopPolicy()
+ policy = test_utils.DefaultEventLoopPolicy()
with self.assertWarnsRegex(
DeprecationWarning, "'asyncio.set_event_loop_policy' is deprecated"):
asyncio.set_event_loop_policy(policy)
@@ -3034,13 +3034,13 @@ class GetEventLoopTestsMixin:
class TestError(Exception):
pass
- class Policy(asyncio._DefaultEventLoopPolicy):
+ class Policy(test_utils.DefaultEventLoopPolicy):
def get_event_loop(self):
raise TestError
- old_policy = asyncio._get_event_loop_policy()
+ old_policy = asyncio.events._get_event_loop_policy()
try:
- asyncio._set_event_loop_policy(Policy())
+ asyncio.events._set_event_loop_policy(Policy())
loop = asyncio.new_event_loop()
with self.assertRaises(TestError):
@@ -3068,7 +3068,7 @@ class GetEventLoopTestsMixin:
asyncio.get_event_loop()
finally:
- asyncio._set_event_loop_policy(old_policy)
+ asyncio.events._set_event_loop_policy(old_policy)
if loop is not None:
loop.close()
@@ -3078,9 +3078,9 @@ class GetEventLoopTestsMixin:
self.assertIs(asyncio._get_running_loop(), None)
def test_get_event_loop_returns_running_loop2(self):
- old_policy = asyncio._get_event_loop_policy()
+ old_policy = asyncio.events._get_event_loop_policy()
try:
- asyncio._set_event_loop_policy(asyncio._DefaultEventLoopPolicy())
+ asyncio.events._set_event_loop_policy(test_utils.DefaultEventLoopPolicy())
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)
@@ -3106,7 +3106,7 @@ class GetEventLoopTestsMixin:
asyncio.get_event_loop()
finally:
- asyncio._set_event_loop_policy(old_policy)
+ asyncio.events._set_event_loop_policy(old_policy)
if loop is not None:
loop.close()
diff --git a/Lib/test/test_asyncio/test_free_threading.py b/Lib/test/test_asyncio/test_free_threading.py
index 110996c3485..d874ed00bd7 100644
--- a/Lib/test/test_asyncio/test_free_threading.py
+++ b/Lib/test/test_asyncio/test_free_threading.py
@@ -15,7 +15,7 @@ class MyException(Exception):
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class TestFreeThreading:
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
index 39bef465bdb..666f9c9ee18 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -17,7 +17,7 @@ from test import support
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
def _fakefunc(f):
diff --git a/Lib/test/test_asyncio/test_futures2.py b/Lib/test/test_asyncio/test_futures2.py
index e2cddea01ec..c7c0ebdac1b 100644
--- a/Lib/test/test_asyncio/test_futures2.py
+++ b/Lib/test/test_asyncio/test_futures2.py
@@ -7,7 +7,7 @@ from asyncio import tasks
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class FutureTests:
diff --git a/Lib/test/test_asyncio/test_graph.py b/Lib/test/test_asyncio/test_graph.py
index 62f6593c31d..2f22fbccba4 100644
--- a/Lib/test/test_asyncio/test_graph.py
+++ b/Lib/test/test_asyncio/test_graph.py
@@ -5,7 +5,7 @@ import unittest
# To prevent a warning "test altered the execution environment"
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
def capture_test_stack(*, fut=None, depth=1):
diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py
index 047f03cbb14..e025d2990a3 100644
--- a/Lib/test/test_asyncio/test_locks.py
+++ b/Lib/test/test_asyncio/test_locks.py
@@ -20,7 +20,7 @@ RGX_REPR = re.compile(STR_RGX_REPR)
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class LockTests(unittest.IsolatedAsyncioTestCase):
diff --git a/Lib/test/test_asyncio/test_pep492.py b/Lib/test/test_asyncio/test_pep492.py
index 48f4a75e0fd..a0c8434c945 100644
--- a/Lib/test/test_asyncio/test_pep492.py
+++ b/Lib/test/test_asyncio/test_pep492.py
@@ -11,7 +11,7 @@ from test.test_asyncio import utils as test_utils
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
# Test that asyncio.iscoroutine() uses collections.abc.Coroutine
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
index 24c4e8546b1..b25daaface0 100644
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -18,7 +18,7 @@ from test.test_asyncio import utils as test_utils
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
def close_transport(transport):
diff --git a/Lib/test/test_asyncio/test_protocols.py b/Lib/test/test_asyncio/test_protocols.py
index 4484a031988..29d3bd22705 100644
--- a/Lib/test/test_asyncio/test_protocols.py
+++ b/Lib/test/test_asyncio/test_protocols.py
@@ -7,7 +7,7 @@ import asyncio
def tearDownModule():
# not needed for the test file but added for uniformness with all other
# asyncio test files for the sake of unified cleanup
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class ProtocolsAbsTests(unittest.TestCase):
diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py
index 090b9774c22..54bbe79f81f 100644
--- a/Lib/test/test_asyncio/test_queues.py
+++ b/Lib/test/test_asyncio/test_queues.py
@@ -6,7 +6,7 @@ from types import GenericAlias
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class QueueBasicTests(unittest.IsolatedAsyncioTestCase):
diff --git a/Lib/test/test_asyncio/test_runners.py b/Lib/test/test_asyncio/test_runners.py
index 21f277bc2d8..8a4d7f5c796 100644
--- a/Lib/test/test_asyncio/test_runners.py
+++ b/Lib/test/test_asyncio/test_runners.py
@@ -12,14 +12,14 @@ from unittest.mock import patch
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
def interrupt_self():
_thread.interrupt_main()
-class TestPolicy(asyncio._AbstractEventLoopPolicy):
+class TestPolicy(asyncio.events._AbstractEventLoopPolicy):
def __init__(self, loop_factory):
self.loop_factory = loop_factory
@@ -61,15 +61,15 @@ class BaseTest(unittest.TestCase):
super().setUp()
policy = TestPolicy(self.new_loop)
- asyncio._set_event_loop_policy(policy)
+ asyncio.events._set_event_loop_policy(policy)
def tearDown(self):
- policy = asyncio._get_event_loop_policy()
+ policy = asyncio.events._get_event_loop_policy()
if policy.loop is not None:
self.assertTrue(policy.loop.is_closed())
self.assertTrue(policy.loop.shutdown_ag_run)
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
super().tearDown()
@@ -208,7 +208,7 @@ class RunTests(BaseTest):
await asyncio.sleep(0)
return 42
- policy = asyncio._get_event_loop_policy()
+ policy = asyncio.events._get_event_loop_policy()
policy.set_event_loop = mock.Mock()
asyncio.run(main())
self.assertTrue(policy.set_event_loop.called)
@@ -259,7 +259,7 @@ class RunTests(BaseTest):
loop.set_task_factory(Task)
return loop
- asyncio._set_event_loop_policy(TestPolicy(new_event_loop))
+ asyncio.events._set_event_loop_policy(TestPolicy(new_event_loop))
with self.assertRaises(asyncio.CancelledError):
asyncio.run(main())
@@ -495,7 +495,7 @@ class RunnerTests(BaseTest):
async def coro():
pass
- policy = asyncio._get_event_loop_policy()
+ policy = asyncio.events._get_event_loop_policy()
policy.set_event_loop = mock.Mock()
runner = asyncio.Runner()
runner.run(coro())
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
index aab6a779170..7b6d1bce5e4 100644
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -24,7 +24,7 @@ MOCK_ANY = mock.ANY
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class TestBaseSelectorEventLoop(BaseSelectorEventLoop):
diff --git a/Lib/test/test_asyncio/test_sendfile.py b/Lib/test/test_asyncio/test_sendfile.py
index e1b766d06cb..dcd963b3355 100644
--- a/Lib/test/test_asyncio/test_sendfile.py
+++ b/Lib/test/test_asyncio/test_sendfile.py
@@ -22,7 +22,7 @@ except ImportError:
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class MySendfileProto(asyncio.Protocol):
diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py
index 32211f4cba3..5bd0f7e2af4 100644
--- a/Lib/test/test_asyncio/test_server.py
+++ b/Lib/test/test_asyncio/test_server.py
@@ -11,7 +11,7 @@ from test.test_asyncio import functional as func_tests
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class BaseStartServer(func_tests.FunctionalTestCaseMixin):
diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py
index 4f7b9a1dda6..df4ec794897 100644
--- a/Lib/test/test_asyncio/test_sock_lowlevel.py
+++ b/Lib/test/test_asyncio/test_sock_lowlevel.py
@@ -15,7 +15,7 @@ if socket_helper.tcp_blackhole():
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class MyProto(asyncio.Protocol):
diff --git a/Lib/test/test_asyncio/test_ssl.py b/Lib/test/test_asyncio/test_ssl.py
index 3a7185cd897..06118f3a615 100644
--- a/Lib/test/test_asyncio/test_ssl.py
+++ b/Lib/test/test_asyncio/test_ssl.py
@@ -30,7 +30,7 @@ BUF_MULTIPLIER = 1024 if not MACOS else 64
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class MyBaseProto(asyncio.Protocol):
diff --git a/Lib/test/test_asyncio/test_sslproto.py b/Lib/test/test_asyncio/test_sslproto.py
index aa248c5786f..3e304c16642 100644
--- a/Lib/test/test_asyncio/test_sslproto.py
+++ b/Lib/test/test_asyncio/test_sslproto.py
@@ -21,7 +21,7 @@ from test.test_asyncio import functional as func_tests
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
@unittest.skipIf(ssl is None, 'No ssl module')
diff --git a/Lib/test/test_asyncio/test_staggered.py b/Lib/test/test_asyncio/test_staggered.py
index ad34aa6da01..32e4817b70d 100644
--- a/Lib/test/test_asyncio/test_staggered.py
+++ b/Lib/test/test_asyncio/test_staggered.py
@@ -8,7 +8,7 @@ support.requires_working_socket(module=True)
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class StaggeredTests(unittest.IsolatedAsyncioTestCase):
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index 4fa4384346f..f93ee54abc6 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -18,7 +18,7 @@ from test.support import socket_helper
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class StreamTests(test_utils.TestCase):
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index 341e3e979e0..3a17c169c34 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -37,7 +37,7 @@ PROGRAM_CAT = [
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
diff --git a/Lib/test/test_asyncio/test_taskgroups.py b/Lib/test/test_asyncio/test_taskgroups.py
index 0d69a436fdb..91f6b03b459 100644
--- a/Lib/test/test_asyncio/test_taskgroups.py
+++ b/Lib/test/test_asyncio/test_taskgroups.py
@@ -15,7 +15,7 @@ from test.test_asyncio.utils import await_without_task
# To prevent a warning "test altered the execution environment"
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class MyExc(Exception):
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index f6f976f213a..931a43816a2 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -24,7 +24,7 @@ from test.support.warnings_helper import ignore_warnings
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
async def coroutine_function():
diff --git a/Lib/test/test_asyncio/test_threads.py b/Lib/test/test_asyncio/test_threads.py
index c98c9a9b395..8ad5f9b2c9e 100644
--- a/Lib/test/test_asyncio/test_threads.py
+++ b/Lib/test/test_asyncio/test_threads.py
@@ -8,7 +8,7 @@ from unittest import mock
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class ToThreadTests(unittest.IsolatedAsyncioTestCase):
diff --git a/Lib/test/test_asyncio/test_timeouts.py b/Lib/test/test_asyncio/test_timeouts.py
index 3ba84d63b2c..f60722c48b7 100644
--- a/Lib/test/test_asyncio/test_timeouts.py
+++ b/Lib/test/test_asyncio/test_timeouts.py
@@ -9,7 +9,7 @@ from test.test_asyncio.utils import await_without_task
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class TimeoutTests(unittest.IsolatedAsyncioTestCase):
diff --git a/Lib/test/test_asyncio/test_transports.py b/Lib/test/test_asyncio/test_transports.py
index af10d3dc2a8..dbb572e2e15 100644
--- a/Lib/test/test_asyncio/test_transports.py
+++ b/Lib/test/test_asyncio/test_transports.py
@@ -10,7 +10,7 @@ from asyncio import transports
def tearDownModule():
# not needed for the test file but added for uniformness with all other
# asyncio test files for the sake of unified cleanup
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class TransportTests(unittest.TestCase):
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index e020c1f3e4f..22982dc9d8a 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -30,7 +30,7 @@ from test.test_asyncio import utils as test_utils
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
MOCK_ANY = mock.ANY
diff --git a/Lib/test/test_asyncio/test_waitfor.py b/Lib/test/test_asyncio/test_waitfor.py
index d083f6b4d2a..dedc6bf69d7 100644
--- a/Lib/test/test_asyncio/test_waitfor.py
+++ b/Lib/test/test_asyncio/test_waitfor.py
@@ -5,7 +5,7 @@ from test import support
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
# The following value can be used as a very small timeout:
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py
index 69e9905205e..0af3368627a 100644
--- a/Lib/test/test_asyncio/test_windows_events.py
+++ b/Lib/test/test_asyncio/test_windows_events.py
@@ -19,7 +19,7 @@ from test.test_asyncio import utils as test_utils
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class UpperProto(asyncio.Protocol):
@@ -330,16 +330,16 @@ class WinPolicyTests(WindowsEventsTestCase):
async def main():
self.assertIsInstance(asyncio.get_running_loop(), asyncio.SelectorEventLoop)
- old_policy = asyncio._get_event_loop_policy()
+ old_policy = asyncio.events._get_event_loop_policy()
try:
with self.assertWarnsRegex(
DeprecationWarning,
"'asyncio.WindowsSelectorEventLoopPolicy' is deprecated",
):
- asyncio._set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+ asyncio.events._set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
finally:
- asyncio._set_event_loop_policy(old_policy)
+ asyncio.events._set_event_loop_policy(old_policy)
def test_proactor_win_policy(self):
async def main():
@@ -347,16 +347,16 @@ class WinPolicyTests(WindowsEventsTestCase):
asyncio.get_running_loop(),
asyncio.ProactorEventLoop)
- old_policy = asyncio._get_event_loop_policy()
+ old_policy = asyncio.events._get_event_loop_policy()
try:
with self.assertWarnsRegex(
DeprecationWarning,
"'asyncio.WindowsProactorEventLoopPolicy' is deprecated",
):
- asyncio._set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
+ asyncio.events._set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
asyncio.run(main())
finally:
- asyncio._set_event_loop_policy(old_policy)
+ asyncio.events._set_event_loop_policy(old_policy)
if __name__ == '__main__':
diff --git a/Lib/test/test_asyncio/test_windows_utils.py b/Lib/test/test_asyncio/test_windows_utils.py
index a6b207567c4..97f078ff911 100644
--- a/Lib/test/test_asyncio/test_windows_utils.py
+++ b/Lib/test/test_asyncio/test_windows_utils.py
@@ -16,7 +16,7 @@ from test import support
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class PipeTests(unittest.TestCase):
diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py
index 0a96573a81c..a480e16e81b 100644
--- a/Lib/test/test_asyncio/utils.py
+++ b/Lib/test/test_asyncio/utils.py
@@ -601,3 +601,9 @@ async def await_without_task(coro):
await asyncio.sleep(0)
if exc is not None:
raise exc
+
+
+if sys.platform == 'win32':
+ DefaultEventLoopPolicy = asyncio.windows_events._DefaultEventLoopPolicy
+else:
+ DefaultEventLoopPolicy = asyncio.unix_events._DefaultEventLoopPolicy
diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py
index 14fe3355239..8830641f0ab 100644
--- a/Lib/test/test_builtin.py
+++ b/Lib/test/test_builtin.py
@@ -436,7 +436,7 @@ class BuiltinTest(ComplexesAreIdenticalMixin, unittest.TestCase):
# test both direct compilation and compilation via AST
codeobjs = []
codeobjs.append(compile(codestr, "<test>", "exec", optimize=optval))
- tree = ast.parse(codestr)
+ tree = ast.parse(codestr, optimize=optval)
codeobjs.append(compile(tree, "<test>", "exec", optimize=optval))
for code in codeobjs:
ns = {}
@@ -624,7 +624,7 @@ class BuiltinTest(ComplexesAreIdenticalMixin, unittest.TestCase):
for opt in [opt1, opt2]:
opt_right = opt.value.right
self.assertIsInstance(opt_right, ast.Constant)
- self.assertEqual(opt_right.value, True)
+ self.assertEqual(opt_right.value, __debug__)
def test_delattr(self):
sys.spam = 1
diff --git a/Lib/test/test_cext/__init__.py b/Lib/test/test_cext/__init__.py
index 46fde541494..93e7b2043d3 100644
--- a/Lib/test/test_cext/__init__.py
+++ b/Lib/test/test_cext/__init__.py
@@ -28,29 +28,13 @@ SETUP = os.path.join(os.path.dirname(__file__), 'setup.py')
@support.requires_venv_with_pip()
@support.requires_subprocess()
@support.requires_resource('cpu')
-class TestExt(unittest.TestCase):
+class BaseTests:
+ TEST_INTERNAL_C_API = False
+
# Default build with no options
def test_build(self):
self.check_build('_test_cext')
- def test_build_c11(self):
- self.check_build('_test_c11_cext', std='c11')
-
- @unittest.skipIf(support.MS_WINDOWS, "MSVC doesn't support /std:c99")
- def test_build_c99(self):
- # In public docs, we say C API is compatible with C11. However,
- # in practice we do maintain C99 compatibility in public headers.
- # Please ask the C API WG before adding a new C11-only feature.
- self.check_build('_test_c99_cext', std='c99')
-
- @support.requires_gil_enabled('incompatible with Free Threading')
- def test_build_limited(self):
- self.check_build('_test_limited_cext', limited=True)
-
- @support.requires_gil_enabled('broken for now with Free Threading')
- def test_build_limited_c11(self):
- self.check_build('_test_limited_c11_cext', limited=True, std='c11')
-
def check_build(self, extension_name, std=None, limited=False):
venv_dir = 'env'
with support.setup_venv_with_pip_setuptools(venv_dir) as python_exe:
@@ -70,6 +54,7 @@ class TestExt(unittest.TestCase):
if limited:
env['CPYTHON_TEST_LIMITED'] = '1'
env['CPYTHON_TEST_EXT_NAME'] = extension_name
+ env['TEST_INTERNAL_C_API'] = str(int(self.TEST_INTERNAL_C_API))
if support.verbose:
print('Run:', ' '.join(map(shlex.quote, cmd)))
subprocess.run(cmd, check=True, env=env)
@@ -110,5 +95,29 @@ class TestExt(unittest.TestCase):
run_cmd('Import', cmd)
+class TestPublicCAPI(BaseTests, unittest.TestCase):
+ @support.requires_gil_enabled('incompatible with Free Threading')
+ def test_build_limited(self):
+ self.check_build('_test_limited_cext', limited=True)
+
+ @support.requires_gil_enabled('broken for now with Free Threading')
+ def test_build_limited_c11(self):
+ self.check_build('_test_limited_c11_cext', limited=True, std='c11')
+
+ def test_build_c11(self):
+ self.check_build('_test_c11_cext', std='c11')
+
+ @unittest.skipIf(support.MS_WINDOWS, "MSVC doesn't support /std:c99")
+ def test_build_c99(self):
+ # In public docs, we say C API is compatible with C11. However,
+ # in practice we do maintain C99 compatibility in public headers.
+ # Please ask the C API WG before adding a new C11-only feature.
+ self.check_build('_test_c99_cext', std='c99')
+
+
+class TestInternalCAPI(BaseTests, unittest.TestCase):
+ TEST_INTERNAL_C_API = True
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_cext/extension.c b/Lib/test/test_cext/extension.c
index 64629c5a6da..4be2f24c60d 100644
--- a/Lib/test/test_cext/extension.c
+++ b/Lib/test/test_cext/extension.c
@@ -1,11 +1,31 @@
// gh-116869: Basic C test extension to check that the Python C API
// does not emit C compiler warnings.
+//
+// Test also the internal C API if the TEST_INTERNAL_C_API macro is defined.
// Always enable assertions
#undef NDEBUG
+#ifdef TEST_INTERNAL_C_API
+# define Py_BUILD_CORE_MODULE 1
+#endif
+
#include "Python.h"
+#ifdef TEST_INTERNAL_C_API
+ // gh-135906: Check for compiler warnings in the internal C API.
+ // - Cython uses pycore_frame.h.
+ // - greenlet uses pycore_frame.h, pycore_interpframe_structs.h and
+ // pycore_interpframe.h.
+# include "internal/pycore_frame.h"
+# include "internal/pycore_gc.h"
+# include "internal/pycore_interp.h"
+# include "internal/pycore_interpframe.h"
+# include "internal/pycore_interpframe_structs.h"
+# include "internal/pycore_object.h"
+# include "internal/pycore_pystate.h"
+#endif
+
#ifndef MODULE_NAME
# error "MODULE_NAME macro must be defined"
#endif
diff --git a/Lib/test/test_cext/setup.py b/Lib/test/test_cext/setup.py
index 1275282983f..587585e8086 100644
--- a/Lib/test/test_cext/setup.py
+++ b/Lib/test/test_cext/setup.py
@@ -14,10 +14,15 @@ SOURCE = 'extension.c'
if not support.MS_WINDOWS:
# C compiler flags for GCC and clang
- CFLAGS = [
+ BASE_CFLAGS = [
# The purpose of test_cext extension is to check that building a C
# extension using the Python C API does not emit C compiler warnings.
'-Werror',
+ ]
+
+ # C compiler flags for GCC and clang
+ PUBLIC_CFLAGS = [
+ *BASE_CFLAGS,
# gh-120593: Check the 'const' qualifier
'-Wcast-qual',
@@ -26,27 +31,40 @@ if not support.MS_WINDOWS:
'-pedantic-errors',
]
if not support.Py_GIL_DISABLED:
- CFLAGS.append(
+ PUBLIC_CFLAGS.append(
# gh-116869: The Python C API must be compatible with building
# with the -Werror=declaration-after-statement compiler flag.
'-Werror=declaration-after-statement',
)
+ INTERNAL_CFLAGS = [*BASE_CFLAGS]
else:
# MSVC compiler flags
- CFLAGS = [
- # Display warnings level 1 to 4
- '/W4',
+ BASE_CFLAGS = [
# Treat all compiler warnings as compiler errors
'/WX',
]
+ PUBLIC_CFLAGS = [
+ *BASE_CFLAGS,
+ # Display warnings level 1 to 4
+ '/W4',
+ ]
+ INTERNAL_CFLAGS = [
+ *BASE_CFLAGS,
+ # Display warnings level 1 to 3
+ '/W3',
+ ]
def main():
std = os.environ.get("CPYTHON_TEST_STD", "")
module_name = os.environ["CPYTHON_TEST_EXT_NAME"]
limited = bool(os.environ.get("CPYTHON_TEST_LIMITED", ""))
+ internal = bool(int(os.environ.get("TEST_INTERNAL_C_API", "0")))
- cflags = list(CFLAGS)
+ if not internal:
+ cflags = list(PUBLIC_CFLAGS)
+ else:
+ cflags = list(INTERNAL_CFLAGS)
cflags.append(f'-DMODULE_NAME={module_name}')
# Add -std=STD or /std:STD (MSVC) compiler flag
@@ -75,6 +93,9 @@ def main():
version = sys.hexversion
cflags.append(f'-DPy_LIMITED_API={version:#x}')
+ if internal:
+ cflags.append('-DTEST_INTERNAL_C_API=1')
+
# On Windows, add PCbuild\amd64\ to include and library directories
include_dirs = []
library_dirs = []
diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py
index 844dfdd6fc9..d5c032d01cd 100644
--- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py
+++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py
@@ -2,7 +2,9 @@ import asyncio
import contextlib
import io
import os
+import subprocess
import sys
+import textwrap
import time
import unittest
from concurrent.futures.interpreter import BrokenInterpreterPool
@@ -457,6 +459,45 @@ class InterpreterPoolExecutorTest(
# Weak references don't cross between interpreters.
raise unittest.SkipTest('not applicable')
+ @support.requires_subprocess()
+ def test_import_interpreter_pool_executor(self):
+ # Test the import behavior normally if _interpreters is unavailable.
+ code = textwrap.dedent("""
+ import sys
+ # Set it to None to emulate the case when _interpreters is unavailable.
+ sys.modules['_interpreters'] = None
+ from concurrent import futures
+
+ try:
+ futures.InterpreterPoolExecutor
+ except AttributeError:
+ pass
+ else:
+ print('AttributeError not raised!', file=sys.stderr)
+ sys.exit(1)
+
+ try:
+ from concurrent.futures import InterpreterPoolExecutor
+ except ImportError:
+ pass
+ else:
+ print('ImportError not raised!', file=sys.stderr)
+ sys.exit(1)
+
+ from concurrent.futures import *
+
+ if 'InterpreterPoolExecutor' in globals():
+ print('InterpreterPoolExecutor should not be imported!',
+ file=sys.stderr)
+ sys.exit(1)
+ """)
+
+ cmd = [sys.executable, '-c', code]
+ p = subprocess.run(cmd, capture_output=True)
+ self.assertEqual(p.returncode, 0, p.stderr.decode())
+ self.assertEqual(p.stdout.decode(), '')
+ self.assertEqual(p.stderr.decode(), '')
+
class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase):
@@ -471,7 +512,7 @@ class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase):
# tests left a policy in place, just in case.
policy = support.maybe_get_event_loop_policy()
assert policy is None, policy
- cls.addClassCleanup(lambda: asyncio._set_event_loop_policy(None))
+ cls.addClassCleanup(lambda: asyncio.events._set_event_loop_policy(None))
def setUp(self):
super().setUp()
diff --git a/Lib/test/test_coroutines.py b/Lib/test/test_coroutines.py
index 4755046fe19..42b13064bef 100644
--- a/Lib/test/test_coroutines.py
+++ b/Lib/test/test_coroutines.py
@@ -2307,7 +2307,7 @@ class CoroAsyncIOCompatTest(unittest.TestCase):
pass
finally:
loop.close()
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
self.assertEqual(buffer, [1, 2, 'MyException'])
diff --git a/Lib/test/test_cppext/__init__.py b/Lib/test/test_cppext/__init__.py
index 2b7adac4bcc..2f54b3ccb35 100644
--- a/Lib/test/test_cppext/__init__.py
+++ b/Lib/test/test_cppext/__init__.py
@@ -24,7 +24,7 @@ SETUP = os.path.join(os.path.dirname(__file__), 'setup.py')
@support.requires_venv_with_pip()
@support.requires_subprocess()
@support.requires_resource('cpu')
-class TestCPPExt(unittest.TestCase):
+class BaseTests:
def test_build(self):
self.check_build('_testcppext')
@@ -34,10 +34,6 @@ class TestCPPExt(unittest.TestCase):
# Please ask the C API WG before adding a new C++11-only feature.
self.check_build('_testcpp03ext', std='c++03')
- @support.requires_gil_enabled('incompatible with Free Threading')
- def test_build_limited_cpp03(self):
- self.check_build('_test_limited_cpp03ext', std='c++03', limited=True)
-
@unittest.skipIf(support.MS_WINDOWS, "MSVC doesn't support /std:c++11")
def test_build_cpp11(self):
self.check_build('_testcpp11ext', std='c++11')
@@ -48,10 +44,6 @@ class TestCPPExt(unittest.TestCase):
def test_build_cpp14(self):
self.check_build('_testcpp14ext', std='c++14')
- @support.requires_gil_enabled('incompatible with Free Threading')
- def test_build_limited(self):
- self.check_build('_testcppext_limited', limited=True)
-
def check_build(self, extension_name, std=None, limited=False):
venv_dir = 'env'
with support.setup_venv_with_pip_setuptools(venv_dir) as python_exe:
@@ -111,5 +103,19 @@ class TestCPPExt(unittest.TestCase):
run_cmd('Import', cmd)
+class TestPublicCAPI(BaseTests, unittest.TestCase):
+ @support.requires_gil_enabled('incompatible with Free Threading')
+ def test_build_limited_cpp03(self):
+ self.check_build('_test_limited_cpp03ext', std='c++03', limited=True)
+
+ @support.requires_gil_enabled('incompatible with Free Threading')
+ def test_build_limited(self):
+ self.check_build('_testcppext_limited', limited=True)
+
+
+class TestInternalCAPI(BaseTests, unittest.TestCase):
+ TEST_INTERNAL_C_API = True
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_cppext/extension.cpp b/Lib/test/test_cppext/extension.cpp
index 5b3571b295b..1affa176088 100644
--- a/Lib/test/test_cppext/extension.cpp
+++ b/Lib/test/test_cppext/extension.cpp
@@ -6,8 +6,17 @@
// Always enable assertions
#undef NDEBUG
+#ifdef TEST_INTERNAL_C_API
+# define Py_BUILD_CORE 1
+#endif
+
#include "Python.h"
+#ifdef TEST_INTERNAL_C_API
+ // gh-135906: Check for compiler warnings in the internal C API
+# include "internal/pycore_frame.h"
+#endif
+
#ifndef MODULE_NAME
# error "MODULE_NAME macro must be defined"
#endif
diff --git a/Lib/test/test_cppext/setup.py b/Lib/test/test_cppext/setup.py
index ea1ed64bf7a..98442b106b6 100644
--- a/Lib/test/test_cppext/setup.py
+++ b/Lib/test/test_cppext/setup.py
@@ -47,6 +47,7 @@ def main():
std = os.environ.get("CPYTHON_TEST_CPP_STD", "")
module_name = os.environ["CPYTHON_TEST_EXT_NAME"]
limited = bool(os.environ.get("CPYTHON_TEST_LIMITED", ""))
+ internal = bool(int(os.environ.get("TEST_INTERNAL_C_API", "0")))
cppflags = list(CPPFLAGS)
cppflags.append(f'-DMODULE_NAME={module_name}')
@@ -82,6 +83,9 @@ def main():
version = sys.hexversion
cppflags.append(f'-DPy_LIMITED_API={version:#x}')
+ if internal:
+ cppflags.append('-DTEST_INTERNAL_C_API=1')
+
# On Windows, add PCbuild\amd64\ to include and library directories
include_dirs = []
library_dirs = []
diff --git a/Lib/test/test_dbm.py b/Lib/test/test_dbm.py
index 7e8d78b8940..ae9faabd536 100644
--- a/Lib/test/test_dbm.py
+++ b/Lib/test/test_dbm.py
@@ -274,7 +274,8 @@ class WhichDBTestCase(unittest.TestCase):
@unittest.skipUnless(ndbm, reason='Test requires ndbm')
def test_whichdb_ndbm(self):
# Issue 17198: check that ndbm which is referenced in whichdb is defined
- with open(_fname + '.db', 'wb'): pass
+ with open(_fname + '.db', 'wb') as f:
+ f.write(b'spam')
_bytes_fname = os.fsencode(_fname)
fnames = [_fname, os_helper.FakePath(_fname),
_bytes_fname, os_helper.FakePath(_bytes_fname)]
diff --git a/Lib/test/test_decimal.py b/Lib/test/test_decimal.py
index ef64b878805..08a8f4c3b36 100644
--- a/Lib/test/test_decimal.py
+++ b/Lib/test/test_decimal.py
@@ -1089,6 +1089,15 @@ class FormatTest:
('07_', '1234.56', '1_234.56'),
('_', '1.23456789', '1.23456789'),
('_%', '123.456789', '12_345.6789%'),
+ # and now for something completely different...
+ ('.,', '1.23456789', '1.234,567,89'),
+ ('._', '1.23456789', '1.234_567_89'),
+ ('.6_f', '12345.23456789', '12345.234_568'),
+ (',._%', '123.456789', '12,345.678_9%'),
+ (',._e', '123456', '1.234_56e+5'),
+ (',.4_e', '123456', '1.234_6e+5'),
+ (',.3_e', '123456', '1.235e+5'),
+ (',._E', '123456', '1.234_56E+5'),
# negative zero: default behavior
('.1f', '-0', '-0.0'),
@@ -1162,6 +1171,10 @@ class FormatTest:
# bytes format argument
self.assertRaises(TypeError, Decimal(1).__format__, b'-020')
+ # precision or fractional part separator should follow after dot
+ self.assertRaises(ValueError, format, Decimal(1), '.f')
+ self.assertRaises(ValueError, format, Decimal(1), '._6f')
+
def test_negative_zero_format_directed_rounding(self):
with self.decimal.localcontext() as ctx:
ctx.rounding = ROUND_CEILING
diff --git a/Lib/test/test_dictcomps.py b/Lib/test/test_dictcomps.py
index 26b56dac503..a7a46216787 100644
--- a/Lib/test/test_dictcomps.py
+++ b/Lib/test/test_dictcomps.py
@@ -132,7 +132,7 @@ class DictComprehensionTest(unittest.TestCase):
def test_exception_locations(self):
# The location of an exception raised from __init__ or
- # __next__ should should be the iterator expression
+ # __next__ should be the iterator expression
def init_raises():
try:
{x:x for x in BrokenIter(init_raises=True)}
diff --git a/Lib/test/test_enum.py b/Lib/test/test_enum.py
index bbc7630fa83..2dd585f246d 100644
--- a/Lib/test/test_enum.py
+++ b/Lib/test/test_enum.py
@@ -1002,12 +1002,18 @@ class _FlagTests:
self.assertIs(~(A|B), OpenAB(252))
self.assertIs(~AB_MASK, OpenAB(0))
self.assertIs(~OpenAB(0), AB_MASK)
+ self.assertIs(OpenAB(~4), OpenAB(251))
else:
self.assertIs(~A, B)
self.assertIs(~B, A)
+ self.assertIs(OpenAB(~1), B)
+ self.assertIs(OpenAB(~2), A)
self.assertIs(~(A|B), OpenAB(0))
self.assertIs(~AB_MASK, OpenAB(0))
self.assertIs(~OpenAB(0), (A|B))
+ self.assertIs(OpenAB(~3), OpenAB(0))
+ self.assertIs(OpenAB(~4), OpenAB(3))
+ self.assertIs(OpenAB(~33), B)
#
class OpenXYZ(self.enum_type):
X = 4
@@ -1031,6 +1037,9 @@ class _FlagTests:
self.assertIs(~X, Y|Z)
self.assertIs(~Y, X|Z)
self.assertIs(~Z, X|Y)
+ self.assertIs(OpenXYZ(~4), Y|Z)
+ self.assertIs(OpenXYZ(~2), X|Z)
+ self.assertIs(OpenXYZ(~1), X|Y)
self.assertIs(~(X|Y), Z)
self.assertIs(~(X|Z), Y)
self.assertIs(~(Y|Z), X)
@@ -1038,6 +1047,28 @@ class _FlagTests:
self.assertIs(~XYZ_MASK, OpenXYZ(0))
self.assertTrue(~OpenXYZ(0), (X|Y|Z))
+ def test_assigned_negative_value(self):
+ class X(self.enum_type):
+ A = auto()
+ B = auto()
+ C = A | B
+ D = ~A
+ self.assertEqual(list(X), [X.A, X.B])
+ self.assertIs(~X.A, X.B)
+ self.assertIs(X.D, X.B)
+ self.assertEqual(X.D.value, 2)
+ #
+ class Y(self.enum_type):
+ A = auto()
+ B = auto()
+ C = A | B
+ D = ~A
+ E = auto()
+ self.assertEqual(list(Y), [Y.A, Y.B, Y.E])
+ self.assertIs(~Y.A, Y.B|Y.E)
+ self.assertIs(Y.D, Y.B|Y.E)
+ self.assertEqual(Y.D.value, 6)
+
class TestPlainEnumClass(_EnumTests, _PlainOutputTests, unittest.TestCase):
enum_type = Enum
@@ -3680,6 +3711,8 @@ class OldTestFlag(unittest.TestCase):
C = 4 | B
#
self.assertTrue(SkipFlag.C in (SkipFlag.A|SkipFlag.C))
+ self.assertTrue(SkipFlag.B in SkipFlag.C)
+ self.assertIs(SkipFlag(~1), SkipFlag.B)
self.assertRaisesRegex(ValueError, 'SkipFlag.. invalid value 42', SkipFlag, 42)
#
class SkipIntFlag(enum.IntFlag):
@@ -3688,6 +3721,8 @@ class OldTestFlag(unittest.TestCase):
C = 4 | B
#
self.assertTrue(SkipIntFlag.C in (SkipIntFlag.A|SkipIntFlag.C))
+ self.assertTrue(SkipIntFlag.B in SkipIntFlag.C)
+ self.assertIs(SkipIntFlag(~1), SkipIntFlag.B|SkipIntFlag.C)
self.assertEqual(SkipIntFlag(42).value, 42)
#
class MethodHint(Flag):
@@ -4727,6 +4762,8 @@ class TestVerify(unittest.TestCase):
BLUE = 4
WHITE = -1
# no error means success
+ self.assertEqual(list(Color.WHITE), [Color.RED, Color.GREEN, Color.BLUE])
+ self.assertEqual(Color.WHITE.value, 7)
class TestInternals(unittest.TestCase):
diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py
index 0f31c225e68..354a82a800f 100644
--- a/Lib/test/test_external_inspection.py
+++ b/Lib/test/test_external_inspection.py
@@ -5,9 +5,15 @@ import importlib
import sys
import socket
import threading
+import time
from asyncio import staggered, taskgroups, base_events, tasks
from unittest.mock import ANY
-from test.support import os_helper, SHORT_TIMEOUT, busy_retry, requires_gil_enabled
+from test.support import (
+ os_helper,
+ SHORT_TIMEOUT,
+ busy_retry,
+ requires_gil_enabled,
+)
from test.support.script_helper import make_script
from test.support.socket_helper import find_unused_port
@@ -235,55 +241,162 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
- # sets are unordered, so we want to sort "awaited_by"s
- stack_trace[2].sort(key=lambda x: x[1])
+ # First check all the tasks are present
+ tasks_names = [
+ task.task_name for task in stack_trace[0].awaited_by
+ ]
+ for task_name in ["c2_root", "sub_main_1", "sub_main_2"]:
+ self.assertIn(task_name, tasks_names)
+
+ # Now ensure that the awaited_by_relationships are correct
+ id_to_task = {
+ task.task_id: task for task in stack_trace[0].awaited_by
+ }
+ task_name_to_awaited_by = {
+ task.task_name: set(
+ id_to_task[awaited.task_name].task_name
+ for awaited in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ task_name_to_awaited_by,
+ {
+ "c2_root": {"Task-1", "sub_main_1", "sub_main_2"},
+ "Task-1": set(),
+ "sub_main_1": {"Task-1"},
+ "sub_main_2": {"Task-1"},
+ },
+ )
- expected_stack_trace = [
- [
- FrameInfo([script_name, 10, "c5"]),
- FrameInfo([script_name, 14, "c4"]),
- FrameInfo([script_name, 17, "c3"]),
- FrameInfo([script_name, 20, "c2"]),
- ],
- "c2_root",
- [
- CoroInfo(
- [
- [
- FrameInfo(
+ # Now ensure that the coroutine stacks are correct
+ coroutine_stacks = {
+ task.task_name: sorted(
+ tuple(tuple(frame) for frame in coro.call_stack)
+ for coro in task.coroutine_stack
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ coroutine_stacks,
+ {
+ "Task-1": [
+ (
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup._aexit",
+ ]
+ ),
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup.__aexit__",
+ ]
+ ),
+ tuple([script_name, 26, "main"]),
+ )
+ ],
+ "c2_root": [
+ (
+ tuple([script_name, 10, "c5"]),
+ tuple([script_name, 14, "c4"]),
+ tuple([script_name, 17, "c3"]),
+ tuple([script_name, 20, "c2"]),
+ )
+ ],
+ "sub_main_1": [(tuple([script_name, 23, "c1"]),)],
+ "sub_main_2": [(tuple([script_name, 23, "c1"]),)],
+ },
+ )
+
+ # Now ensure the coroutine stacks for the awaited_by relationships are correct.
+ awaited_by_coroutine_stacks = {
+ task.task_name: sorted(
+ (
+ id_to_task[coro.task_name].task_name,
+ tuple(tuple(frame) for frame in coro.call_stack),
+ )
+ for coro in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ awaited_by_coroutine_stacks,
+ {
+ "Task-1": [],
+ "c2_root": [
+ (
+ "Task-1",
+ (
+ tuple(
[
taskgroups.__file__,
ANY,
"TaskGroup._aexit",
]
),
- FrameInfo(
+ tuple(
[
taskgroups.__file__,
ANY,
"TaskGroup.__aexit__",
]
),
- FrameInfo([script_name, 26, "main"]),
- ],
+ tuple([script_name, 26, "main"]),
+ ),
+ ),
+ ("sub_main_1", (tuple([script_name, 23, "c1"]),)),
+ ("sub_main_2", (tuple([script_name, 23, "c1"]),)),
+ ],
+ "sub_main_1": [
+ (
"Task-1",
- ]
- ),
- CoroInfo(
- [
- [FrameInfo([script_name, 23, "c1"])],
- "sub_main_1",
- ]
- ),
- CoroInfo(
- [
- [FrameInfo([script_name, 23, "c1"])],
- "sub_main_2",
- ]
- ),
- ],
- ]
- self.assertEqual(stack_trace, expected_stack_trace)
+ (
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup._aexit",
+ ]
+ ),
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup.__aexit__",
+ ]
+ ),
+ tuple([script_name, 26, "main"]),
+ ),
+ )
+ ],
+ "sub_main_2": [
+ (
+ "Task-1",
+ (
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup._aexit",
+ ]
+ ),
+ tuple(
+ [
+ taskgroups.__file__,
+ ANY,
+ "TaskGroup.__aexit__",
+ ]
+ ),
+ tuple([script_name, 26, "main"]),
+ ),
+ )
+ ],
+ },
+ )
@skip_if_not_supported
@unittest.skipIf(
@@ -349,19 +462,29 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
- # sets are unordered, so we want to sort "awaited_by"s
- stack_trace[2].sort(key=lambda x: x[1])
-
- expected_stack_trace = [
+ # For this simple asyncgen test, we only expect one task with the full coroutine stack
+ self.assertEqual(len(stack_trace[0].awaited_by), 1)
+ task = stack_trace[0].awaited_by[0]
+ self.assertEqual(task.task_name, "Task-1")
+
+ # Check the coroutine stack - based on actual output, only shows main
+ coroutine_stack = sorted(
+ tuple(tuple(frame) for frame in coro.call_stack)
+ for coro in task.coroutine_stack
+ )
+ self.assertEqual(
+ coroutine_stack,
[
- FrameInfo([script_name, 10, "gen_nested_call"]),
- FrameInfo([script_name, 16, "gen"]),
- FrameInfo([script_name, 19, "main"]),
+ (
+ tuple([script_name, 10, "gen_nested_call"]),
+ tuple([script_name, 16, "gen"]),
+ tuple([script_name, 19, "main"]),
+ )
],
- "Task-1",
- [],
- ]
- self.assertEqual(stack_trace, expected_stack_trace)
+ )
+
+ # No awaited_by relationships expected for this simple case
+ self.assertEqual(task.awaited_by, [])
@skip_if_not_supported
@unittest.skipIf(
@@ -428,18 +551,73 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
- # sets are unordered, so we want to sort "awaited_by"s
- stack_trace[2].sort(key=lambda x: x[1])
-
- expected_stack_trace = [
- [
- FrameInfo([script_name, 11, "deep"]),
- FrameInfo([script_name, 15, "c1"]),
- ],
- "Task-2",
- [CoroInfo([[FrameInfo([script_name, 21, "main"])], "Task-1"])],
+ # First check all the tasks are present
+ tasks_names = [
+ task.task_name for task in stack_trace[0].awaited_by
]
- self.assertEqual(stack_trace, expected_stack_trace)
+ for task_name in ["Task-1", "Task-2"]:
+ self.assertIn(task_name, tasks_names)
+
+ # Now ensure that the awaited_by_relationships are correct
+ id_to_task = {
+ task.task_id: task for task in stack_trace[0].awaited_by
+ }
+ task_name_to_awaited_by = {
+ task.task_name: set(
+ id_to_task[awaited.task_name].task_name
+ for awaited in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ task_name_to_awaited_by,
+ {
+ "Task-1": set(),
+ "Task-2": {"Task-1"},
+ },
+ )
+
+ # Now ensure that the coroutine stacks are correct
+ coroutine_stacks = {
+ task.task_name: sorted(
+ tuple(tuple(frame) for frame in coro.call_stack)
+ for coro in task.coroutine_stack
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ coroutine_stacks,
+ {
+ "Task-1": [(tuple([script_name, 21, "main"]),)],
+ "Task-2": [
+ (
+ tuple([script_name, 11, "deep"]),
+ tuple([script_name, 15, "c1"]),
+ )
+ ],
+ },
+ )
+
+ # Now ensure the coroutine stacks for the awaited_by relationships are correct.
+ awaited_by_coroutine_stacks = {
+ task.task_name: sorted(
+ (
+ id_to_task[coro.task_name].task_name,
+ tuple(tuple(frame) for frame in coro.call_stack),
+ )
+ for coro in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ awaited_by_coroutine_stacks,
+ {
+ "Task-1": [],
+ "Task-2": [
+ ("Task-1", (tuple([script_name, 21, "main"]),))
+ ],
+ },
+ )
@skip_if_not_supported
@unittest.skipIf(
@@ -509,36 +687,93 @@ class TestGetStackTrace(unittest.TestCase):
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
- # sets are unordered, so we want to sort "awaited_by"s
- stack_trace[2].sort(key=lambda x: x[1])
- expected_stack_trace = [
- [
- FrameInfo([script_name, 11, "deep"]),
- FrameInfo([script_name, 15, "c1"]),
- FrameInfo(
- [
- staggered.__file__,
- ANY,
- "staggered_race.<locals>.run_one_coro",
- ]
- ),
- ],
- "Task-2",
- [
- CoroInfo(
- [
- [
- FrameInfo(
+ # First check all the tasks are present
+ tasks_names = [
+ task.task_name for task in stack_trace[0].awaited_by
+ ]
+ for task_name in ["Task-1", "Task-2"]:
+ self.assertIn(task_name, tasks_names)
+
+ # Now ensure that the awaited_by_relationships are correct
+ id_to_task = {
+ task.task_id: task for task in stack_trace[0].awaited_by
+ }
+ task_name_to_awaited_by = {
+ task.task_name: set(
+ id_to_task[awaited.task_name].task_name
+ for awaited in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ task_name_to_awaited_by,
+ {
+ "Task-1": set(),
+ "Task-2": {"Task-1"},
+ },
+ )
+
+ # Now ensure that the coroutine stacks are correct
+ coroutine_stacks = {
+ task.task_name: sorted(
+ tuple(tuple(frame) for frame in coro.call_stack)
+ for coro in task.coroutine_stack
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ coroutine_stacks,
+ {
+ "Task-1": [
+ (
+ tuple([staggered.__file__, ANY, "staggered_race"]),
+ tuple([script_name, 21, "main"]),
+ )
+ ],
+ "Task-2": [
+ (
+ tuple([script_name, 11, "deep"]),
+ tuple([script_name, 15, "c1"]),
+ tuple(
+ [
+ staggered.__file__,
+ ANY,
+ "staggered_race.<locals>.run_one_coro",
+ ]
+ ),
+ )
+ ],
+ },
+ )
+
+ # Now ensure the coroutine stacks for the awaited_by relationships are correct.
+ awaited_by_coroutine_stacks = {
+ task.task_name: sorted(
+ (
+ id_to_task[coro.task_name].task_name,
+ tuple(tuple(frame) for frame in coro.call_stack),
+ )
+ for coro in task.awaited_by
+ )
+ for task in stack_trace[0].awaited_by
+ }
+ self.assertEqual(
+ awaited_by_coroutine_stacks,
+ {
+ "Task-1": [],
+ "Task-2": [
+ (
+ "Task-1",
+ (
+ tuple(
[staggered.__file__, ANY, "staggered_race"]
),
- FrameInfo([script_name, 21, "main"]),
- ],
- "Task-1",
- ]
- )
- ],
- ]
- self.assertEqual(stack_trace, expected_stack_trace)
+ tuple([script_name, 21, "main"]),
+ ),
+ )
+ ],
+ },
+ )
@skip_if_not_supported
@unittest.skipIf(
@@ -930,9 +1165,6 @@ class TestGetStackTrace(unittest.TestCase):
# Signal threads to start waiting
ready_event.set()
- # Give threads time to start sleeping
- time.sleep(0.1)
-
# Now do busy work to hold the GIL
main_work()
"""
@@ -967,7 +1199,28 @@ class TestGetStackTrace(unittest.TestCase):
# Get stack trace with all threads
unwinder_all = RemoteUnwinder(p.pid, all_threads=True)
- all_traces = unwinder_all.get_stack_trace()
+ for _ in range(10):
+ # Wait for the main thread to start its busy work
+ all_traces = unwinder_all.get_stack_trace()
+ found = False
+ for thread_id, stack in all_traces:
+ if not stack:
+ continue
+ current_frame = stack[0]
+ if (
+ current_frame.funcname == "main_work"
+ and current_frame.lineno > 15
+ ):
+ found = True
+
+ if found:
+ break
+ # Give a bit of time to take the next sample
+ time.sleep(0.1)
+ else:
+ self.fail(
+ "Main thread did not start its busy work on time"
+ )
# Get stack trace with only GIL holder
unwinder_gil = RemoteUnwinder(p.pid, only_active_thread=True)
@@ -985,16 +1238,39 @@ class TestGetStackTrace(unittest.TestCase):
p.wait(timeout=SHORT_TIMEOUT)
# Verify we got multiple threads in all_traces
- self.assertGreater(len(all_traces), 1, "Should have multiple threads")
+ self.assertGreater(
+ len(all_traces), 1, "Should have multiple threads"
+ )
# Verify we got exactly one thread in gil_traces
- self.assertEqual(len(gil_traces), 1, "Should have exactly one GIL holder")
+ self.assertEqual(
+ len(gil_traces), 1, "Should have exactly one GIL holder"
+ )
# The GIL holder should be in the all_traces list
gil_thread_id = gil_traces[0][0]
all_thread_ids = [trace[0] for trace in all_traces]
- self.assertIn(gil_thread_id, all_thread_ids,
- "GIL holder should be among all threads")
+ self.assertIn(
+ gil_thread_id,
+ all_thread_ids,
+ "GIL holder should be among all threads",
+ )
+
+
+class TestUnsupportedPlatformHandling(unittest.TestCase):
+ @unittest.skipIf(
+ sys.platform in ("linux", "darwin", "win32"),
+ "Test only runs on unsupported platforms (not Linux, macOS, or Windows)",
+ )
+ @unittest.skipIf(sys.platform == "android", "Android raises Linux-specific exception")
+ def test_unsupported_platform_error(self):
+ with self.assertRaises(RuntimeError) as cm:
+ RemoteUnwinder(os.getpid())
+
+ self.assertIn(
+ "Reading the PyRuntime section is not supported on this platform",
+ str(cm.exception)
+ )
if __name__ == "__main__":
diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py
index 1875a2f529c..cf42b86358d 100644
--- a/Lib/test/test_fractions.py
+++ b/Lib/test/test_fractions.py
@@ -1322,6 +1322,8 @@ class FractionTest(unittest.TestCase):
# Thousands separators
(F('1234567.123456'), ',.5e', '1.23457e+06'),
(F('123.123456'), '012_.2e', '0_001.23e+02'),
+ # Thousands separators for fractional part (or for integral too)
+ (F('1234567.123456'), '.5_e', '1.234_57e+06'),
# z flag is legal, but never makes a difference to the output
(F(-1, 7**100), 'z.6e', '-3.091690e-85'),
]
@@ -1447,6 +1449,12 @@ class FractionTest(unittest.TestCase):
(F('1234567'), ',.2f', '1,234,567.00'),
(F('12345678'), ',.2f', '12,345,678.00'),
(F('12345678'), ',f', '12,345,678.000000'),
+ # Thousands separators for fractional part (or for integral too)
+ (F('123456.789123123'), '._f', '123456.789_123'),
+ (F('123456.789123123'), '.7_f', '123456.789_123_1'),
+ (F('123456.789123123'), '.9_f', '123456.789_123_123'),
+ (F('123456.789123123'), '.,f', '123456.789,123'),
+ (F('123456.789123123'), '_.,f', '123_456.789,123'),
# Underscore as thousands separator
(F(2, 3), '_.2f', '0.67'),
(F(2, 3), '_.7f', '0.6666667'),
@@ -1620,6 +1628,11 @@ class FractionTest(unittest.TestCase):
'.f',
'.g',
'.%',
+ # Thousands separators before precision
+ '._6e',
+ '._6f',
+ '._6g',
+ '._6%',
# Z instead of z for negative zero suppression
'Z.2f'
# z flag not supported for general formatting
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py
index b4cbfb6d774..96ebb23f73d 100644
--- a/Lib/test/test_gc.py
+++ b/Lib/test/test_gc.py
@@ -262,9 +262,11 @@ class GCTests(unittest.TestCase):
# finalizer.
def __del__(self):
- # 5. Create a weakref to `func` now. If we had created
- # it earlier, it would have been cleared by the
- # garbage collector before calling the finalizers.
+ # 5. Create a weakref to `func` now. In previous
+ # versions of Python, this would avoid having it
+ # cleared by the garbage collector before calling
+ # the finalizers. Now, weakrefs get cleared after
+ # calling finalizers.
self[1].ref = weakref.ref(self[0])
# 6. Drop the global reference to `latefin`. The only
@@ -293,14 +295,18 @@ class GCTests(unittest.TestCase):
# which will find `cyc` and `func` as garbage.
gc.collect()
- # 9. Previously, this would crash because `func_qualname`
- # had been NULL-ed out by func_clear().
+ # 9. Previously, this would crash because the weakref
+ # created in the finalizer revealed the function after
+ # `tp_clear` was called and `func_qualname`
+ # had been NULL-ed out by func_clear(). Now, we clear
+ # weakrefs to unreachable objects before calling `tp_clear`
+ # but after calling finalizers.
print(f"{func=}")
"""
- # We're mostly just checking that this doesn't crash.
rc, stdout, stderr = assert_python_ok("-c", code)
self.assertEqual(rc, 0)
- self.assertRegex(stdout, rb"""\A\s*func=<function at \S+>\s*\z""")
+ # The `func` global is None because the weakref was cleared.
+ self.assertRegex(stdout, rb"""\A\s*func=None""")
self.assertFalse(stderr)
@refcount_test
@@ -726,6 +732,9 @@ class GCTests(unittest.TestCase):
self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
b"shutdown; use", stderr)
self.assertNotIn(b"<X 'first'>", stderr)
+ one_line_re = b"gc: uncollectable <X 0x[0-9A-Fa-f]+>"
+ expected_re = one_line_re + b"\r?\n" + one_line_re
+ self.assertNotRegex(stderr, expected_re)
# With DEBUG_UNCOLLECTABLE, the garbage list gets printed
stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
@@ -733,6 +742,8 @@ class GCTests(unittest.TestCase):
self.assertTrue(
(b"[<X 'first'>, <X 'second'>]" in stderr) or
(b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
+ # we expect two lines with uncollectable objects
+ self.assertRegex(stderr, expected_re)
# With DEBUG_SAVEALL, no additional message should get printed
# (because gc.garbage also contains normally reclaimable cyclic
# references, and its elements get printed at runtime anyway).
diff --git a/Lib/test/test_importlib/test_util.py b/Lib/test/test_importlib/test_util.py
index 5de89714eb5..6d6d5f96aab 100644
--- a/Lib/test/test_importlib/test_util.py
+++ b/Lib/test/test_importlib/test_util.py
@@ -635,7 +635,7 @@ class MagicNumberTests(unittest.TestCase):
# stakeholders such as OS package maintainers must be notified
# in advance. Such exceptional releases will then require an
# adjustment to this test case.
- EXPECTED_MAGIC_NUMBER = 3495
+ EXPECTED_MAGIC_NUMBER = 3625
actual = int.from_bytes(importlib.util.MAGIC_NUMBER[:2], 'little')
msg = (
diff --git a/Lib/test/test_inspect/test_inspect.py b/Lib/test/test_inspect/test_inspect.py
index 79eb103224b..0ea029b977b 100644
--- a/Lib/test/test_inspect/test_inspect.py
+++ b/Lib/test/test_inspect/test_inspect.py
@@ -2820,7 +2820,7 @@ class TestGetAsyncGenState(unittest.IsolatedAsyncioTestCase):
@classmethod
def tearDownClass(cls):
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
def _asyncgenstate(self):
return inspect.getasyncgenstate(self.asyncgen)
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py
index 0ee4582b5d1..a34b20beaca 100644
--- a/Lib/test/test_interpreters/test_api.py
+++ b/Lib/test/test_interpreters/test_api.py
@@ -412,9 +412,11 @@ class InterpreterObjectTests(TestBase):
def test_pickle(self):
interp = interpreters.create()
- data = pickle.dumps(interp)
- unpickled = pickle.loads(data)
- self.assertEqual(unpickled, interp)
+ for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(protocol=protocol):
+ data = pickle.dumps(interp, protocol)
+ unpickled = pickle.loads(data)
+ self.assertEqual(unpickled, interp)
class TestInterpreterIsRunning(TestBase):
diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py
index 109ddf34453..52827357078 100644
--- a/Lib/test/test_interpreters/test_channels.py
+++ b/Lib/test/test_interpreters/test_channels.py
@@ -121,9 +121,11 @@ class TestRecvChannelAttrs(TestBase):
def test_pickle(self):
ch, _ = channels.create()
- data = pickle.dumps(ch)
- unpickled = pickle.loads(data)
- self.assertEqual(unpickled, ch)
+ for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(protocol=protocol):
+ data = pickle.dumps(ch, protocol)
+ unpickled = pickle.loads(data)
+ self.assertEqual(unpickled, ch)
class TestSendChannelAttrs(TestBase):
@@ -152,9 +154,11 @@ class TestSendChannelAttrs(TestBase):
def test_pickle(self):
_, ch = channels.create()
- data = pickle.dumps(ch)
- unpickled = pickle.loads(data)
- self.assertEqual(unpickled, ch)
+ for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(protocol=protocol):
+ data = pickle.dumps(ch, protocol)
+ unpickled = pickle.loads(data)
+ self.assertEqual(unpickled, ch)
class TestSendRecv(TestBase):
diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py
index cb17340f581..5451c6654ac 100644
--- a/Lib/test/test_interpreters/test_queues.py
+++ b/Lib/test/test_interpreters/test_queues.py
@@ -188,9 +188,11 @@ class QueueTests(TestBase):
def test_pickle(self):
queue = queues.create()
- data = pickle.dumps(queue)
- unpickled = pickle.loads(data)
- self.assertEqual(unpickled, queue)
+ for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(protocol=protocol):
+ data = pickle.dumps(queue, protocol)
+ unpickled = pickle.loads(data)
+ self.assertEqual(unpickled, queue)
class TestQueueOps(TestBase):
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index 3819965ed2c..275f7ce47d0 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -5421,7 +5421,7 @@ class LogRecordTest(BaseTest):
logging.logAsyncioTasks = False
runner.run(make_record(self.assertIsNone))
finally:
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
@support.requires_working_socket()
def test_taskName_without_asyncio_imported(self):
@@ -5433,7 +5433,7 @@ class LogRecordTest(BaseTest):
logging.logAsyncioTasks = False
runner.run(make_record(self.assertIsNone))
finally:
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class BasicConfigTest(unittest.TestCase):
@@ -5758,7 +5758,7 @@ class BasicConfigTest(unittest.TestCase):
data = f.read().strip()
self.assertRegex(data, r'Task-\d+ - hello world')
finally:
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
if handler:
handler.close()
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 5217037ae9d..1e50dc43c35 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -104,7 +104,7 @@ requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"),
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class MiscTests(unittest.TestCase):
diff --git a/Lib/test/test_sample_profiler.py b/Lib/test/test_sample_profiler.py
new file mode 100644
index 00000000000..2c7fa1cba71
--- /dev/null
+++ b/Lib/test/test_sample_profiler.py
@@ -0,0 +1,1877 @@
+"""Tests for the sampling profiler (profile.sample)."""
+
+import contextlib
+import io
+import marshal
+import os
+import socket
+import subprocess
+import sys
+import tempfile
+import unittest
+from unittest import mock
+
+from profile.pstats_collector import PstatsCollector
+from profile.stack_collector import (
+ CollapsedStackCollector,
+)
+
+from test.support.os_helper import unlink
+from test.support import force_not_colorized_test_class, SHORT_TIMEOUT
+from test.support.socket_helper import find_unused_port
+from test.support import requires_subprocess
+
+PROCESS_VM_READV_SUPPORTED = False
+
+try:
+ from _remote_debugging import PROCESS_VM_READV_SUPPORTED
+ import _remote_debugging
+except ImportError:
+ raise unittest.SkipTest(
+ "Test only runs when _remote_debugging is available"
+ )
+else:
+ import profile.sample
+ from profile.sample import SampleProfiler
+
+
+
+class MockFrameInfo:
+ """Mock FrameInfo for testing since the real one isn't accessible."""
+
+ def __init__(self, filename, lineno, funcname):
+ self.filename = filename
+ self.lineno = lineno
+ self.funcname = funcname
+
+ def __repr__(self):
+ return f"MockFrameInfo(filename='{self.filename}', lineno={self.lineno}, funcname='{self.funcname}')"
+
+
+skip_if_not_supported = unittest.skipIf(
+ (
+ sys.platform != "darwin"
+ and sys.platform != "linux"
+ and sys.platform != "win32"
+ ),
+ "Test only runs on Linux, Windows and MacOS",
+)
+
+
+@contextlib.contextmanager
+def test_subprocess(script):
+ # Find an unused port for socket communication
+ port = find_unused_port()
+
+ # Inject socket connection code at the beginning of the script
+ socket_code = f'''
+import socket
+_test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+_test_sock.connect(('localhost', {port}))
+_test_sock.sendall(b"ready")
+'''
+
+ # Combine socket code with user script
+ full_script = socket_code + script
+
+ # Create server socket to wait for process to be ready
+ server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server_socket.bind(("localhost", port))
+ server_socket.settimeout(SHORT_TIMEOUT)
+ server_socket.listen(1)
+
+ proc = subprocess.Popen(
+ [sys.executable, "-c", full_script],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ client_socket = None
+ try:
+ # Wait for process to connect and send ready signal
+ client_socket, _ = server_socket.accept()
+ server_socket.close()
+ response = client_socket.recv(1024)
+ if response != b"ready":
+ raise RuntimeError(f"Unexpected response from subprocess: {response}")
+
+ yield proc
+ finally:
+ if client_socket is not None:
+ client_socket.close()
+ if proc.poll() is None:
+ proc.kill()
+ proc.wait()
+
+
+def close_and_unlink(file):
+ file.close()
+ unlink(file.name)
+
+
+class TestSampleProfilerComponents(unittest.TestCase):
+ """Unit tests for individual profiler components."""
+
+ def test_mock_frame_info_with_empty_and_unicode_values(self):
+ """Test MockFrameInfo handles empty strings, unicode characters, and very long names correctly."""
+ # Test with empty strings
+ frame = MockFrameInfo("", 0, "")
+ self.assertEqual(frame.filename, "")
+ self.assertEqual(frame.lineno, 0)
+ self.assertEqual(frame.funcname, "")
+ self.assertIn("filename=''", repr(frame))
+
+ # Test with unicode characters
+ frame = MockFrameInfo("文件.py", 42, "函数名")
+ self.assertEqual(frame.filename, "文件.py")
+ self.assertEqual(frame.funcname, "函数名")
+
+ # Test with very long names
+ long_filename = "x" * 1000 + ".py"
+ long_funcname = "func_" + "x" * 1000
+ frame = MockFrameInfo(long_filename, 999999, long_funcname)
+ self.assertEqual(frame.filename, long_filename)
+ self.assertEqual(frame.lineno, 999999)
+ self.assertEqual(frame.funcname, long_funcname)
+
+ def test_pstats_collector_with_extreme_intervals_and_empty_data(self):
+ """Test PstatsCollector handles zero/large intervals, empty frames, None thread IDs, and duplicate frames."""
+ # Test with zero interval
+ collector = PstatsCollector(sample_interval_usec=0)
+ self.assertEqual(collector.sample_interval_usec, 0)
+
+ # Test with very large interval
+ collector = PstatsCollector(sample_interval_usec=1000000000)
+ self.assertEqual(collector.sample_interval_usec, 1000000000)
+
+ # Test collecting empty frames list
+ collector = PstatsCollector(sample_interval_usec=1000)
+ collector.collect([])
+ self.assertEqual(len(collector.result), 0)
+
+ # Test collecting frames with None thread id
+ test_frames = [(None, [MockFrameInfo("file.py", 10, "func")])]
+ collector.collect(test_frames)
+ # Should still process the frames
+ self.assertEqual(len(collector.result), 1)
+
+ # Test collecting duplicate frames in same sample
+ test_frames = [
+ (
+ 1,
+ [
+ MockFrameInfo("file.py", 10, "func1"),
+ MockFrameInfo("file.py", 10, "func1"), # Duplicate
+ ],
+ )
+ ]
+ collector = PstatsCollector(sample_interval_usec=1000)
+ collector.collect(test_frames)
+ # Should count both occurrences
+ self.assertEqual(
+ collector.result[("file.py", 10, "func1")]["cumulative_calls"], 2
+ )
+
+ def test_pstats_collector_single_frame_stacks(self):
+ """Test PstatsCollector with single-frame call stacks to trigger len(frames) <= 1 branch."""
+ collector = PstatsCollector(sample_interval_usec=1000)
+
+ # Test with exactly one frame (should trigger the <= 1 condition)
+ single_frame = [(1, [MockFrameInfo("single.py", 10, "single_func")])]
+ collector.collect(single_frame)
+
+ # Should record the single frame with inline call
+ self.assertEqual(len(collector.result), 1)
+ single_key = ("single.py", 10, "single_func")
+ self.assertIn(single_key, collector.result)
+ self.assertEqual(collector.result[single_key]["direct_calls"], 1)
+ self.assertEqual(collector.result[single_key]["cumulative_calls"], 1)
+
+ # Test with empty frames (should also trigger <= 1 condition)
+ empty_frames = [(1, [])]
+ collector.collect(empty_frames)
+
+ # Should not add any new entries
+ self.assertEqual(
+ len(collector.result), 1
+ ) # Still just the single frame
+
+ # Test mixed single and multi-frame stacks
+ mixed_frames = [
+ (
+ 1,
+ [MockFrameInfo("single2.py", 20, "single_func2")],
+ ), # Single frame
+ (
+ 2,
+ [ # Multi-frame stack
+ MockFrameInfo("multi.py", 30, "multi_func1"),
+ MockFrameInfo("multi.py", 40, "multi_func2"),
+ ],
+ ),
+ ]
+ collector.collect(mixed_frames)
+
+ # Should have recorded all functions
+ self.assertEqual(
+ len(collector.result), 4
+ ) # single + single2 + multi1 + multi2
+
+ # Verify single frame handling
+ single2_key = ("single2.py", 20, "single_func2")
+ self.assertIn(single2_key, collector.result)
+ self.assertEqual(collector.result[single2_key]["direct_calls"], 1)
+ self.assertEqual(collector.result[single2_key]["cumulative_calls"], 1)
+
+ # Verify multi-frame handling still works
+ multi1_key = ("multi.py", 30, "multi_func1")
+ multi2_key = ("multi.py", 40, "multi_func2")
+ self.assertIn(multi1_key, collector.result)
+ self.assertIn(multi2_key, collector.result)
+ self.assertEqual(collector.result[multi1_key]["direct_calls"], 1)
+ self.assertEqual(
+ collector.result[multi2_key]["cumulative_calls"], 1
+ ) # Called from multi1
+
+ def test_collapsed_stack_collector_with_empty_and_deep_stacks(self):
+ """Test CollapsedStackCollector handles empty frames, single-frame stacks, and very deep call stacks."""
+ collector = CollapsedStackCollector()
+
+ # Test with empty frames
+ collector.collect([])
+ self.assertEqual(len(collector.call_trees), 0)
+
+ # Test with single frame stack
+ test_frames = [(1, [("file.py", 10, "func")])]
+ collector.collect(test_frames)
+ self.assertEqual(len(collector.call_trees), 1)
+ self.assertEqual(collector.call_trees[0], [("file.py", 10, "func")])
+
+ # Test with very deep stack
+ deep_stack = [(f"file{i}.py", i, f"func{i}") for i in range(100)]
+ test_frames = [(1, deep_stack)]
+ collector = CollapsedStackCollector()
+ collector.collect(test_frames)
+ self.assertEqual(len(collector.call_trees[0]), 100)
+ # Check it's properly reversed
+ self.assertEqual(
+ collector.call_trees[0][0], ("file99.py", 99, "func99")
+ )
+ self.assertEqual(collector.call_trees[0][-1], ("file0.py", 0, "func0"))
+
+ def test_pstats_collector_basic(self):
+ """Test basic PstatsCollector functionality."""
+ collector = PstatsCollector(sample_interval_usec=1000)
+
+ # Test empty state
+ self.assertEqual(len(collector.result), 0)
+ self.assertEqual(len(collector.stats), 0)
+
+ # Test collecting sample data
+ test_frames = [
+ (
+ 1,
+ [
+ MockFrameInfo("file.py", 10, "func1"),
+ MockFrameInfo("file.py", 20, "func2"),
+ ],
+ )
+ ]
+ collector.collect(test_frames)
+
+ # Should have recorded calls for both functions
+ self.assertEqual(len(collector.result), 2)
+ self.assertIn(("file.py", 10, "func1"), collector.result)
+ self.assertIn(("file.py", 20, "func2"), collector.result)
+
+ # Top-level function should have direct call
+ self.assertEqual(
+ collector.result[("file.py", 10, "func1")]["direct_calls"], 1
+ )
+ self.assertEqual(
+ collector.result[("file.py", 10, "func1")]["cumulative_calls"], 1
+ )
+
+ # Calling function should have cumulative call but no direct calls
+ self.assertEqual(
+ collector.result[("file.py", 20, "func2")]["cumulative_calls"], 1
+ )
+ self.assertEqual(
+ collector.result[("file.py", 20, "func2")]["direct_calls"], 0
+ )
+
+ def test_pstats_collector_create_stats(self):
+ """Test PstatsCollector stats creation."""
+ collector = PstatsCollector(
+ sample_interval_usec=1000000
+ ) # 1 second intervals
+
+ test_frames = [
+ (
+ 1,
+ [
+ MockFrameInfo("file.py", 10, "func1"),
+ MockFrameInfo("file.py", 20, "func2"),
+ ],
+ )
+ ]
+ collector.collect(test_frames)
+ collector.collect(test_frames) # Collect twice
+
+ collector.create_stats()
+
+ # Check stats format: (direct_calls, cumulative_calls, tt, ct, callers)
+ func1_stats = collector.stats[("file.py", 10, "func1")]
+ self.assertEqual(func1_stats[0], 2) # direct_calls (top of stack)
+ self.assertEqual(func1_stats[1], 2) # cumulative_calls
+ self.assertEqual(
+ func1_stats[2], 2.0
+ ) # tt (total time - 2 samples * 1 sec)
+ self.assertEqual(func1_stats[3], 2.0) # ct (cumulative time)
+
+ func2_stats = collector.stats[("file.py", 20, "func2")]
+ self.assertEqual(
+ func2_stats[0], 0
+ ) # direct_calls (never top of stack)
+ self.assertEqual(
+ func2_stats[1], 2
+ ) # cumulative_calls (appears in stack)
+ self.assertEqual(func2_stats[2], 0.0) # tt (no direct calls)
+ self.assertEqual(func2_stats[3], 2.0) # ct (cumulative time)
+
+ def test_collapsed_stack_collector_basic(self):
+ collector = CollapsedStackCollector()
+
+ # Test empty state
+ self.assertEqual(len(collector.call_trees), 0)
+ self.assertEqual(len(collector.function_samples), 0)
+
+ # Test collecting sample data
+ test_frames = [
+ (1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])
+ ]
+ collector.collect(test_frames)
+
+ # Should store call tree (reversed)
+ self.assertEqual(len(collector.call_trees), 1)
+ expected_tree = [("file.py", 20, "func2"), ("file.py", 10, "func1")]
+ self.assertEqual(collector.call_trees[0], expected_tree)
+
+ # Should count function samples
+ self.assertEqual(
+ collector.function_samples[("file.py", 10, "func1")], 1
+ )
+ self.assertEqual(
+ collector.function_samples[("file.py", 20, "func2")], 1
+ )
+
+ def test_collapsed_stack_collector_export(self):
+ collapsed_out = tempfile.NamedTemporaryFile(delete=False)
+ self.addCleanup(close_and_unlink, collapsed_out)
+
+ collector = CollapsedStackCollector()
+
+ test_frames1 = [
+ (1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])
+ ]
+ test_frames2 = [
+ (1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])
+ ] # Same stack
+ test_frames3 = [(1, [("other.py", 5, "other_func")])]
+
+ collector.collect(test_frames1)
+ collector.collect(test_frames2)
+ collector.collect(test_frames3)
+
+ collector.export(collapsed_out.name)
+ # Check file contents
+ with open(collapsed_out.name, "r") as f:
+ content = f.read()
+
+ lines = content.strip().split("\n")
+ self.assertEqual(len(lines), 2) # Two unique stacks
+
+ # Check collapsed format: file:func:line;file:func:line count
+ stack1_expected = "file.py:func2:20;file.py:func1:10 2"
+ stack2_expected = "other.py:other_func:5 1"
+
+ self.assertIn(stack1_expected, lines)
+ self.assertIn(stack2_expected, lines)
+
+ def test_pstats_collector_export(self):
+ collector = PstatsCollector(
+ sample_interval_usec=1000000
+ ) # 1 second intervals
+
+ test_frames1 = [
+ (
+ 1,
+ [
+ MockFrameInfo("file.py", 10, "func1"),
+ MockFrameInfo("file.py", 20, "func2"),
+ ],
+ )
+ ]
+ test_frames2 = [
+ (
+ 1,
+ [
+ MockFrameInfo("file.py", 10, "func1"),
+ MockFrameInfo("file.py", 20, "func2"),
+ ],
+ )
+ ] # Same stack
+ test_frames3 = [(1, [MockFrameInfo("other.py", 5, "other_func")])]
+
+ collector.collect(test_frames1)
+ collector.collect(test_frames2)
+ collector.collect(test_frames3)
+
+ pstats_out = tempfile.NamedTemporaryFile(
+ suffix=".pstats", delete=False
+ )
+ self.addCleanup(close_and_unlink, pstats_out)
+ collector.export(pstats_out.name)
+
+ # Check file can be loaded with marshal
+ with open(pstats_out.name, "rb") as f:
+ stats_data = marshal.load(f)
+
+ # Should be a dictionary with the sampled marker
+ self.assertIsInstance(stats_data, dict)
+ self.assertIn(("__sampled__",), stats_data)
+ self.assertTrue(stats_data[("__sampled__",)])
+
+ # Should have function data
+ function_entries = [
+ k for k in stats_data.keys() if k != ("__sampled__",)
+ ]
+ self.assertGreater(len(function_entries), 0)
+
+ # Check specific function stats format: (cc, nc, tt, ct, callers)
+ func1_key = ("file.py", 10, "func1")
+ func2_key = ("file.py", 20, "func2")
+ other_key = ("other.py", 5, "other_func")
+
+ self.assertIn(func1_key, stats_data)
+ self.assertIn(func2_key, stats_data)
+ self.assertIn(other_key, stats_data)
+
+ # Check func1 stats (should have 2 samples)
+ func1_stats = stats_data[func1_key]
+ self.assertEqual(func1_stats[0], 2) # total_calls
+ self.assertEqual(func1_stats[1], 2) # nc (non-recursive calls)
+ self.assertEqual(func1_stats[2], 2.0) # tt (total time)
+ self.assertEqual(func1_stats[3], 2.0) # ct (cumulative time)
+
+
+class TestSampleProfiler(unittest.TestCase):
+ """Test the SampleProfiler class."""
+
+ def test_sample_profiler_initialization(self):
+ """Test SampleProfiler initialization with various parameters."""
+ from profile.sample import SampleProfiler
+
+ # Mock RemoteUnwinder to avoid permission issues
+ with mock.patch(
+ "_remote_debugging.RemoteUnwinder"
+ ) as mock_unwinder_class:
+ mock_unwinder_class.return_value = mock.MagicMock()
+
+ # Test basic initialization
+ profiler = SampleProfiler(
+ pid=12345, sample_interval_usec=1000, all_threads=False
+ )
+ self.assertEqual(profiler.pid, 12345)
+ self.assertEqual(profiler.sample_interval_usec, 1000)
+ self.assertEqual(profiler.all_threads, False)
+
+ # Test with all_threads=True
+ profiler = SampleProfiler(
+ pid=54321, sample_interval_usec=5000, all_threads=True
+ )
+ self.assertEqual(profiler.pid, 54321)
+ self.assertEqual(profiler.sample_interval_usec, 5000)
+ self.assertEqual(profiler.all_threads, True)
+
+ def test_sample_profiler_sample_method_timing(self):
+ """Test that the sample method respects duration and handles timing correctly."""
+ from profile.sample import SampleProfiler
+
+ # Mock the unwinder to avoid needing a real process
+ mock_unwinder = mock.MagicMock()
+ mock_unwinder.get_stack_trace.return_value = [
+ (
+ 1,
+ [
+ mock.MagicMock(
+ filename="test.py", lineno=10, funcname="test_func"
+ )
+ ],
+ )
+ ]
+
+ with mock.patch(
+ "_remote_debugging.RemoteUnwinder"
+ ) as mock_unwinder_class:
+ mock_unwinder_class.return_value = mock_unwinder
+
+ profiler = SampleProfiler(
+ pid=12345, sample_interval_usec=100000, all_threads=False
+ ) # 100ms interval
+
+ # Mock collector
+ mock_collector = mock.MagicMock()
+
+ # Mock time to control the sampling loop
+ start_time = 1000.0
+ times = [
+ start_time + i * 0.1 for i in range(12)
+ ] # 0, 0.1, 0.2, ..., 1.1 seconds
+
+ with mock.patch("time.perf_counter", side_effect=times):
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ profiler.sample(mock_collector, duration_sec=1)
+
+ result = output.getvalue()
+
+ # Should have captured approximately 10 samples (1 second / 0.1 second interval)
+ self.assertIn("Captured", result)
+ self.assertIn("samples", result)
+
+ # Verify collector was called multiple times
+ self.assertGreaterEqual(mock_collector.collect.call_count, 5)
+ self.assertLessEqual(mock_collector.collect.call_count, 11)
+
+ def test_sample_profiler_error_handling(self):
+ """Test that the sample method handles errors gracefully."""
+ from profile.sample import SampleProfiler
+
+ # Mock unwinder that raises errors
+ mock_unwinder = mock.MagicMock()
+ error_sequence = [
+ RuntimeError("Process died"),
+ [
+ (
+ 1,
+ [
+ mock.MagicMock(
+ filename="test.py", lineno=10, funcname="test_func"
+ )
+ ],
+ )
+ ],
+ UnicodeDecodeError("utf-8", b"", 0, 1, "invalid"),
+ [
+ (
+ 1,
+ [
+ mock.MagicMock(
+ filename="test.py",
+ lineno=20,
+ funcname="test_func2",
+ )
+ ],
+ )
+ ],
+ OSError("Permission denied"),
+ ]
+ mock_unwinder.get_stack_trace.side_effect = error_sequence
+
+ with mock.patch(
+ "_remote_debugging.RemoteUnwinder"
+ ) as mock_unwinder_class:
+ mock_unwinder_class.return_value = mock_unwinder
+
+ profiler = SampleProfiler(
+ pid=12345, sample_interval_usec=10000, all_threads=False
+ )
+
+ mock_collector = mock.MagicMock()
+
+ # Control timing to run exactly 5 samples
+ times = [0.0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06]
+
+ with mock.patch("time.perf_counter", side_effect=times):
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ profiler.sample(mock_collector, duration_sec=0.05)
+
+ result = output.getvalue()
+
+ # Should report error rate
+ self.assertIn("Error rate:", result)
+ self.assertIn("%", result)
+
+ # Collector should have been called only for successful samples (should be > 0)
+ self.assertGreater(mock_collector.collect.call_count, 0)
+ self.assertLessEqual(mock_collector.collect.call_count, 3)
+
+ def test_sample_profiler_missed_samples_warning(self):
+ """Test that the profiler warns about missed samples when sampling is too slow."""
+ from profile.sample import SampleProfiler
+
+ mock_unwinder = mock.MagicMock()
+ mock_unwinder.get_stack_trace.return_value = [
+ (
+ 1,
+ [
+ mock.MagicMock(
+ filename="test.py", lineno=10, funcname="test_func"
+ )
+ ],
+ )
+ ]
+
+ with mock.patch(
+ "_remote_debugging.RemoteUnwinder"
+ ) as mock_unwinder_class:
+ mock_unwinder_class.return_value = mock_unwinder
+
+ # Use very short interval that we'll miss
+ profiler = SampleProfiler(
+ pid=12345, sample_interval_usec=1000, all_threads=False
+ ) # 1ms interval
+
+ mock_collector = mock.MagicMock()
+
+ # Simulate slow sampling where we miss many samples
+ times = [
+ 0.0,
+ 0.1,
+ 0.2,
+ 0.3,
+ 0.4,
+ 0.5,
+ 0.6,
+ 0.7,
+ ] # Extra time points to avoid StopIteration
+
+ with mock.patch("time.perf_counter", side_effect=times):
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ profiler.sample(mock_collector, duration_sec=0.5)
+
+ result = output.getvalue()
+
+ # Should warn about missed samples
+ self.assertIn("Warning: missed", result)
+ self.assertIn("samples from the expected total", result)
+
+
+@force_not_colorized_test_class
+class TestPrintSampledStats(unittest.TestCase):
+ """Test the print_sampled_stats function."""
+
+ def setUp(self):
+ """Set up test data."""
+ # Mock stats data
+ self.mock_stats = mock.MagicMock()
+ self.mock_stats.stats = {
+ ("file1.py", 10, "func1"): (
+ 100,
+ 100,
+ 0.5,
+ 0.5,
+ {},
+ ), # cc, nc, tt, ct, callers
+ ("file2.py", 20, "func2"): (50, 50, 0.25, 0.3, {}),
+ ("file3.py", 30, "func3"): (200, 200, 1.5, 2.0, {}),
+ ("file4.py", 40, "func4"): (
+ 10,
+ 10,
+ 0.001,
+ 0.001,
+ {},
+ ), # millisecond range
+ ("file5.py", 50, "func5"): (
+ 5,
+ 5,
+ 0.000001,
+ 0.000002,
+ {},
+ ), # microsecond range
+ }
+
+ def test_print_sampled_stats_basic(self):
+ """Test basic print_sampled_stats functionality."""
+ from profile.sample import print_sampled_stats
+
+ # Capture output
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ print_sampled_stats(self.mock_stats, sample_interval_usec=100)
+
+ result = output.getvalue()
+
+ # Check header is present
+ self.assertIn("Profile Stats:", result)
+ self.assertIn("nsamples", result)
+ self.assertIn("tottime", result)
+ self.assertIn("cumtime", result)
+
+ # Check functions are present
+ self.assertIn("func1", result)
+ self.assertIn("func2", result)
+ self.assertIn("func3", result)
+
+ def test_print_sampled_stats_sorting(self):
+ """Test different sorting options."""
+ from profile.sample import print_sampled_stats
+
+ # Test sort by calls
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ print_sampled_stats(
+ self.mock_stats, sort=0, sample_interval_usec=100
+ )
+
+ result = output.getvalue()
+ lines = result.strip().split("\n")
+
+ # Find the data lines (skip header)
+ data_lines = [l for l in lines if "file" in l and ".py" in l]
+ # func3 should be first (200 calls)
+ self.assertIn("func3", data_lines[0])
+
+ # Test sort by time
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ print_sampled_stats(
+ self.mock_stats, sort=1, sample_interval_usec=100
+ )
+
+ result = output.getvalue()
+ lines = result.strip().split("\n")
+
+ data_lines = [l for l in lines if "file" in l and ".py" in l]
+ # func3 should be first (1.5s time)
+ self.assertIn("func3", data_lines[0])
+
+ def test_print_sampled_stats_limit(self):
+ """Test limiting output rows."""
+ from profile.sample import print_sampled_stats
+
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ print_sampled_stats(
+ self.mock_stats, limit=2, sample_interval_usec=100
+ )
+
+ result = output.getvalue()
+
+ # Count function entries in the main stats section (not in summary)
+ lines = result.split("\n")
+ # Find where the main stats section ends (before summary)
+ main_section_lines = []
+ for line in lines:
+ if "Summary of Interesting Functions:" in line:
+ break
+ main_section_lines.append(line)
+
+ # Count function entries only in main section
+ func_count = sum(
+ 1
+ for line in main_section_lines
+ if "func" in line and ".py" in line
+ )
+ self.assertEqual(func_count, 2)
+
+ def test_print_sampled_stats_time_units(self):
+ """Test proper time unit selection."""
+ from profile.sample import print_sampled_stats
+
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ print_sampled_stats(self.mock_stats, sample_interval_usec=100)
+
+ result = output.getvalue()
+
+ # Should use seconds for the header since max time is > 1s
+ self.assertIn("tottime (s)", result)
+ self.assertIn("cumtime (s)", result)
+
+ # Test with only microsecond-range times
+ micro_stats = mock.MagicMock()
+ micro_stats.stats = {
+ ("file1.py", 10, "func1"): (100, 100, 0.000005, 0.000010, {}),
+ }
+
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ print_sampled_stats(micro_stats, sample_interval_usec=100)
+
+ result = output.getvalue()
+
+ # Should use microseconds
+ self.assertIn("tottime (μs)", result)
+ self.assertIn("cumtime (μs)", result)
+
+ def test_print_sampled_stats_summary(self):
+ """Test summary section generation."""
+ from profile.sample import print_sampled_stats
+
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ print_sampled_stats(
+ self.mock_stats,
+ show_summary=True,
+ sample_interval_usec=100,
+ )
+
+ result = output.getvalue()
+
+ # Check summary sections are present
+ self.assertIn("Summary of Interesting Functions:", result)
+ self.assertIn(
+ "Functions with Highest Direct/Cumulative Ratio (Hot Spots):",
+ result,
+ )
+ self.assertIn(
+ "Functions with Highest Call Frequency (Indirect Calls):", result
+ )
+ self.assertIn(
+ "Functions with Highest Call Magnification (Cumulative/Direct):",
+ result,
+ )
+
+ def test_print_sampled_stats_no_summary(self):
+ """Test disabling summary output."""
+ from profile.sample import print_sampled_stats
+
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ print_sampled_stats(
+ self.mock_stats,
+ show_summary=False,
+ sample_interval_usec=100,
+ )
+
+ result = output.getvalue()
+
+ # Summary should not be present
+ self.assertNotIn("Summary of Interesting Functions:", result)
+
+ def test_print_sampled_stats_empty_stats(self):
+ """Test with empty stats."""
+ from profile.sample import print_sampled_stats
+
+ empty_stats = mock.MagicMock()
+ empty_stats.stats = {}
+
+ with io.StringIO() as output:
+ with mock.patch("sys.stdout", output):
+ print_sampled_stats(empty_stats, sample_interval_usec=100)
+
+ result = output.getvalue()
+
+ # Should still print header
+ self.assertIn("Profile Stats:", result)
+
def test_print_sampled_stats_sample_percentage_sorting(self):
    """sort=3 orders rows by sample percentage, highest first."""
    from profile.sample import print_sampled_stats

    # Inject a function whose direct-call count (300) exceeds func3's
    # 200, so it must sort first by sample percentage.
    self.mock_stats.stats[("expensive.py", 60, "expensive_func")] = (
        300,  # direct calls
        300,  # cumulative calls
        1.0,  # total time
        1.0,  # cumulative time
        {},
    )

    with io.StringIO() as buffer, mock.patch("sys.stdout", buffer):
        print_sampled_stats(
            self.mock_stats, sort=3, sample_interval_usec=100
        )
        report = buffer.getvalue()

    # Keep only data rows (a filename plus a function name).
    rows = [
        line
        for line in report.strip().split("\n")
        if ".py" in line and "func" in line
    ]
    self.assertIn("expensive_func", rows[0])
+
def test_print_sampled_stats_with_recursive_calls(self):
    """Recursive entries (nc != cc) are rendered in "nc/cc" form."""
    from profile.sample import print_sampled_stats

    recursive_stats = mock.MagicMock()
    # Tuple layout: (direct_calls, cumulative_calls, tt, ct, callers).
    recursive_stats.stats = {
        # factorial appears in the stack more often than it is directly
        # executing, i.e. it is recursive.
        ("recursive.py", 10, "factorial"): (5, 10, 0.5, 0.6, {}),
        # normal_func has identical counts, i.e. no recursion.
        ("normal.py", 20, "normal_func"): (3, 3, 0.2, 0.2, {}),
    }

    with io.StringIO() as buffer, mock.patch("sys.stdout", buffer):
        print_sampled_stats(recursive_stats, sample_interval_usec=100)
        report = buffer.getvalue()

    self.assertIn("5/10", report)  # recursive: direct/cumulative format
    self.assertIn("3", report)     # non-recursive: plain count
    self.assertIn("factorial", report)
    self.assertIn("normal_func", report)
+
def test_print_sampled_stats_with_zero_call_counts(self):
    """Zero call counts must not trigger division errors."""
    from profile.sample import print_sampled_stats

    zero_stats = mock.MagicMock()
    # Tuple layout: (direct_calls, cumulative_calls, tt, ct, callers).
    zero_stats.stats = {
        ("file.py", 10, "zero_calls"): (0, 0, 0.0, 0.0, {}),   # zero calls
        ("file.py", 20, "normal_func"): (5, 5, 0.1, 0.1, {}),  # normal
    }

    with io.StringIO() as output:
        with mock.patch("sys.stdout", output):
            print_sampled_stats(zero_stats, sample_interval_usec=100)

        result = output.getvalue()

    # Both rows must be present; the zero-call row must not crash the
    # per-call / percentage computations.  (The original test asserted
    # "zero_calls" twice; the redundant copy has been removed.)
    self.assertIn("zero_calls", result)
    self.assertIn("normal_func", result)
+
def test_print_sampled_stats_sort_by_name(self):
    """sort=-1 orders the report rows alphabetically by function name."""
    import re

    from profile.sample import print_sampled_stats

    with io.StringIO() as output:
        with mock.patch("sys.stdout", output):
            print_sampled_stats(
                self.mock_stats, sort=-1, sample_interval_usec=100
            )  # sort by name

        result = output.getvalue()

    lines = result.strip().split("\n")

    # Keep only data rows: indented, shaped like "filename:lineno(func)",
    # and not part of the human-readable summary section.
    data_lines = []
    for line in lines:
        if (
            line.startswith(" ")
            and "(" in line
            and ")" in line
            # Summary lines begin with a time value or describe totals.
            and not line.startswith(" 1.")
            and not line.startswith(" 0.")
            and "per call" not in line
            and "calls" not in line
            and "total time" not in line
            and "cumulative time" not in line
        ):
            data_lines.append(line)

    # The function name sits between the last "(" and ")", possibly
    # wrapped in ANSI colour codes that must be stripped first.
    func_names = []
    for line in data_lines:
        match = re.search(r"\(([^)]+)\)$", line)
        if match:
            func_name = re.sub(r"\x1b\[[0-9;]*m", "", match.group(1))
            func_names.append(func_name)

    self.assertGreater(
        len(func_names), 0, "Should have extracted some function names"
    )
    self.assertEqual(
        func_names,
        sorted(func_names),
        f"Function names {func_names} should be sorted alphabetically",
    )
+
def test_print_sampled_stats_with_zero_time_functions(self):
    """Zero-time functions must not break the summary computations."""
    from profile.sample import print_sampled_stats

    zero_time_stats = mock.MagicMock()
    # Tuple layout: (direct_calls, cumulative_calls, tt, ct, callers).
    zero_time_stats.stats = {
        ("file1.py", 10, "zero_time_func"): (5, 5, 0.0, 0.0, {}),  # zero time
        ("file2.py", 20, "normal_func"): (3, 3, 0.1, 0.1, {}),     # normal time
    }

    with io.StringIO() as buffer, mock.patch("sys.stdout", buffer):
        print_sampled_stats(
            zero_time_stats,
            show_summary=True,
            sample_interval_usec=100,
        )
        report = buffer.getvalue()

    # The summary must still be produced and list both functions.
    for expected in (
        "Summary of Interesting Functions:",
        "zero_time_func",
        "normal_func",
    ):
        self.assertIn(expected, report)
+
def test_print_sampled_stats_with_malformed_qualified_names(self):
    """Locations lacking the usual module:function shape aggregate cleanly."""
    from profile.sample import print_sampled_stats

    # Filenames chosen so the summary's qualified-name aggregation sees
    # entries without a clear module separation.
    malformed_stats = mock.MagicMock()
    malformed_stats.stats = {
        ("no_colon_func", 10, "func"): (3, 3, 0.1, 0.1, {}),
        ("", 20, "empty_filename_func"): (2, 2, 0.05, 0.05, {}),
        ("normal.py", 30, "normal_func"): (5, 5, 0.2, 0.2, {}),
    }

    with io.StringIO() as buffer, mock.patch("sys.stdout", buffer):
        print_sampled_stats(
            malformed_stats,
            show_summary=True,
            sample_interval_usec=100,
        )
        report = buffer.getvalue()

    # The summary must appear and every function must be reported.
    self.assertIn("Summary of Interesting Functions:", report)
    for name in ("func", "empty_filename_func", "normal_func"):
        self.assertIn(name, report)
+
def test_print_sampled_stats_with_recursive_call_stats_creation(self):
    """create_stats exercises the total_rec_calls branch."""
    collector = PstatsCollector(sample_interval_usec=1000000)  # 1 second

    # Hand-craft collector.result so both the recursive branch
    # (non-zero total_rec_calls) and the plain branch are taken.
    collector.result = {
        ("recursive.py", 10, "factorial"): {
            "total_rec_calls": 3,   # non-zero: recursive
            "direct_calls": 5,
            "cumulative_calls": 10,
        },
        ("normal.py", 20, "normal_func"): {
            "total_rec_calls": 0,   # zero: non-recursive
            "direct_calls": 2,
            "cumulative_calls": 5,
        },
    }

    collector.create_stats()

    factorial_stats = collector.stats[("recursive.py", 10, "factorial")]
    normal_stats = collector.stats[("normal.py", 20, "normal_func")]

    # Stats tuples are laid out (cc, nc, tt, ct, callers) where cc maps
    # to direct_calls and nc to cumulative_calls.
    self.assertEqual(factorial_stats[0], 5)
    self.assertEqual(factorial_stats[1], 10)
    self.assertEqual(normal_stats[0], 2)
    self.assertEqual(normal_stats[1], 5)
+
+
@skip_if_not_supported
@unittest.skipIf(
    sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
    "Test only runs on Linux with process_vm_readv support",
)
class TestRecursiveFunctionProfiling(unittest.TestCase):
    """Test profiling of recursive functions and complex call patterns."""

    def test_recursive_function_call_counting(self):
        """Recursive call samples are aggregated correctly."""
        collector = PstatsCollector(sample_interval_usec=1000)

        def fib_sample(depth):
            # A stack of `depth` fibonacci frames (innermost first)
            # sitting on top of main.
            frames = [
                MockFrameInfo("fib.py", 10, "fibonacci")
                for _ in range(depth)
            ]
            frames.append(MockFrameInfo("main.py", 5, "main"))
            return (1, frames)

        # Three samples taken at varying recursion depths.
        for sample in (fib_sample(4), fib_sample(2), fib_sample(3)):
            collector.collect([sample])

        collector.create_stats()

        fib_key = ("fib.py", 10, "fibonacci")
        main_key = ("main.py", 5, "main")
        self.assertIn(fib_key, collector.stats)
        self.assertIn(main_key, collector.stats)

        direct_calls, cumulative_calls, tt, ct, callers = (
            collector.stats[fib_key]
        )
        # fibonacci appears 4 + 2 + 3 = 9 times across the samples.
        self.assertEqual(cumulative_calls, 9)
        self.assertGreater(tt, 0)  # some attributed total time
        self.assertGreater(ct, 0)  # some attributed cumulative time

        main_stats = collector.stats[main_key]
        self.assertEqual(main_stats[0], 0)  # never the innermost frame
        self.assertEqual(main_stats[1], 3)  # present in every sample

    def test_nested_function_hierarchy(self):
        """Deeply nested call stacks are attributed level by level."""
        collector = PstatsCollector(sample_interval_usec=1000)

        def deep_sample():
            frames = [
                MockFrameInfo(f"level{n}.py", n * 10, f"level{n}_func")
                for n in range(1, 6)
            ]
            frames.append(MockFrameInfo("main.py", 5, "main"))
            return (1, frames)

        # The same hierarchy observed in two consecutive samples.
        for sample in (deep_sample(), deep_sample()):
            collector.collect([sample])

        collector.create_stats()

        for n in range(1, 6):
            key = (f"level{n}.py", n * 10, f"level{n}_func")
            self.assertIn(key, collector.stats)

            direct_calls, cumulative_calls, tt, ct, callers = (
                collector.stats[key]
            )

            # Every level shows up once per sample.
            self.assertEqual(cumulative_calls, 2)

            if n == 1:
                # Only the innermost frame counts as directly executing,
                # and it accumulates cumulative time.
                self.assertEqual(direct_calls, 2)
                self.assertGreater(ct, 0)
            else:
                self.assertEqual(direct_calls, 0)

    def test_alternating_call_patterns(self):
        """Two execution paths through a shared callee are split evenly."""
        collector = PstatsCollector(sample_interval_usec=1000)

        def path_sample(leaf_name, leaf_lineno):
            return (1, [
                MockFrameInfo("module.py", leaf_lineno, leaf_name),
                MockFrameInfo("module.py", 30, "shared_func"),
                MockFrameInfo("main.py", 5, "main"),
            ])

        # Alternate: A, B, A, B.
        for sample in (
            path_sample("func_a", 10),
            path_sample("func_b", 20),
            path_sample("func_a", 10),
            path_sample("func_b", 20),
        ):
            collector.collect([sample])

        collector.create_stats()

        func_a_key = ("module.py", 10, "func_a")
        func_b_key = ("module.py", 20, "func_b")
        shared_key = ("module.py", 30, "shared_func")
        main_key = ("main.py", 5, "main")

        # Each leaf is directly executing in two of the four samples.
        self.assertEqual(collector.stats[func_a_key][0], 2)  # direct
        self.assertEqual(collector.stats[func_a_key][1], 2)  # cumulative
        self.assertEqual(collector.stats[func_b_key][0], 2)  # direct
        self.assertEqual(collector.stats[func_b_key][1], 2)  # cumulative

        # shared_func and main are on every stack but never innermost.
        self.assertEqual(collector.stats[shared_key][0], 0)
        self.assertEqual(collector.stats[shared_key][1], 4)
        self.assertEqual(collector.stats[main_key][0], 0)
        self.assertEqual(collector.stats[main_key][1], 4)

    def test_collapsed_stack_with_recursion(self):
        """The collapsed-stack collector keeps distinct recursion depths."""
        collector = CollapsedStackCollector()

        def factorial_sample(depth):
            frames = [("factorial.py", 10, "factorial")] * depth
            frames.append(("main.py", 5, "main"))
            return (1, frames)

        collector.collect([factorial_sample(3)])
        collector.collect([factorial_sample(2)])

        # Both call trees are retained, and they differ in length
        # because the recursion depth differed between samples.
        self.assertEqual(len(collector.call_trees), 2)
        tree1, tree2 = collector.call_trees
        self.assertNotEqual(len(tree1), len(tree2))

        # Both trees contain factorial frames.
        self.assertTrue(any("factorial" in str(frame) for frame in tree1))
        self.assertTrue(any("factorial" in str(frame) for frame in tree2))

        # Per-function sample counts include every stack occurrence:
        # factorial appears 3 + 2 = 5 times, main once per sample.
        factorial_key = ("factorial.py", 10, "factorial")
        main_key = ("main.py", 5, "main")
        self.assertEqual(collector.function_samples[factorial_key], 5)
        self.assertEqual(collector.function_samples[main_key], 2)
+
+
@requires_subprocess()
@skip_if_not_supported
class TestSampleProfilerIntegration(unittest.TestCase):
    """End-to-end tests that sample a live child process."""

    @classmethod
    def setUpClass(cls):
        # A busy workload with several distinct execution paths so the
        # sampler has recognisable functions to attribute samples to.
        # NOTE(review): the script's indentation was reconstructed from a
        # whitespace-mangled rendering — confirm against the original file.
        cls.test_script = '''
import time
import os

def slow_fibonacci(n):
    """Recursive fibonacci - should show up prominently in profiler."""
    if n <= 1:
        return n
    return slow_fibonacci(n-1) + slow_fibonacci(n-2)

def cpu_intensive_work():
    """CPU intensive work that should show in profiler."""
    result = 0
    for i in range(10000):
        result += i * i
        if i % 100 == 0:
            result = result % 1000000
    return result

def medium_computation():
    """Medium complexity function."""
    result = 0
    for i in range(100):
        result += i * i
    return result

def fast_loop():
    """Fast simple loop."""
    total = 0
    for i in range(50):
        total += i
    return total

def nested_calls():
    """Test nested function calls."""
    def level1():
        def level2():
            return medium_computation()
        return level2()
    return level1()

def main_loop():
    """Main test loop with different execution paths."""
    iteration = 0

    while True:
        iteration += 1

        # Different execution paths - focus on CPU intensive work
        if iteration % 3 == 0:
            # Very CPU intensive
            result = cpu_intensive_work()
        elif iteration % 5 == 0:
            # Expensive recursive operation
            result = slow_fibonacci(12)
        else:
            # Medium operation
            result = nested_calls()

        # No sleep - keep CPU busy

if __name__ == "__main__":
    main_loop()
'''

    def test_sampling_basic_functionality(self):
        """Sampling a busy child produces a populated report."""
        with (
            test_subprocess(self.test_script) as proc,
            io.StringIO() as captured,
            mock.patch("sys.stdout", captured),
        ):
            try:
                profile.sample.sample(
                    proc.pid,
                    duration_sec=2,
                    sample_interval_usec=1000,  # 1ms
                    show_summary=False,
                )
            except PermissionError:
                self.skipTest("Insufficient permissions for remote profiling")

            output = captured.getvalue()

        # Basic structure of the report.
        for expected in ("Captured", "samples", "Profile Stats"):
            self.assertIn(expected, output)

        # Our hottest recursive function should be attributed samples.
        self.assertIn("slow_fibonacci", output)

    def test_sampling_with_pstats_export(self):
        """Exported pstats files are valid marshal data with our marker."""
        pstats_out = tempfile.NamedTemporaryFile(
            suffix=".pstats", delete=False
        )
        self.addCleanup(close_and_unlink, pstats_out)

        with test_subprocess(self.test_script) as proc:
            # Silence the profiler's stdout: only the file matters here.
            with (
                io.StringIO() as captured,
                mock.patch("sys.stdout", captured),
            ):
                try:
                    profile.sample.sample(
                        proc.pid,
                        duration_sec=1,
                        filename=pstats_out.name,
                        sample_interval_usec=10000,
                    )
                except PermissionError:
                    self.skipTest(
                        "Insufficient permissions for remote profiling"
                    )

            # The export must exist and be non-empty.
            self.assertTrue(os.path.exists(pstats_out.name))
            self.assertGreater(os.path.getsize(pstats_out.name), 0)

            # It must unmarshal to a dict carrying the sampled marker...
            with open(pstats_out.name, "rb") as f:
                stats_data = marshal.load(f)

            self.assertIsInstance(stats_data, dict)
            self.assertIn(("__sampled__",), stats_data)
            self.assertTrue(stats_data[("__sampled__",)])

            # ...plus at least one real function entry.
            function_entries = [
                key for key in stats_data.keys() if key != ("__sampled__",)
            ]
            self.assertGreater(len(function_entries), 0)

    def test_sampling_with_collapsed_export(self):
        """Collapsed-stack exports follow the "stack count" line format."""
        collapsed_file = tempfile.NamedTemporaryFile(
            suffix=".txt", delete=False
        )
        self.addCleanup(close_and_unlink, collapsed_file)

        with (
            test_subprocess(self.test_script) as proc,
        ):
            # Silence the profiler's stdout: only the file matters here.
            with (
                io.StringIO() as captured,
                mock.patch("sys.stdout", captured),
            ):
                try:
                    profile.sample.sample(
                        proc.pid,
                        duration_sec=1,
                        filename=collapsed_file.name,
                        output_format="collapsed",
                        sample_interval_usec=10000,
                    )
                except PermissionError:
                    self.skipTest(
                        "Insufficient permissions for remote profiling"
                    )

            # The export must exist and be non-empty.
            self.assertTrue(os.path.exists(collapsed_file.name))
            self.assertGreater(os.path.getsize(collapsed_file.name), 0)

            with open(collapsed_file.name, "r") as f:
                content = f.read()

            lines = content.strip().split("\n")
            self.assertGreater(len(lines), 0)

            for line in lines:
                # Each line: "<stack trace> <positive count>".
                parts = line.rsplit(" ", 1)
                self.assertEqual(len(parts), 2)

                stack_trace, count_str = parts
                self.assertGreater(len(stack_trace), 0)
                self.assertTrue(count_str.isdigit())
                self.assertGreater(int(count_str), 0)

                # Stacks are semicolon-separated file:function:line frames.
                if ";" in stack_trace:
                    for part in stack_trace.split(";"):
                        self.assertIn(":", part)

    def test_sampling_all_threads(self):
        """all_threads=True completes without raising."""
        with (
            test_subprocess(self.test_script) as proc,
            # Suppress profiler output.
            io.StringIO() as captured,
            mock.patch("sys.stdout", captured),
        ):
            try:
                profile.sample.sample(
                    proc.pid,
                    duration_sec=1,
                    all_threads=True,
                    sample_interval_usec=10000,
                    show_summary=False,
                )
            except PermissionError:
                self.skipTest("Insufficient permissions for remote profiling")

        # Success is simply completing without an error; the output
        # format itself is covered by the other tests.
+
+
@skip_if_not_supported
@unittest.skipIf(
    sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
    "Test only runs on Linux with process_vm_readv support",
)
class TestSampleProfilerErrorHandling(unittest.TestCase):
    """Failure modes: bad PIDs, dying targets, and invalid formats."""

    def test_invalid_pid(self):
        """A nonsense PID is rejected up front."""
        with self.assertRaises((OSError, RuntimeError)):
            profile.sample.sample(-1, duration_sec=1)

    def test_process_dies_during_sampling(self):
        """A target exiting mid-run is reported, not fatal."""
        with test_subprocess("import time; time.sleep(0.5); exit()") as proc:
            with (
                io.StringIO() as captured,
                mock.patch("sys.stdout", captured),
            ):
                try:
                    # Sample for longer than the target process lives.
                    profile.sample.sample(
                        proc.pid,
                        duration_sec=2,
                        sample_interval_usec=50000,
                    )
                except PermissionError:
                    self.skipTest(
                        "Insufficient permissions for remote profiling"
                    )

                output = captured.getvalue()

            # Failed samples show up as an error rate in the report.
            self.assertIn("Error rate", output)

    def test_invalid_output_format(self):
        """An unknown output_format raises ValueError."""
        with self.assertRaises(ValueError):
            profile.sample.sample(
                os.getpid(),
                duration_sec=1,
                output_format="invalid_format",
            )

    def test_invalid_output_format_with_mocked_profiler(self):
        """Format validation happens even when profiling itself is mocked."""
        with mock.patch(
            "profile.sample.SampleProfiler"
        ) as mock_profiler_class:
            mock_profiler_class.return_value = mock.MagicMock()

            with self.assertRaises(ValueError) as cm:
                profile.sample.sample(
                    12345,
                    duration_sec=1,
                    output_format="unknown_format",
                )

            # The error message names the offending format.
            self.assertIn(
                "Invalid output format: unknown_format", str(cm.exception)
            )

    def test_is_process_running(self):
        """_is_process_running tracks the target's lifetime."""
        with test_subprocess("import time; time.sleep(1000)") as proc:
            try:
                profiler = SampleProfiler(
                    pid=proc.pid,
                    sample_interval_usec=1000,
                    all_threads=False,
                )
            except PermissionError:
                self.skipTest(
                    "Insufficient permissions to read the stack trace"
                )
            self.assertTrue(profiler._is_process_running())
            self.assertIsNotNone(profiler.unwinder.get_stack_trace())
            proc.kill()
            proc.wait()
            # ValueError on MacOS (yeah I know), ProcessLookupError on
            # Linux and Windows.
            self.assertRaises(
                (ValueError, ProcessLookupError),
                profiler.unwinder.get_stack_trace,
            )

        # Exit the context manager to ensure the process is terminated.
        self.assertFalse(profiler._is_process_running())
        self.assertRaises(
            (ValueError, ProcessLookupError),
            profiler.unwinder.get_stack_trace,
        )

    @unittest.skipUnless(sys.platform == "linux", "Only valid on Linux")
    def test_esrch_signal_handling(self):
        """A dead target surfaces as ProcessLookupError (ESRCH)."""
        with test_subprocess("import time; time.sleep(1000)") as proc:
            try:
                unwinder = _remote_debugging.RemoteUnwinder(proc.pid)
            except PermissionError:
                self.skipTest(
                    "Insufficient permissions to read the stack trace"
                )
            initial_trace = unwinder.get_stack_trace()
            self.assertIsNotNone(initial_trace)

            proc.kill()

            # Wait for the process to die and try to get another trace.
            proc.wait()

            with self.assertRaises(ProcessLookupError):
                unwinder.get_stack_trace()
+
+
+
class TestSampleProfilerCLI(unittest.TestCase):
    """Argument-parsing tests for the profile.sample command line."""

    def test_cli_collapsed_format_validation(self):
        """Options that only apply to pstats are rejected with --collapsed."""
        # (argv, category of the offending option)
        cases = [
            (
                ["profile.sample", "--collapsed", "--sort-nsamples", "12345"],
                "sort",
            ),
            (
                ["profile.sample", "--collapsed", "--sort-tottime", "12345"],
                "sort",
            ),
            (
                ["profile.sample", "--collapsed", "--sort-cumtime", "12345"],
                "sort",
            ),
            (
                ["profile.sample", "--collapsed", "--sort-sample-pct", "12345"],
                "sort",
            ),
            (
                ["profile.sample", "--collapsed", "--sort-cumul-pct", "12345"],
                "sort",
            ),
            (
                ["profile.sample", "--collapsed", "--sort-name", "12345"],
                "sort",
            ),
            (["profile.sample", "--collapsed", "-l", "20", "12345"], "limit"),
            (
                ["profile.sample", "--collapsed", "--limit", "20", "12345"],
                "limit",
            ),
            (
                ["profile.sample", "--collapsed", "--no-summary", "12345"],
                "summary",
            ),
        ]

        for argv, _category in cases:
            with (
                mock.patch("sys.argv", argv),
                mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
                self.assertRaises(SystemExit) as cm,
            ):
                profile.sample.main()

            # argparse exits with code 2 on a usage error and explains
            # that the option belongs to the pstats format.
            self.assertEqual(cm.exception.code, 2)
            error_msg = mock_stderr.getvalue()
            self.assertIn("error:", error_msg)
            self.assertIn("--pstats format", error_msg)

    def test_cli_default_collapsed_filename(self):
        """--collapsed without -o falls back to collapsed.<pid>.txt."""
        with (
            mock.patch("sys.argv", ["profile.sample", "--collapsed", "12345"]),
            mock.patch("profile.sample.sample") as mock_sample,
        ):
            profile.sample.main()

        mock_sample.assert_called_once()
        kwargs = mock_sample.call_args[1]
        self.assertEqual(kwargs["output_format"], "collapsed")
        self.assertEqual(kwargs["filename"], "collapsed.12345.txt")

    def test_cli_custom_output_filenames(self):
        """-o is honoured for both output formats."""
        cases = [
            (
                ["profile.sample", "--pstats", "-o", "custom.pstats", "12345"],
                "custom.pstats",
                "pstats",
            ),
            (
                ["profile.sample", "--collapsed", "-o", "custom.txt", "12345"],
                "custom.txt",
                "collapsed",
            ),
        ]

        for argv, expected_filename, expected_format in cases:
            with (
                mock.patch("sys.argv", argv),
                mock.patch("profile.sample.sample") as mock_sample,
            ):
                profile.sample.main()

            mock_sample.assert_called_once()
            kwargs = mock_sample.call_args[1]
            self.assertEqual(kwargs["filename"], expected_filename)
            self.assertEqual(kwargs["output_format"], expected_format)

    def test_cli_missing_required_arguments(self):
        """The PID argument is mandatory."""
        with (
            mock.patch("sys.argv", ["profile.sample"]),
            mock.patch("sys.stderr", io.StringIO()),
        ):
            with self.assertRaises(SystemExit):
                profile.sample.main()

    def test_cli_mutually_exclusive_format_options(self):
        """--pstats and --collapsed cannot be combined."""
        with (
            mock.patch(
                "sys.argv",
                ["profile.sample", "--pstats", "--collapsed", "12345"],
            ),
            mock.patch("sys.stderr", io.StringIO()),
        ):
            with self.assertRaises(SystemExit):
                profile.sample.main()

    def test_argument_parsing_basic(self):
        """A bare PID runs with the documented defaults."""
        with (
            mock.patch("sys.argv", ["profile.sample", "12345"]),
            mock.patch("profile.sample.sample") as mock_sample,
        ):
            profile.sample.main()

        mock_sample.assert_called_once_with(
            12345,
            sample_interval_usec=100,
            duration_sec=10,
            filename=None,
            all_threads=False,
            limit=15,
            sort=2,
            show_summary=True,
            output_format="pstats",
            realtime_stats=False,
        )

    def test_sort_options(self):
        """Each --sort-* flag maps onto its expected sort index."""
        sort_options = [
            ("--sort-nsamples", 0),
            ("--sort-tottime", 1),
            ("--sort-cumtime", 2),
            ("--sort-sample-pct", 3),
            ("--sort-cumul-pct", 4),
            ("--sort-name", -1),
        ]

        for option, expected_sort_value in sort_options:
            with (
                mock.patch("sys.argv", ["profile.sample", option, "12345"]),
                mock.patch("profile.sample.sample") as mock_sample,
            ):
                profile.sample.main()

            mock_sample.assert_called_once()
            self.assertEqual(
                mock_sample.call_args[1]["sort"], expected_sort_value
            )
            mock_sample.reset_mock()
+
+
+# Allow running this test module directly: python <module>.py
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_setcomps.py b/Lib/test/test_setcomps.py
index 0bb02ef11f6..6fc5bb74036 100644
--- a/Lib/test/test_setcomps.py
+++ b/Lib/test/test_setcomps.py
@@ -154,7 +154,7 @@ We also repeat each of the above scoping tests inside a function
class SetComprehensionTest(unittest.TestCase):
def test_exception_locations(self):
# The location of an exception raised from __init__ or
- # __next__ should should be the iterator expression
+ # __next__ should be the iterator expression
def init_raises():
try:
diff --git a/Lib/test/test_sqlite3/test_dbapi.py b/Lib/test/test_sqlite3/test_dbapi.py
index 291e0356253..3602726437d 100644
--- a/Lib/test/test_sqlite3/test_dbapi.py
+++ b/Lib/test/test_sqlite3/test_dbapi.py
@@ -31,8 +31,7 @@ import urllib.parse
import warnings
from test.support import (
- SHORT_TIMEOUT, check_disallow_instantiation, requires_subprocess,
- is_apple, is_emscripten, is_wasi
+ SHORT_TIMEOUT, check_disallow_instantiation, requires_subprocess
)
from test.support import gc_collect
from test.support import threading_helper, import_helper
@@ -641,14 +640,21 @@ class OpenTests(unittest.TestCase):
self.assertTrue(os.path.exists(path))
cx.execute(self._sql)
def get_undecodable_path(self):
    """Return an undecodable filesystem path, or skip the running test.

    Skips when the platform has no undecodable path, or when such a
    path cannot actually be created on the current filesystem.
    """
    path = TESTFN_UNDECODABLE
    if not path:
        self.skipTest("only works if there are undecodable paths")
    # Probe that the filesystem accepts the name before handing it out.
    try:
        with open(path, 'wb'):
            pass
    except OSError:
        self.skipTest(f"can't create file with undecodable path {path!r}")
    unlink(path)
    return path
+
@unittest.skipIf(sys.platform == "win32", "skipped on Windows")
def test_open_with_undecodable_path(self):
    """sqlite3.connect() accepts an undecodable filesystem path."""
    path = self.get_undecodable_path()
    self.addCleanup(unlink, path)
    # Connecting creates the database file; a trivial statement proves
    # the connection is usable.
    with contextlib.closing(sqlite.connect(path)) as cx:
        self.assertTrue(os.path.exists(path))
        cx.execute(self._sql)
@@ -688,14 +694,10 @@ class OpenTests(unittest.TestCase):
cx.execute(self._sql)
@unittest.skipIf(sys.platform == "win32", "skipped on Windows")
def test_open_undecodable_uri(self):
    """sqlite3.connect(uri=True) accepts a percent-quoted undecodable path."""
    path = self.get_undecodable_path()
    self.addCleanup(unlink, path)
    uri = "file:" + urllib.parse.quote(path)
    # Connecting through the URI creates the file; a trivial statement
    # proves the connection is usable.
    with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
        self.assertTrue(os.path.exists(path))
        cx.execute(self._sql)
diff --git a/Lib/test/test_string/_support.py b/Lib/test/test_string/_support.py
index abdddaf187b..e1d7f6f6500 100644
--- a/Lib/test/test_string/_support.py
+++ b/Lib/test/test_string/_support.py
@@ -2,33 +2,45 @@ from string.templatelib import Interpolation
class TStringBaseCase:
def assertInterpolationEqual(self, i, exp):
    """Test Interpolation equality.

    The *i* argument must be an Interpolation instance.

    The *exp* argument must be a tuple of the form
    (value, expression, conversion, format_spec) where the final three
    items may be omitted and are assumed to be '', None and '' respectively.
    """
    if len(exp) == 4:
        actual = (i.value, i.expression, i.conversion, i.format_spec)
        self.assertEqual(actual, exp)
    elif len(exp) == 3:
        self.assertEqual((i.value, i.expression, i.conversion), exp)
        self.assertEqual(i.format_spec, "")
    elif len(exp) == 2:
        self.assertEqual((i.value, i.expression), exp)
        self.assertEqual(i.conversion, None)
        self.assertEqual(i.format_spec, "")
    elif len(exp) == 1:
        self.assertEqual((i.value,), exp)
        self.assertEqual(i.expression, "")
        self.assertEqual(i.conversion, None)
        self.assertEqual(i.format_spec, "")
    else:
        # Previously an expectation tuple of any other length passed
        # silently (no branch matched); fail loudly instead so a
        # malformed test cannot go green.
        self.fail(f"invalid expectation tuple of length {len(exp)}: {exp!r}")
+
def assertTStringEqual(self, t, strings, interpolations):
"""Test template string literal equality.
The *strings* argument must be a tuple of strings equal to *t.strings*.
The *interpolations* argument must be a sequence of tuples which are
- compared against *t.interpolations*. Each tuple consists of
- (value, expression, conversion, format_spec), though the final two
- items may be omitted, and are assumed to be None and '' respectively.
+ compared against *t.interpolations*. Each tuple must match the form
+ described in the `assertInterpolationEqual` method.
"""
self.assertEqual(t.strings, strings)
self.assertEqual(len(t.interpolations), len(interpolations))
for i, exp in zip(t.interpolations, interpolations, strict=True):
- if len(exp) == 4:
- actual = (i.value, i.expression, i.conversion, i.format_spec)
- self.assertEqual(actual, exp)
- continue
-
- if len(exp) == 3:
- self.assertEqual((i.value, i.expression, i.conversion), exp)
- self.assertEqual(i.format_spec, '')
- continue
-
- self.assertEqual((i.value, i.expression), exp)
- self.assertEqual(i.format_spec, '')
- self.assertIsNone(i.conversion)
+ self.assertInterpolationEqual(i, exp)
def convert(value, conversion):
diff --git a/Lib/test/test_string/test_templatelib.py b/Lib/test/test_string/test_templatelib.py
index 85fcff486d6..adaf590e64d 100644
--- a/Lib/test/test_string/test_templatelib.py
+++ b/Lib/test/test_string/test_templatelib.py
@@ -45,6 +45,19 @@ world"""
self.assertEqual(len(t.interpolations), 0)
self.assertEqual(fstring(t), 'Hello,\nworld')
+ def test_interpolation_creation(self):
+ i = Interpolation('Maria', 'name', 'a', 'fmt')
+ self.assertInterpolationEqual(i, ('Maria', 'name', 'a', 'fmt'))
+
+ i = Interpolation('Maria', 'name', 'a')
+ self.assertInterpolationEqual(i, ('Maria', 'name', 'a'))
+
+ i = Interpolation('Maria', 'name')
+ self.assertInterpolationEqual(i, ('Maria', 'name'))
+
+ i = Interpolation('Maria')
+ self.assertInterpolationEqual(i, ('Maria',))
+
def test_creation_interleaving(self):
# Should add strings on either side
t = Template(Interpolation('Maria', 'name', None, ''))
diff --git a/Lib/test/test_typing.py b/Lib/test/test_typing.py
index bef6773ad6c..b1615bbff38 100644
--- a/Lib/test/test_typing.py
+++ b/Lib/test/test_typing.py
@@ -6309,31 +6309,6 @@ class NoTypeCheckTests(BaseTestCase):
class InternalsTests(BaseTestCase):
- def test_deprecation_for_no_type_params_passed_to__evaluate(self):
- with self.assertWarnsRegex(
- DeprecationWarning,
- (
- "Failing to pass a value to the 'type_params' parameter "
- "of 'typing._eval_type' is deprecated"
- )
- ) as cm:
- self.assertEqual(typing._eval_type(list["int"], globals(), {}), list[int])
-
- self.assertEqual(cm.filename, __file__)
-
- f = ForwardRef("int")
-
- with self.assertWarnsRegex(
- DeprecationWarning,
- (
- "Failing to pass a value to the 'type_params' parameter "
- "of 'typing.ForwardRef._evaluate' is deprecated"
- )
- ) as cm:
- self.assertIs(f._evaluate(globals(), {}, recursive_guard=frozenset()), int)
-
- self.assertEqual(cm.filename, __file__)
-
def test_collect_parameters(self):
typing = import_helper.import_fresh_module("typing")
with self.assertWarnsRegex(
@@ -7351,6 +7326,12 @@ class EvaluateForwardRefTests(BaseTestCase):
list[EqualToForwardRef('A')],
)
+ def test_with_module(self):
+ from test.typinganndata import fwdref_module
+
+ typing.evaluate_forward_ref(
+ fwdref_module.fw,)
+
class CollectionsAbcTests(BaseTestCase):
diff --git a/Lib/test/test_unittest/test_async_case.py b/Lib/test/test_unittest/test_async_case.py
index 993e6bf013c..91d45283eb3 100644
--- a/Lib/test/test_unittest/test_async_case.py
+++ b/Lib/test/test_unittest/test_async_case.py
@@ -12,7 +12,7 @@ class MyException(Exception):
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class TestCM:
@@ -480,7 +480,7 @@ class TestAsyncCase(unittest.TestCase):
class TestCase1(unittest.IsolatedAsyncioTestCase):
def setUp(self):
- asyncio._get_event_loop_policy().get_event_loop()
+ asyncio.events._get_event_loop_policy().get_event_loop()
async def test_demo1(self):
pass
@@ -490,7 +490,7 @@ class TestAsyncCase(unittest.TestCase):
self.assertTrue(result.wasSuccessful())
def test_loop_factory(self):
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class TestCase1(unittest.IsolatedAsyncioTestCase):
loop_factory = asyncio.EventLoop
diff --git a/Lib/test/test_unittest/testmock/testasync.py b/Lib/test/test_unittest/testmock/testasync.py
index 0791675b540..dc36ceeb650 100644
--- a/Lib/test/test_unittest/testmock/testasync.py
+++ b/Lib/test/test_unittest/testmock/testasync.py
@@ -15,7 +15,7 @@ from unittest.mock import (ANY, call, AsyncMock, patch, MagicMock, Mock,
def tearDownModule():
- asyncio._set_event_loop_policy(None)
+ asyncio.events._set_event_loop_policy(None)
class AsyncClass:
diff --git a/Lib/test/test_zipfile/_path/test_path.py b/Lib/test/test_zipfile/_path/test_path.py
index 696134023a5..958a586b0dc 100644
--- a/Lib/test/test_zipfile/_path/test_path.py
+++ b/Lib/test/test_zipfile/_path/test_path.py
@@ -316,7 +316,7 @@ class TestPath(unittest.TestCase):
HUGE_ZIPFILE_NUM_ENTRIES = 2**13
def huge_zipfile(self):
- """Create a read-only zipfile with a huge number of entries entries."""
+ """Create a read-only zipfile with a huge number of entries."""
strm = io.BytesIO()
zf = zipfile.ZipFile(strm, "w")
for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
diff --git a/Lib/test/test_zoneinfo/test_zoneinfo_property.py b/Lib/test/test_zoneinfo/test_zoneinfo_property.py
index 294c7e9b27a..c00815e2fd4 100644
--- a/Lib/test/test_zoneinfo/test_zoneinfo_property.py
+++ b/Lib/test/test_zoneinfo/test_zoneinfo_property.py
@@ -147,23 +147,21 @@ class ZoneInfoPickleTest(ZoneInfoTestBase):
def test_pickle_unpickle_cache(self, key):
zi = self.klass(key)
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
- with self.subTest(proto=proto):
- pkl_str = pickle.dumps(zi, proto)
- zi_rt = pickle.loads(pkl_str)
+ pkl_str = pickle.dumps(zi, proto)
+ zi_rt = pickle.loads(pkl_str)
- self.assertIs(zi, zi_rt)
+ self.assertIs(zi, zi_rt)
@hypothesis.given(key=valid_keys())
@add_key_examples
def test_pickle_unpickle_no_cache(self, key):
zi = self.klass.no_cache(key)
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
- with self.subTest(proto=proto):
- pkl_str = pickle.dumps(zi, proto)
- zi_rt = pickle.loads(pkl_str)
+ pkl_str = pickle.dumps(zi, proto)
+ zi_rt = pickle.loads(pkl_str)
- self.assertIsNot(zi, zi_rt)
- self.assertEqual(str(zi), str(zi_rt))
+ self.assertIsNot(zi, zi_rt)
+ self.assertEqual(str(zi), str(zi_rt))
@hypothesis.given(key=valid_keys())
@add_key_examples
diff --git a/Lib/test/test_zstd.py b/Lib/test/test_zstd.py
index 90b2adc9665..6358cc78739 100644
--- a/Lib/test/test_zstd.py
+++ b/Lib/test/test_zstd.py
@@ -2673,8 +2673,12 @@ class FreeThreadingMethodTests(unittest.TestCase):
input = b'a'* (16*_1K)
num_threads = 8
+    # gh-136394: the first output of .compress() includes the frame header.
+    # We run the first .compress() call outside of the threaded portion
+    # to make the test order-independent.
+
comp = ZstdCompressor()
- parts = []
+ parts = [comp.compress(input, ZstdCompressor.FLUSH_BLOCK)]
for _ in range(num_threads):
res = comp.compress(input, ZstdCompressor.FLUSH_BLOCK)
if res:
@@ -2683,7 +2687,7 @@ class FreeThreadingMethodTests(unittest.TestCase):
expected = b''.join(parts) + rest1
comp = ZstdCompressor()
- output = []
+ output = [comp.compress(input, ZstdCompressor.FLUSH_BLOCK)]
def run_method(method, input_data, output_data):
res = method(input_data, ZstdCompressor.FLUSH_BLOCK)
if res:
diff --git a/Lib/test/typinganndata/fwdref_module.py b/Lib/test/typinganndata/fwdref_module.py
new file mode 100644
index 00000000000..7347a7a4245
--- /dev/null
+++ b/Lib/test/typinganndata/fwdref_module.py
@@ -0,0 +1,6 @@
+from typing import ForwardRef
+
+MyList = list[int]
+MyDict = dict[str, 'MyList']
+
+fw = ForwardRef('MyDict', module=__name__)