Diffstat (limited to 'Lib/concurrent/futures/interpreter.py')
-rw-r--r--  Lib/concurrent/futures/interpreter.py  55
1 file changed, 13 insertions(+), 42 deletions(-)
diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py
index d17688dc9d7..a2c4fbfd3fb 100644
--- a/Lib/concurrent/futures/interpreter.py
+++ b/Lib/concurrent/futures/interpreter.py
@@ -36,9 +36,6 @@ Uncaught in the interpreter:
""".strip())
-UNBOUND = 2 # error; this should not happen.
-
-
class WorkerContext(_thread.WorkerContext):
@classmethod
@@ -47,23 +44,13 @@ class WorkerContext(_thread.WorkerContext):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
- if args or kwargs:
- raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
- data = textwrap.dedent(fn)
- kind = 'script'
- # Make sure the script compiles.
- # Ideally we wouldn't throw away the resulting code
- # object. However, there isn't much to be done until
- # code objects are shareable and/or we do a better job
- # of supporting code objects in _interpreters.exec().
- compile(data, '<string>', 'exec')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
- data = pickle.dumps((fn, args, kwargs))
- kind = 'function'
- return (data, kind)
+ task = (fn, args, kwargs)
+ data = pickle.dumps(task)
+ return data
if initializer is not None:
try:
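
The simplified prepare() above pickles the whole call as a single (fn, args, kwargs) tuple instead of tagging it with a 'script'/'function' kind. A minimal sketch of that round trip, assuming the worker-side _call_pickled() just unpickles the tuple and invokes it the way _call() does (the helper names here are illustrative, not the module's API):

import pickle

def prepare_task(fn, args, kwargs):
    # Submission side: serialize the whole call as one tuple, matching
    # the new prepare() above.  Functions defined in __main__ still
    # can't be pickled, so they still can't be submitted.
    return pickle.dumps((fn, args, kwargs))

def run_pickled(data):
    # Worker side (illustrative): unpickle and call, as _call() does.
    fn, args, kwargs = pickle.loads(data)
    return fn(*(args or ()), **(kwargs or {}))

data = prepare_task(pow, (2, 10), {})
assert run_pickled(data) == 1024
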
@@ -86,24 +73,20 @@ class WorkerContext(_thread.WorkerContext):
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
- err = pickle.dumps(exc)
- _interpqueues.put(resultsid, (None, err), 1, UNBOUND)
+ _interpqueues.put(resultsid, (None, exc))
raise # re-raise
@classmethod
def _send_script_result(cls, resultsid):
- _interpqueues.put(resultsid, (None, None), 0, UNBOUND)
+ _interpqueues.put(resultsid, (None, None))
@classmethod
def _call(cls, func, args, kwargs, resultsid):
with cls._capture_exc(resultsid):
res = func(*args or (), **kwargs or {})
# Send the result back.
- try:
- _interpqueues.put(resultsid, (res, None), 0, UNBOUND)
- except _interpreters.NotShareableError:
- res = pickle.dumps(res)
- _interpqueues.put(resultsid, (res, None), 1, UNBOUND)
+ with cls._capture_exc(resultsid):
+ _interpqueues.put(resultsid, (res, None))
@classmethod
def _call_pickled(cls, pickled, resultsid):
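
The exception path is simpler too: _capture_exc() now puts the live exception object on the results queue instead of a pickled payload, and sending the result goes through the same capture so a failing put() is reported rather than lost. A rough sketch of the same pattern, with a plain queue.Queue standing in for the cross-interpreter results queue (an assumed stand-in; the real code uses _interpqueues):

import contextlib
import queue

results = queue.Queue()   # stand-in for the _interpqueues results queue

@contextlib.contextmanager
def capture_exc(results):
    # Report the exception on the results channel, then let it
    # propagate, mirroring WorkerContext._capture_exc() above.
    try:
        yield
    except BaseException as exc:
        results.put((None, exc))
        raise

def call(func, args, kwargs, results):
    with capture_exc(results):
        res = func(*(args or ()), **(kwargs or {}))
    # Sending the result is wrapped the same way, so a failing put()
    # is reported on the queue instead of disappearing.
    with capture_exc(results):
        results.put((res, None))
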
@@ -134,8 +117,7 @@ class WorkerContext(_thread.WorkerContext):
_interpreters.incref(self.interpid)
maxsize = 0
- fmt = 0
- self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
+ self.resultsid = _interpqueues.create(maxsize)
self._exec(f'from {__name__} import WorkerContext')
@@ -166,17 +148,8 @@ class WorkerContext(_thread.WorkerContext):
pass
def run(self, task):
- data, kind = task
- if kind == 'script':
- raise NotImplementedError('script kind disabled')
- script = f"""
-with WorkerContext._capture_exc({self.resultsid}):
-{textwrap.indent(data, ' ')}
-WorkerContext._send_script_result({self.resultsid})"""
- elif kind == 'function':
- script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
- else:
- raise NotImplementedError(kind)
+ data = task
+ script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
try:
self._exec(script)
@@ -199,15 +172,13 @@ WorkerContext._send_script_result({self.resultsid})"""
continue
else:
break
- (res, excdata), pickled, unboundop = obj
+ (res, exc), unboundop = obj
assert unboundop is None, unboundop
- if excdata is not None:
+ if exc is not None:
assert res is None, res
- assert pickled
assert exc_wrapper is not None
- exc = pickle.loads(excdata)
raise exc from exc_wrapper
- return pickle.loads(res) if pickled else res
+ return res
class BrokenInterpreterPool(_thread.BrokenThreadPool):
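
On the receiving side the parent no longer needs a pickled flag: results and exceptions come back as the objects themselves. A small sketch of that unpacking, assuming obj has the ((res, exc), unboundop) shape produced above and exc_wrapper is whatever exception context the caller wants to chain from (the helper name is illustrative):

def handle_result(obj, exc_wrapper=None):
    # Matches the simplified branch above: no pickle.loads() on either path.
    (res, exc), unboundop = obj
    assert unboundop is None, unboundop
    if exc is not None:
        assert res is None, res
        raise exc from exc_wrapper
    return res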