Diffstat (limited to 'Lib/concurrent/futures/interpreter.py')
-rw-r--r-- | Lib/concurrent/futures/interpreter.py | 181 |
1 file changed, 45 insertions, 136 deletions
diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py
index a2c4fbfd3fb..cbb60ce80c1 100644
--- a/Lib/concurrent/futures/interpreter.py
+++ b/Lib/concurrent/futures/interpreter.py
@@ -1,56 +1,39 @@
 """Implements InterpreterPoolExecutor."""
 
-import contextlib
-import pickle
+from concurrent import interpreters
+import sys
 import textwrap
 from . import thread as _thread
-import _interpreters
-import _interpqueues
+import traceback
 
 
-class ExecutionFailed(_interpreters.InterpreterError):
-    """An unhandled exception happened during execution."""
-
-    def __init__(self, excinfo):
-        msg = excinfo.formatted
-        if not msg:
-            if excinfo.type and excinfo.msg:
-                msg = f'{excinfo.type.__name__}: {excinfo.msg}'
-            else:
-                msg = excinfo.type.__name__ or excinfo.msg
-        super().__init__(msg)
-        self.excinfo = excinfo
-
-    def __str__(self):
+def do_call(results, func, args, kwargs):
+    try:
+        return func(*args, **kwargs)
+    except BaseException as exc:
+        # Send the captured exception out on the results queue,
+        # but still leave it unhandled for the interpreter to handle.
         try:
-            formatted = self.excinfo.errdisplay
-        except Exception:
-            return super().__str__()
-        else:
-            return textwrap.dedent(f"""
-{super().__str__()}
-
-Uncaught in the interpreter:
-
-{formatted}
-                """.strip())
+            results.put(exc)
+        except interpreters.NotShareableError:
+            # The exception is not shareable.
+            print('exception is not shareable:', file=sys.stderr)
+            traceback.print_exception(exc)
+            results.put(None)
+        raise  # re-raise
 
 
 class WorkerContext(_thread.WorkerContext):
 
     @classmethod
-    def prepare(cls, initializer, initargs, shared):
+    def prepare(cls, initializer, initargs):
         def resolve_task(fn, args, kwargs):
             if isinstance(fn, str):
                 # XXX Circle back to this later.
                 raise TypeError('scripts not supported')
             else:
-                # Functions defined in the __main__ module can't be pickled,
-                # so they can't be used here.  In the future, we could possibly
-                # borrow from multiprocessing to work around this.
                 task = (fn, args, kwargs)
-            data = pickle.dumps(task)
-            return data
+            return task
 
         if initializer is not None:
             try:
@@ -62,68 +45,24 @@ class WorkerContext(_thread.WorkerContext):
         else:
             initdata = None
         def create_context():
-            return cls(initdata, shared)
+            return cls(initdata)
         return create_context, resolve_task
 
-    @classmethod
-    @contextlib.contextmanager
-    def _capture_exc(cls, resultsid):
-        try:
-            yield
-        except BaseException as exc:
-            # Send the captured exception out on the results queue,
-            # but still leave it unhandled for the interpreter to handle.
-            _interpqueues.put(resultsid, (None, exc))
-            raise  # re-raise
-
-    @classmethod
-    def _send_script_result(cls, resultsid):
-        _interpqueues.put(resultsid, (None, None))
-
-    @classmethod
-    def _call(cls, func, args, kwargs, resultsid):
-        with cls._capture_exc(resultsid):
-            res = func(*args or (), **kwargs or {})
-        # Send the result back.
-        with cls._capture_exc(resultsid):
-            _interpqueues.put(resultsid, (res, None))
-
-    @classmethod
-    def _call_pickled(cls, pickled, resultsid):
-        with cls._capture_exc(resultsid):
-            fn, args, kwargs = pickle.loads(pickled)
-        cls._call(fn, args, kwargs, resultsid)
-
-    def __init__(self, initdata, shared=None):
+    def __init__(self, initdata):
         self.initdata = initdata
-        self.shared = dict(shared) if shared else None
-        self.interpid = None
-        self.resultsid = None
+        self.interp = None
+        self.results = None
 
     def __del__(self):
-        if self.interpid is not None:
+        if self.interp is not None:
             self.finalize()
 
-    def _exec(self, script):
-        assert self.interpid is not None
-        excinfo = _interpreters.exec(self.interpid, script, restrict=True)
-        if excinfo is not None:
-            raise ExecutionFailed(excinfo)
-
     def initialize(self):
-        assert self.interpid is None, self.interpid
-        self.interpid = _interpreters.create(reqrefs=True)
+        assert self.interp is None, self.interp
+        self.interp = interpreters.create()
         try:
-            _interpreters.incref(self.interpid)
-
             maxsize = 0
-            self.resultsid = _interpqueues.create(maxsize)
-
-            self._exec(f'from {__name__} import WorkerContext')
-
-            if self.shared:
-                _interpreters.set___main___attrs(
-                        self.interpid, self.shared, restrict=True)
+            self.results = interpreters.create_queue(maxsize)
 
             if self.initdata:
                 self.run(self.initdata)
@@ -132,53 +71,25 @@ class WorkerContext(_thread.WorkerContext):
             raise  # re-raise
 
     def finalize(self):
-        interpid = self.interpid
-        resultsid = self.resultsid
-        self.resultsid = None
-        self.interpid = None
-        if resultsid is not None:
-            try:
-                _interpqueues.destroy(resultsid)
-            except _interpqueues.QueueNotFoundError:
-                pass
-        if interpid is not None:
-            try:
-                _interpreters.decref(interpid)
-            except _interpreters.InterpreterNotFoundError:
-                pass
+        interp = self.interp
+        results = self.results
+        self.results = None
+        self.interp = None
+        if results is not None:
+            del results
+        if interp is not None:
+            interp.close()
 
     def run(self, task):
-        data = task
-        script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
-
         try:
-            self._exec(script)
-        except ExecutionFailed as exc:
-            exc_wrapper = exc
-        else:
-            exc_wrapper = None
-
-        # Return the result, or raise the exception.
-        while True:
-            try:
-                obj = _interpqueues.get(self.resultsid)
-            except _interpqueues.QueueNotFoundError:
+            return self.interp.call(do_call, self.results, *task)
+        except interpreters.ExecutionFailed as wrapper:
+            # Wait for the exception data to show up.
+            exc = self.results.get()
+            if exc is None:
+                # The exception must have been not shareable.
                 raise  # re-raise
-            except _interpqueues.QueueError:
-                continue
-            except ModuleNotFoundError:
-                # interpreters.queues doesn't exist, which means
-                # QueueEmpty doesn't.  Act as though it does.
-                continue
-            else:
-                break
-        (res, exc), unboundop = obj
-        assert unboundop is None, unboundop
-        if exc is not None:
-            assert res is None, res
-            assert exc_wrapper is not None
-            raise exc from exc_wrapper
-        return res
+            raise exc from wrapper
 
 
 class BrokenInterpreterPool(_thread.BrokenThreadPool):
@@ -192,11 +103,11 @@ class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
     BROKEN = BrokenInterpreterPool
 
     @classmethod
-    def prepare_context(cls, initializer, initargs, shared):
-        return WorkerContext.prepare(initializer, initargs, shared)
+    def prepare_context(cls, initializer, initargs):
+        return WorkerContext.prepare(initializer, initargs)
 
     def __init__(self, max_workers=None, thread_name_prefix='',
-                 initializer=None, initargs=(), shared=None):
+                 initializer=None, initargs=()):
         """Initializes a new InterpreterPoolExecutor instance.
 
         Args:
@@ -206,8 +117,6 @@ class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
             initializer: A callable or script used to initialize
                 each worker interpreter.
            initargs: A tuple of arguments to pass to the initializer.
-            shared: A mapping of shareabled objects to be inserted into
-                each worker interpreter.
         """
         super().__init__(max_workers, thread_name_prefix,
-                         initializer, initargs, shared=shared)
+                         initializer, initargs)
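For context, a minimal usage sketch (not part of this commit), assuming a Python build that ships concurrent.futures.InterpreterPoolExecutor and the concurrent.interpreters module. It exercises both paths the new do_call() handles: a normal return delivered straight back through Interpreter.call(), and a worker exception that WorkerContext.run() re-raises from the interpreters.ExecutionFailed wrapper so it surfaces from Future.result().

import operator
from concurrent.futures import InterpreterPoolExecutor

with InterpreterPoolExecutor(max_workers=2) as executor:
    # Normal path: the return value comes back directly from Interpreter.call().
    print(executor.submit(pow, 2, 8).result())  # 256

    # Exception path: do_call() puts the ZeroDivisionError on the results
    # queue and run() raises it from the ExecutionFailed wrapper, so the
    # original exception comes out of Future.result().
    try:
        executor.submit(operator.truediv, 1, 0).result()
    except ZeroDivisionError as exc:
        print('worker raised:', exc)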