aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_concurrent_futures/test_process_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_concurrent_futures/test_process_pool.py')
-rw-r--r--Lib/test/test_concurrent_futures/test_process_pool.py97
1 files changed, 0 insertions, 97 deletions
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py
index 354b7d0a346..8b1bdaa33d8 100644
--- a/Lib/test/test_concurrent_futures/test_process_pool.py
+++ b/Lib/test/test_concurrent_futures/test_process_pool.py
@@ -1,17 +1,13 @@
import os
-import queue
-import signal
import sys
import threading
import time
import unittest
-import unittest.mock
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool
from test import support
from test.support import hashlib_helper
-from test.test_importlib.metadata.fixtures import parameterize
from .executor import ExecutorTest, mul
from .util import (
@@ -26,19 +22,6 @@ class EventfulGCObj():
def __del__(self):
self.event.set()
-TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__
-KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__
-FORCE_SHUTDOWN_PARAMS = [
- dict(function_name=TERMINATE_WORKERS),
- dict(function_name=KILL_WORKERS),
-]
-
-def _put_sleep_put(queue):
- """ Used as part of test_terminate_workers """
- queue.put('started')
- time.sleep(2)
- queue.put('finished')
-
class ProcessPoolExecutorTest(ExecutorTest):
@@ -235,86 +218,6 @@ class ProcessPoolExecutorTest(ExecutorTest):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()
- def test_terminate_workers(self):
- mock_fn = unittest.mock.Mock()
- with self.executor_type(max_workers=1) as executor:
- executor._force_shutdown = mock_fn
- executor.terminate_workers()
-
- mock_fn.assert_called_once_with(operation=futures.process._TERMINATE)
-
- def test_kill_workers(self):
- mock_fn = unittest.mock.Mock()
- with self.executor_type(max_workers=1) as executor:
- executor._force_shutdown = mock_fn
- executor.kill_workers()
-
- mock_fn.assert_called_once_with(operation=futures.process._KILL)
-
- def test_force_shutdown_workers_invalid_op(self):
- with self.executor_type(max_workers=1) as executor:
- self.assertRaises(ValueError,
- executor._force_shutdown,
- operation='invalid operation'),
-
- @parameterize(*FORCE_SHUTDOWN_PARAMS)
- def test_force_shutdown_workers(self, function_name):
- manager = self.get_context().Manager()
- q = manager.Queue()
-
- with self.executor_type(max_workers=1) as executor:
- executor.submit(_put_sleep_put, q)
-
- # We should get started, but not finished since we'll terminate the
- # workers just after
- self.assertEqual(q.get(timeout=5), 'started')
-
- worker_process = list(executor._processes.values())[0]
- getattr(executor, function_name)()
- worker_process.join()
-
- if function_name == TERMINATE_WORKERS or \
- sys.platform == 'win32':
- # On windows, kill and terminate both send SIGTERM
- self.assertEqual(worker_process.exitcode, -signal.SIGTERM)
- elif function_name == KILL_WORKERS:
- self.assertEqual(worker_process.exitcode, -signal.SIGKILL)
- else:
- self.fail(f"Unknown operation: {function_name}")
-
- self.assertRaises(queue.Empty, q.get, timeout=1)
-
- @parameterize(*FORCE_SHUTDOWN_PARAMS)
- def test_force_shutdown_workers_dead_workers(self, function_name):
- with self.executor_type(max_workers=1) as executor:
- future = executor.submit(os._exit, 1)
- self.assertRaises(BrokenProcessPool, future.result)
-
- # even though the pool is broken, this shouldn't raise
- getattr(executor, function_name)()
-
- @parameterize(*FORCE_SHUTDOWN_PARAMS)
- def test_force_shutdown_workers_not_started_yet(self, function_name):
- ctx = self.get_context()
- with unittest.mock.patch.object(ctx, 'Process') as mock_process:
- with self.executor_type(max_workers=1, mp_context=ctx) as executor:
- # The worker has not been started yet, terminate/kill_workers
- # should basically no-op
- getattr(executor, function_name)()
-
- mock_process.return_value.kill.assert_not_called()
- mock_process.return_value.terminate.assert_not_called()
-
- @parameterize(*FORCE_SHUTDOWN_PARAMS)
- def test_force_shutdown_workers_stops_pool(self, function_name):
- with self.executor_type(max_workers=1) as executor:
- task = executor.submit(time.sleep, 0)
- self.assertIsNone(task.result())
-
- getattr(executor, function_name)()
-
- self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
-
create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,