diff options
Diffstat (limited to 'Lib/test/test_concurrent_futures/test_process_pool.py')
-rw-r--r-- | Lib/test/test_concurrent_futures/test_process_pool.py | 97 |
1 files changed, 0 insertions, 97 deletions
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 354b7d0a346..8b1bdaa33d8 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,17 +1,13 @@ import os -import queue -import signal import sys import threading import time import unittest -import unittest.mock from concurrent import futures from concurrent.futures.process import BrokenProcessPool from test import support from test.support import hashlib_helper -from test.test_importlib.metadata.fixtures import parameterize from .executor import ExecutorTest, mul from .util import ( @@ -26,19 +22,6 @@ class EventfulGCObj(): def __del__(self): self.event.set() -TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__ -KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__ -FORCE_SHUTDOWN_PARAMS = [ - dict(function_name=TERMINATE_WORKERS), - dict(function_name=KILL_WORKERS), -] - -def _put_sleep_put(queue): - """ Used as part of test_terminate_workers """ - queue.put('started') - time.sleep(2) - queue.put('finished') - class ProcessPoolExecutorTest(ExecutorTest): @@ -235,86 +218,6 @@ class ProcessPoolExecutorTest(ExecutorTest): list(executor.map(mul, [(2, 3)] * 10)) executor.shutdown() - def test_terminate_workers(self): - mock_fn = unittest.mock.Mock() - with self.executor_type(max_workers=1) as executor: - executor._force_shutdown = mock_fn - executor.terminate_workers() - - mock_fn.assert_called_once_with(operation=futures.process._TERMINATE) - - def test_kill_workers(self): - mock_fn = unittest.mock.Mock() - with self.executor_type(max_workers=1) as executor: - executor._force_shutdown = mock_fn - executor.kill_workers() - - mock_fn.assert_called_once_with(operation=futures.process._KILL) - - def test_force_shutdown_workers_invalid_op(self): - with self.executor_type(max_workers=1) as executor: - self.assertRaises(ValueError, - executor._force_shutdown, - operation='invalid operation'), - - @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_force_shutdown_workers(self, function_name): - manager = self.get_context().Manager() - q = manager.Queue() - - with self.executor_type(max_workers=1) as executor: - executor.submit(_put_sleep_put, q) - - # We should get started, but not finished since we'll terminate the - # workers just after - self.assertEqual(q.get(timeout=5), 'started') - - worker_process = list(executor._processes.values())[0] - getattr(executor, function_name)() - worker_process.join() - - if function_name == TERMINATE_WORKERS or \ - sys.platform == 'win32': - # On windows, kill and terminate both send SIGTERM - self.assertEqual(worker_process.exitcode, -signal.SIGTERM) - elif function_name == KILL_WORKERS: - self.assertEqual(worker_process.exitcode, -signal.SIGKILL) - else: - self.fail(f"Unknown operation: {function_name}") - - self.assertRaises(queue.Empty, q.get, timeout=1) - - @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_force_shutdown_workers_dead_workers(self, function_name): - with self.executor_type(max_workers=1) as executor: - future = executor.submit(os._exit, 1) - self.assertRaises(BrokenProcessPool, future.result) - - # even though the pool is broken, this shouldn't raise - getattr(executor, function_name)() - - @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_force_shutdown_workers_not_started_yet(self, function_name): - ctx = self.get_context() - with unittest.mock.patch.object(ctx, 'Process') as mock_process: - with self.executor_type(max_workers=1, mp_context=ctx) as executor: - # The worker has not been started yet, terminate/kill_workers - # should basically no-op - getattr(executor, function_name)() - - mock_process.return_value.kill.assert_not_called() - mock_process.return_value.terminate.assert_not_called() - - @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_force_shutdown_workers_stops_pool(self, function_name): - with self.executor_type(max_workers=1) as executor: - task = executor.submit(time.sleep, 0) - self.assertIsNone(task.result()) - - getattr(executor, function_name)() - - self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) - create_executor_tests(globals(), ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, |