Diffstat (limited to 'Lib/test/test_concurrent_futures/test_interpreter_pool.py')
-rw-r--r--  Lib/test/test_concurrent_futures/test_interpreter_pool.py | 265
1 file changed, 213 insertions(+), 52 deletions(-)
diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py
index f6c62ae4b20..844dfdd6fc9 100644
--- a/Lib/test/test_concurrent_futures/test_interpreter_pool.py
+++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py
@@ -2,35 +2,78 @@ import asyncio
 import contextlib
 import io
 import os
-import pickle
+import sys
 import time
 import unittest
-from concurrent.futures.interpreter import (
-    ExecutionFailed, BrokenInterpreterPool,
-)
+from concurrent.futures.interpreter import BrokenInterpreterPool
+from concurrent import interpreters
+from concurrent.interpreters import _queues as queues
 import _interpreters
 from test import support
+from test.support import os_helper
+from test.support import script_helper
 import test.test_asyncio.utils as testasyncio_utils
-from test.support.interpreters import queues
 from .executor import ExecutorTest, mul
 from .util import BaseTestCase, InterpreterPoolMixin, setup_module

+WINDOWS = sys.platform.startswith('win')
+
+
+@contextlib.contextmanager
+def nonblocking(fd):
+    blocking = os.get_blocking(fd)
+    if blocking:
+        os.set_blocking(fd, False)
+    try:
+        yield
+    finally:
+        if blocking:
+            os.set_blocking(fd, blocking)
+
+
+def read_file_with_timeout(fd, nbytes, timeout):
+    with nonblocking(fd):
+        end = time.time() + timeout
+        try:
+            return os.read(fd, nbytes)
+        except BlockingIOError:
+            pass
+        while time.time() < end:
+            try:
+                return os.read(fd, nbytes)
+            except BlockingIOError:
+                continue
+        else:
+            raise TimeoutError('nothing to read')
+
+
+if not WINDOWS:
+    import select
+    def read_file_with_timeout(fd, nbytes, timeout):
+        r, _, _ = select.select([fd], [], [], timeout)
+        if fd not in r:
+            raise TimeoutError('nothing to read')
+        return os.read(fd, nbytes)
+
+
 def noop():
     pass


 def write_msg(fd, msg):
+    import os
     os.write(fd, msg + b'\0')


-def read_msg(fd):
+def read_msg(fd, timeout=10.0):
     msg = b''
-    while ch := os.read(fd, 1):
-        if ch == b'\0':
-            return msg
+    ch = read_file_with_timeout(fd, 1, timeout)
+    while ch != b'\0':
         msg += ch
+        ch = os.read(fd, 1)
+    return msg


 def get_current_name():
@@ -113,6 +156,38 @@ class InterpreterPoolExecutorTest(
         self.assertEqual(before, b'\0')
         self.assertEqual(after, msg)

+    def test_init_with___main___global(self):
+        # See https://github.com/python/cpython/pull/133957#issuecomment-2927415311.
+        text = """if True:
+            from concurrent.futures import InterpreterPoolExecutor
+
+            INITIALIZER_STATUS = 'uninitialized'
+
+            def init(x):
+                global INITIALIZER_STATUS
+                INITIALIZER_STATUS = x
+                INITIALIZER_STATUS
+
+            def get_init_status():
+                return INITIALIZER_STATUS
+
+            if __name__ == "__main__":
+                exe = InterpreterPoolExecutor(initializer=init,
+                                              initargs=('initialized',))
+                fut = exe.submit(get_init_status)
+                print(fut.result())  # 'initialized'
+                exe.shutdown(wait=True)
+                print(INITIALIZER_STATUS)  # 'uninitialized'
+            """
+        with os_helper.temp_dir() as tempdir:
+            filename = script_helper.make_script(tempdir, 'my-script', text)
+            res = script_helper.assert_python_ok(filename)
+            stdout = res.out.decode('utf-8').strip()
+            self.assertEqual(stdout.splitlines(), [
+                'initialized',
+                'uninitialized',
+            ])
+
     def test_init_closure(self):
         count = 0
         def init1():
@@ -121,10 +196,19 @@ class InterpreterPoolExecutorTest(
             nonlocal count
             count += 1

-        with self.assertRaises(pickle.PicklingError):
-            self.executor_type(initializer=init1)
-        with self.assertRaises(pickle.PicklingError):
-            self.executor_type(initializer=init2)
+        with contextlib.redirect_stderr(io.StringIO()) as stderr:
+            with self.executor_type(initializer=init1) as executor:
+                fut = executor.submit(lambda: None)
+        self.assertIn('NotShareableError', stderr.getvalue())
+        with self.assertRaises(BrokenInterpreterPool):
+            fut.result()
+
+        with contextlib.redirect_stderr(io.StringIO()) as stderr:
+            with self.executor_type(initializer=init2) as executor:
+                fut = executor.submit(lambda: None)
+        self.assertIn('NotShareableError', stderr.getvalue())
+        with self.assertRaises(BrokenInterpreterPool):
+            fut.result()

     def test_init_instance_method(self):
         class Spam:
@@ -132,26 +216,12 @@ class InterpreterPoolExecutorTest(
                 raise NotImplementedError
         spam = Spam()

-        with self.assertRaises(pickle.PicklingError):
-            self.executor_type(initializer=spam.initializer)
-
-    def test_init_shared(self):
-        msg = b'eggs'
-        r, w = self.pipe()
-        script = f"""if True:
-            import os
-            if __name__ != '__main__':
-                import __main__
-                spam = __main__.spam
-            os.write({w}, spam + b'\\0')
-            """
-
-        executor = self.executor_type(shared={'spam': msg})
-        fut = executor.submit(exec, script)
-        fut.result()
-        after = read_msg(r)
-
-        self.assertEqual(after, msg)
+        with contextlib.redirect_stderr(io.StringIO()) as stderr:
+            with self.executor_type(initializer=spam.initializer) as executor:
+                fut = executor.submit(lambda: None)
+        self.assertIn('NotShareableError', stderr.getvalue())
+        with self.assertRaises(BrokenInterpreterPool):
+            fut.result()

     @unittest.expectedFailure
     def test_init_exception_in_script(self):
@@ -178,8 +248,6 @@ class InterpreterPoolExecutorTest(
         stderr = stderr.getvalue()
         self.assertIn('ExecutionFailed: Exception: spam', stderr)
         self.assertIn('Uncaught in the interpreter:', stderr)
-        self.assertIn('The above exception was the direct cause of the following exception:',
-                      stderr)

     @unittest.expectedFailure
     def test_submit_script(self):
@@ -208,10 +276,14 @@ class InterpreterPoolExecutorTest(
             return spam

         executor = self.executor_type()
-        with self.assertRaises(pickle.PicklingError):
-            executor.submit(task1)
-        with self.assertRaises(pickle.PicklingError):
-            executor.submit(task2)
+
+        fut = executor.submit(task1)
+        with self.assertRaises(_interpreters.NotShareableError):
+            fut.result()
+
+        fut = executor.submit(task2)
+        with self.assertRaises(_interpreters.NotShareableError):
+            fut.result()

     def test_submit_local_instance(self):
         class Spam:
@@ -219,8 +291,9 @@ class InterpreterPoolExecutorTest(
                 self.value = True

         executor = self.executor_type()
-        with self.assertRaises(pickle.PicklingError):
-            executor.submit(Spam)
+        fut = executor.submit(Spam)
+        with self.assertRaises(_interpreters.NotShareableError):
+            fut.result()

     def test_submit_instance_method(self):
         class Spam:
@@ -229,8 +302,9 @@ class InterpreterPoolExecutorTest(
         spam = Spam()

         executor = self.executor_type()
-        with self.assertRaises(pickle.PicklingError):
-            executor.submit(spam.run)
+        fut = executor.submit(spam.run)
+        with self.assertRaises(_interpreters.NotShareableError):
+            fut.result()

     def test_submit_func_globals(self):
         executor = self.executor_type()
@@ -242,13 +316,14 @@ class InterpreterPoolExecutorTest(

     @unittest.expectedFailure
     def test_submit_exception_in_script(self):
+        # Scripts are not supported currently.
         fut = self.executor.submit('raise Exception("spam")')
         with self.assertRaises(Exception) as captured:
             fut.result()
         self.assertIs(type(captured.exception), Exception)
         self.assertEqual(str(captured.exception), 'spam')
         cause = captured.exception.__cause__
-        self.assertIs(type(cause), ExecutionFailed)
+        self.assertIs(type(cause), interpreters.ExecutionFailed)
         for attr in ('__name__', '__qualname__', '__module__'):
             self.assertEqual(getattr(cause.excinfo.type, attr),
                              getattr(Exception, attr))
@@ -261,7 +336,7 @@ class InterpreterPoolExecutorTest(
         self.assertIs(type(captured.exception), Exception)
         self.assertEqual(str(captured.exception), 'spam')
         cause = captured.exception.__cause__
-        self.assertIs(type(cause), ExecutionFailed)
+        self.assertIs(type(cause), interpreters.ExecutionFailed)
         for attr in ('__name__', '__qualname__', '__module__'):
             self.assertEqual(getattr(cause.excinfo.type, attr),
                              getattr(Exception, attr))
@@ -269,16 +344,93 @@ class InterpreterPoolExecutorTest(

     def test_saturation(self):
         blocker = queues.create()
-        executor = self.executor_type(4, shared=dict(blocker=blocker))
+        executor = self.executor_type(4)
         for i in range(15 * executor._max_workers):
-            executor.submit(exec, 'import __main__; __main__.blocker.get()')
-            #executor.submit('blocker.get()')
+            executor.submit(blocker.get)
         self.assertEqual(len(executor._threads), executor._max_workers)
         for i in range(15 * executor._max_workers):
             blocker.put_nowait(None)
         executor.shutdown(wait=True)

+    def test_blocking(self):
+        # There is no guarantee that a worker will be created for every
+        # submitted task.  That's because there's a race between:
+        #
+        #  * a new worker thread, created when task A was just submitted,
+        #    becoming non-idle when it picks up task A
+        #  * after task B is added to the queue, a new worker thread
+        #    is started only if there are no idle workers
+        #    (the check in ThreadPoolExecutor._adjust_thread_count())
+        #
+        # That means we must not block waiting for *all* tasks to report
+        # "ready" before we unblock the known-ready workers.
+        ready = queues.create()
+        blocker = queues.create()
+
+        def run(taskid, ready, blocker):
+            # There can't be any globals here.
+            ready.put_nowait(taskid)
+            blocker.get()  # blocking
+
+        numtasks = 10
+        futures = []
+        with self.executor_type() as executor:
+            # Request the jobs.
+            for i in range(numtasks):
+                fut = executor.submit(run, i, ready, blocker)
+                futures.append(fut)
+            pending = numtasks
+            while pending > 0:
+                # Wait for any to be ready.
+                done = 0
+                for _ in range(pending):
+                    try:
+                        ready.get(timeout=1)  # blocking
+                    except interpreters.QueueEmpty:
+                        pass
+                    else:
+                        done += 1
+                pending -= done
+                # Unblock the workers.
+                for _ in range(done):
+                    blocker.put_nowait(None)
+
+    def test_blocking_with_limited_workers(self):
+        # This is essentially the same as test_blocking,
+        # but we explicitly force a limited number of workers,
+        # instead of it happening implicitly sometimes due to a race.
+        ready = queues.create()
+        blocker = queues.create()
+
+        def run(taskid, ready, blocker):
+            # There can't be any globals here.
+            ready.put_nowait(taskid)
+            blocker.get()  # blocking
+
+        numtasks = 10
+        futures = []
+        with self.executor_type(4) as executor:
+            # Request the jobs.
+            for i in range(numtasks):
+                fut = executor.submit(run, i, ready, blocker)
+                futures.append(fut)
+            pending = numtasks
+            while pending > 0:
+                # Wait for any to be ready.
+                done = 0
+                for _ in range(pending):
+                    try:
+                        ready.get(timeout=1)  # blocking
+                    except interpreters.QueueEmpty:
+                        pass
+                    else:
+                        done += 1
+                pending -= done
+                # Unblock the workers.
+                for _ in range(done):
+                    blocker.put_nowait(None)
+
     @support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
     def test_idle_thread_reuse(self):
         executor = self.executor_type()
@@ -289,12 +441,21 @@ class InterpreterPoolExecutorTest(
         executor.shutdown(wait=True)

     def test_pickle_errors_propagate(self):
-        # GH-125864: Pickle errors happen before the script tries to execute, so the
-        # queue used to wait infinitely.
-
+        # GH-125864: Pickle errors happen before the script tries to execute,
+        # so the queue used to wait infinitely.
         fut = self.executor.submit(PickleShenanigans(0))
-        with self.assertRaisesRegex(RuntimeError, "gotcha"):
+        expected = interpreters.NotShareableError
+        with self.assertRaisesRegex(expected, 'args not shareable') as cm:
             fut.result()
+        self.assertRegex(str(cm.exception.__cause__), 'unpickled')
+
+    def test_no_stale_references(self):
+        # Weak references don't cross between interpreters.
+        raise unittest.SkipTest('not applicable')
+
+    def test_free_reference(self):
+        # Weak references don't cross between interpreters.
+        raise unittest.SkipTest('not applicable')


 class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase):
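
The behavior these updated tests encode: with InterpreterPoolExecutor, submitting a non-shareable callable (a closure or a bound method) no longer raises pickle.PicklingError at submit() time; the NotShareableError only surfaces when the future's result is requested, and when the failure happens in an initializer the pool is marked broken with BrokenInterpreterPool. The sketch below is not code from this commit; it is a minimal standalone illustration, assuming Python 3.14+ where InterpreterPoolExecutor and the private _interpreters module referenced by these tests are available.

    # Minimal sketch (assumes Python 3.14+ with InterpreterPoolExecutor and the
    # private _interpreters module available, as referenced by the tests above).
    from concurrent.futures import InterpreterPoolExecutor
    import _interpreters


    def main():
        count = 0

        def closure_task():
            # A closure captures `count` from the enclosing scope, so it cannot
            # be shared with a worker interpreter.
            return count

        with InterpreterPoolExecutor() as executor:
            fut = executor.submit(closure_task)   # accepted; no error raised here
            try:
                fut.result()                      # the error surfaces here
            except _interpreters.NotShareableError as exc:
                print('not shareable:', exc)


    if __name__ == '__main__':
        main()

This mirrors what test_submit_local_instance and test_submit_instance_method assert: the error is delivered through the future rather than at submission.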