aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/concurrent/futures/interpreter.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures/interpreter.py')
-rw-r--r--Lib/concurrent/futures/interpreter.py181
1 files changed, 45 insertions, 136 deletions
diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py
index a2c4fbfd3fb..cbb60ce80c1 100644
--- a/Lib/concurrent/futures/interpreter.py
+++ b/Lib/concurrent/futures/interpreter.py
@@ -1,56 +1,39 @@
"""Implements InterpreterPoolExecutor."""
-import contextlib
-import pickle
+from concurrent import interpreters
+import sys
import textwrap
from . import thread as _thread
-import _interpreters
-import _interpqueues
+import traceback
-class ExecutionFailed(_interpreters.InterpreterError):
- """An unhandled exception happened during execution."""
-
- def __init__(self, excinfo):
- msg = excinfo.formatted
- if not msg:
- if excinfo.type and excinfo.msg:
- msg = f'{excinfo.type.__name__}: {excinfo.msg}'
- else:
- msg = excinfo.type.__name__ or excinfo.msg
- super().__init__(msg)
- self.excinfo = excinfo
-
- def __str__(self):
+def do_call(results, func, args, kwargs):
+ try:
+ return func(*args, **kwargs)
+ except BaseException as exc:
+ # Send the captured exception out on the results queue,
+ # but still leave it unhandled for the interpreter to handle.
try:
- formatted = self.excinfo.errdisplay
- except Exception:
- return super().__str__()
- else:
- return textwrap.dedent(f"""
-{super().__str__()}
-
-Uncaught in the interpreter:
-
-{formatted}
- """.strip())
+ results.put(exc)
+ except interpreters.NotShareableError:
+ # The exception is not shareable.
+ print('exception is not shareable:', file=sys.stderr)
+ traceback.print_exception(exc)
+ results.put(None)
+ raise # re-raise
class WorkerContext(_thread.WorkerContext):
@classmethod
- def prepare(cls, initializer, initargs, shared):
+ def prepare(cls, initializer, initargs):
def resolve_task(fn, args, kwargs):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
else:
- # Functions defined in the __main__ module can't be pickled,
- # so they can't be used here. In the future, we could possibly
- # borrow from multiprocessing to work around this.
task = (fn, args, kwargs)
- data = pickle.dumps(task)
- return data
+ return task
if initializer is not None:
try:
@@ -62,68 +45,24 @@ class WorkerContext(_thread.WorkerContext):
else:
initdata = None
def create_context():
- return cls(initdata, shared)
+ return cls(initdata)
return create_context, resolve_task
- @classmethod
- @contextlib.contextmanager
- def _capture_exc(cls, resultsid):
- try:
- yield
- except BaseException as exc:
- # Send the captured exception out on the results queue,
- # but still leave it unhandled for the interpreter to handle.
- _interpqueues.put(resultsid, (None, exc))
- raise # re-raise
-
- @classmethod
- def _send_script_result(cls, resultsid):
- _interpqueues.put(resultsid, (None, None))
-
- @classmethod
- def _call(cls, func, args, kwargs, resultsid):
- with cls._capture_exc(resultsid):
- res = func(*args or (), **kwargs or {})
- # Send the result back.
- with cls._capture_exc(resultsid):
- _interpqueues.put(resultsid, (res, None))
-
- @classmethod
- def _call_pickled(cls, pickled, resultsid):
- with cls._capture_exc(resultsid):
- fn, args, kwargs = pickle.loads(pickled)
- cls._call(fn, args, kwargs, resultsid)
-
- def __init__(self, initdata, shared=None):
+ def __init__(self, initdata):
self.initdata = initdata
- self.shared = dict(shared) if shared else None
- self.interpid = None
- self.resultsid = None
+ self.interp = None
+ self.results = None
def __del__(self):
- if self.interpid is not None:
+ if self.interp is not None:
self.finalize()
- def _exec(self, script):
- assert self.interpid is not None
- excinfo = _interpreters.exec(self.interpid, script, restrict=True)
- if excinfo is not None:
- raise ExecutionFailed(excinfo)
-
def initialize(self):
- assert self.interpid is None, self.interpid
- self.interpid = _interpreters.create(reqrefs=True)
+ assert self.interp is None, self.interp
+ self.interp = interpreters.create()
try:
- _interpreters.incref(self.interpid)
-
maxsize = 0
- self.resultsid = _interpqueues.create(maxsize)
-
- self._exec(f'from {__name__} import WorkerContext')
-
- if self.shared:
- _interpreters.set___main___attrs(
- self.interpid, self.shared, restrict=True)
+ self.results = interpreters.create_queue(maxsize)
if self.initdata:
self.run(self.initdata)
@@ -132,53 +71,25 @@ class WorkerContext(_thread.WorkerContext):
raise # re-raise
def finalize(self):
- interpid = self.interpid
- resultsid = self.resultsid
- self.resultsid = None
- self.interpid = None
- if resultsid is not None:
- try:
- _interpqueues.destroy(resultsid)
- except _interpqueues.QueueNotFoundError:
- pass
- if interpid is not None:
- try:
- _interpreters.decref(interpid)
- except _interpreters.InterpreterNotFoundError:
- pass
+ interp = self.interp
+ results = self.results
+ self.results = None
+ self.interp = None
+ if results is not None:
+ del results
+ if interp is not None:
+ interp.close()
def run(self, task):
- data = task
- script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
-
try:
- self._exec(script)
- except ExecutionFailed as exc:
- exc_wrapper = exc
- else:
- exc_wrapper = None
-
- # Return the result, or raise the exception.
- while True:
- try:
- obj = _interpqueues.get(self.resultsid)
- except _interpqueues.QueueNotFoundError:
+ return self.interp.call(do_call, self.results, *task)
+ except interpreters.ExecutionFailed as wrapper:
+ # Wait for the exception data to show up.
+ exc = self.results.get()
+ if exc is None:
+ # The exception must have been not shareable.
raise # re-raise
- except _interpqueues.QueueError:
- continue
- except ModuleNotFoundError:
- # interpreters.queues doesn't exist, which means
- # QueueEmpty doesn't. Act as though it does.
- continue
- else:
- break
- (res, exc), unboundop = obj
- assert unboundop is None, unboundop
- if exc is not None:
- assert res is None, res
- assert exc_wrapper is not None
- raise exc from exc_wrapper
- return res
+ raise exc from wrapper
class BrokenInterpreterPool(_thread.BrokenThreadPool):
@@ -192,11 +103,11 @@ class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
BROKEN = BrokenInterpreterPool
@classmethod
- def prepare_context(cls, initializer, initargs, shared):
- return WorkerContext.prepare(initializer, initargs, shared)
+ def prepare_context(cls, initializer, initargs):
+ return WorkerContext.prepare(initializer, initargs)
def __init__(self, max_workers=None, thread_name_prefix='',
- initializer=None, initargs=(), shared=None):
+ initializer=None, initargs=()):
"""Initializes a new InterpreterPoolExecutor instance.
Args:
@@ -206,8 +117,6 @@ class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
initializer: A callable or script used to initialize
each worker interpreter.
initargs: A tuple of arguments to pass to the initializer.
- shared: A mapping of shareabled objects to be inserted into
- each worker interpreter.
"""
super().__init__(max_workers, thread_name_prefix,
- initializer, initargs, shared=shared)
+ initializer, initargs)