aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_concurrent_futures/test_process_pool.py
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-08-24 19:21:44 +0200
committerGitHub <noreply@github.com>2023-08-24 19:21:44 +0200
commitaa6f787faa4bc45006da4dc2f942fb9b82c98836 (patch)
tree0128af159e1ad5b3a44c6104090f677de93fd48b /Lib/test/test_concurrent_futures/test_process_pool.py
parentbbbe1faf7bc6860d4a628e204db944b81dfdbd73 (diff)
downloadcpython-aa6f787faa4bc45006da4dc2f942fb9b82c98836.tar.gz
cpython-aa6f787faa4bc45006da4dc2f942fb9b82c98836.zip
gh-108388: Convert test_concurrent_futures to package (#108401)
Convert test_concurrent_futures to a package of sub-tests.
Diffstat (limited to 'Lib/test/test_concurrent_futures/test_process_pool.py')
-rw-r--r--Lib/test/test_concurrent_futures/test_process_pool.py202
1 files changed, 202 insertions, 0 deletions
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py
new file mode 100644
index 00000000000..7763a4946f1
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_process_pool.py
@@ -0,0 +1,202 @@
+import os
+import sys
+import time
+import unittest
+from concurrent import futures
+from concurrent.futures.process import BrokenProcessPool
+
+from test import support
+from test.support import hashlib_helper
+
+from .executor import ExecutorTest, mul
+from .util import (
+ ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
+ create_executor_tests, setup_module)
+
+
+class EventfulGCObj():
+ def __init__(self, mgr):
+ self.event = mgr.Event()
+
+ def __del__(self):
+ self.event.set()
+
+
+class ProcessPoolExecutorTest(ExecutorTest):
+
+ @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
+ def test_max_workers_too_large(self):
+ with self.assertRaisesRegex(ValueError,
+ "max_workers must be <= 61"):
+ futures.ProcessPoolExecutor(max_workers=62)
+
+ def test_killed_child(self):
+ # When a child process is abruptly terminated, the whole pool gets
+ # "broken".
+ futures = [self.executor.submit(time.sleep, 3)]
+ # Get one of the processes, and terminate (kill) it
+ p = next(iter(self.executor._processes.values()))
+ p.terminate()
+ for fut in futures:
+ self.assertRaises(BrokenProcessPool, fut.result)
+ # Submitting other jobs fails as well.
+ self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
+
+ def test_map_chunksize(self):
+ def bad_map():
+ list(self.executor.map(pow, range(40), range(40), chunksize=-1))
+
+ ref = list(map(pow, range(40), range(40)))
+ self.assertEqual(
+ list(self.executor.map(pow, range(40), range(40), chunksize=6)),
+ ref)
+ self.assertEqual(
+ list(self.executor.map(pow, range(40), range(40), chunksize=50)),
+ ref)
+ self.assertEqual(
+ list(self.executor.map(pow, range(40), range(40), chunksize=40)),
+ ref)
+ self.assertRaises(ValueError, bad_map)
+
+ @classmethod
+ def _test_traceback(cls):
+ raise RuntimeError(123) # some comment
+
+ def test_traceback(self):
+ # We want ensure that the traceback from the child process is
+ # contained in the traceback raised in the main process.
+ future = self.executor.submit(self._test_traceback)
+ with self.assertRaises(Exception) as cm:
+ future.result()
+
+ exc = cm.exception
+ self.assertIs(type(exc), RuntimeError)
+ self.assertEqual(exc.args, (123,))
+ cause = exc.__cause__
+ self.assertIs(type(cause), futures.process._RemoteTraceback)
+ self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
+
+ with support.captured_stderr() as f1:
+ try:
+ raise exc
+ except RuntimeError:
+ sys.excepthook(*sys.exc_info())
+ self.assertIn('raise RuntimeError(123) # some comment',
+ f1.getvalue())
+
+ @hashlib_helper.requires_hashdigest('md5')
+ def test_ressources_gced_in_workers(self):
+ # Ensure that argument for a job are correctly gc-ed after the job
+ # is finished
+ mgr = self.get_context().Manager()
+ obj = EventfulGCObj(mgr)
+ future = self.executor.submit(id, obj)
+ future.result()
+
+ self.assertTrue(obj.event.wait(timeout=1))
+
+ # explicitly destroy the object to ensure that EventfulGCObj.__del__()
+ # is called while manager is still running.
+ obj = None
+ support.gc_collect()
+
+ mgr.shutdown()
+ mgr.join()
+
+ def test_saturation(self):
+ executor = self.executor
+ mp_context = self.get_context()
+ sem = mp_context.Semaphore(0)
+ job_count = 15 * executor._max_workers
+ for _ in range(job_count):
+ executor.submit(sem.acquire)
+ self.assertEqual(len(executor._processes), executor._max_workers)
+ for _ in range(job_count):
+ sem.release()
+
+ def test_idle_process_reuse_one(self):
+ executor = self.executor
+ assert executor._max_workers >= 4
+ if self.get_context().get_start_method(allow_none=False) == "fork":
+ raise unittest.SkipTest("Incompatible with the fork start method.")
+ executor.submit(mul, 21, 2).result()
+ executor.submit(mul, 6, 7).result()
+ executor.submit(mul, 3, 14).result()
+ self.assertEqual(len(executor._processes), 1)
+
+ def test_idle_process_reuse_multiple(self):
+ executor = self.executor
+ assert executor._max_workers <= 5
+ if self.get_context().get_start_method(allow_none=False) == "fork":
+ raise unittest.SkipTest("Incompatible with the fork start method.")
+ executor.submit(mul, 12, 7).result()
+ executor.submit(mul, 33, 25)
+ executor.submit(mul, 25, 26).result()
+ executor.submit(mul, 18, 29)
+ executor.submit(mul, 1, 2).result()
+ executor.submit(mul, 0, 9)
+ self.assertLessEqual(len(executor._processes), 3)
+ executor.shutdown()
+
+ def test_max_tasks_per_child(self):
+ context = self.get_context()
+ if context.get_start_method(allow_none=False) == "fork":
+ with self.assertRaises(ValueError):
+ self.executor_type(1, mp_context=context, max_tasks_per_child=3)
+ return
+ # not using self.executor as we need to control construction.
+ # arguably this could go in another class w/o that mixin.
+ executor = self.executor_type(
+ 1, mp_context=context, max_tasks_per_child=3)
+ f1 = executor.submit(os.getpid)
+ original_pid = f1.result()
+ # The worker pid remains the same as the worker could be reused
+ f2 = executor.submit(os.getpid)
+ self.assertEqual(f2.result(), original_pid)
+ self.assertEqual(len(executor._processes), 1)
+ f3 = executor.submit(os.getpid)
+ self.assertEqual(f3.result(), original_pid)
+
+ # A new worker is spawned, with a statistically different pid,
+ # while the previous was reaped.
+ f4 = executor.submit(os.getpid)
+ new_pid = f4.result()
+ self.assertNotEqual(original_pid, new_pid)
+ self.assertEqual(len(executor._processes), 1)
+
+ executor.shutdown()
+
+ def test_max_tasks_per_child_defaults_to_spawn_context(self):
+ # not using self.executor as we need to control construction.
+ # arguably this could go in another class w/o that mixin.
+ executor = self.executor_type(1, max_tasks_per_child=3)
+ self.assertEqual(executor._mp_context.get_start_method(), "spawn")
+
+ def test_max_tasks_early_shutdown(self):
+ context = self.get_context()
+ if context.get_start_method(allow_none=False) == "fork":
+ raise unittest.SkipTest("Incompatible with the fork start method.")
+ # not using self.executor as we need to control construction.
+ # arguably this could go in another class w/o that mixin.
+ executor = self.executor_type(
+ 3, mp_context=context, max_tasks_per_child=1)
+ futures = []
+ for i in range(6):
+ futures.append(executor.submit(mul, i, i))
+ executor.shutdown()
+ for i, future in enumerate(futures):
+ self.assertEqual(future.result(), mul(i, i))
+
+
+create_executor_tests(globals(), ProcessPoolExecutorTest,
+ executor_mixins=(ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin,
+ ProcessPoolSpawnMixin))
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()