diff options
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r-- | Lib/concurrent/futures/_base.py | 27 | ||||
-rw-r--r-- | Lib/concurrent/futures/interpreter.py | 55 |
2 files changed, 40 insertions, 42 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index d98b1ebdd58..f506ce68aea 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -558,6 +558,33 @@ class Future(object): self._condition.notify_all() self._invoke_callbacks() + def _get_snapshot(self): + """Get a snapshot of the future's current state. + + This method atomically retrieves the state in one lock acquisition, + which is significantly faster than multiple method calls. + + Returns: + Tuple of (done, cancelled, result, exception) + - done: True if the future is done (cancelled or finished) + - cancelled: True if the future was cancelled + - result: The result if available and not cancelled + - exception: The exception if available and not cancelled + """ + # Fast path: check if already finished without lock + if self._state == FINISHED: + return True, False, self._result, self._exception + + # Need lock for other states since they can change + with self._condition: + # We have to check the state again after acquiring the lock + # because it may have changed in the meantime. + if self._state == FINISHED: + return True, False, self._result, self._exception + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}: + return True, True, None, None + return False, False, None, None + __class_getitem__ = classmethod(types.GenericAlias) class Executor(object): diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index d17688dc9d7..a2c4fbfd3fb 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -36,9 +36,6 @@ Uncaught in the interpreter: """.strip()) -UNBOUND = 2 # error; this should not happen. - - class WorkerContext(_thread.WorkerContext): @classmethod @@ -47,23 +44,13 @@ class WorkerContext(_thread.WorkerContext): if isinstance(fn, str): # XXX Circle back to this later. raise TypeError('scripts not supported') - if args or kwargs: - raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}') - data = textwrap.dedent(fn) - kind = 'script' - # Make sure the script compiles. - # Ideally we wouldn't throw away the resulting code - # object. However, there isn't much to be done until - # code objects are shareable and/or we do a better job - # of supporting code objects in _interpreters.exec(). - compile(data, '<string>', 'exec') else: # Functions defined in the __main__ module can't be pickled, # so they can't be used here. In the future, we could possibly # borrow from multiprocessing to work around this. - data = pickle.dumps((fn, args, kwargs)) - kind = 'function' - return (data, kind) + task = (fn, args, kwargs) + data = pickle.dumps(task) + return data if initializer is not None: try: @@ -86,24 +73,20 @@ class WorkerContext(_thread.WorkerContext): except BaseException as exc: # Send the captured exception out on the results queue, # but still leave it unhandled for the interpreter to handle. - err = pickle.dumps(exc) - _interpqueues.put(resultsid, (None, err), 1, UNBOUND) + _interpqueues.put(resultsid, (None, exc)) raise # re-raise @classmethod def _send_script_result(cls, resultsid): - _interpqueues.put(resultsid, (None, None), 0, UNBOUND) + _interpqueues.put(resultsid, (None, None)) @classmethod def _call(cls, func, args, kwargs, resultsid): with cls._capture_exc(resultsid): res = func(*args or (), **kwargs or {}) # Send the result back. - try: - _interpqueues.put(resultsid, (res, None), 0, UNBOUND) - except _interpreters.NotShareableError: - res = pickle.dumps(res) - _interpqueues.put(resultsid, (res, None), 1, UNBOUND) + with cls._capture_exc(resultsid): + _interpqueues.put(resultsid, (res, None)) @classmethod def _call_pickled(cls, pickled, resultsid): @@ -134,8 +117,7 @@ class WorkerContext(_thread.WorkerContext): _interpreters.incref(self.interpid) maxsize = 0 - fmt = 0 - self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND) + self.resultsid = _interpqueues.create(maxsize) self._exec(f'from {__name__} import WorkerContext') @@ -166,17 +148,8 @@ class WorkerContext(_thread.WorkerContext): pass def run(self, task): - data, kind = task - if kind == 'script': - raise NotImplementedError('script kind disabled') - script = f""" -with WorkerContext._capture_exc({self.resultsid}): -{textwrap.indent(data, ' ')} -WorkerContext._send_script_result({self.resultsid})""" - elif kind == 'function': - script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})' - else: - raise NotImplementedError(kind) + data = task + script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})' try: self._exec(script) @@ -199,15 +172,13 @@ WorkerContext._send_script_result({self.resultsid})""" continue else: break - (res, excdata), pickled, unboundop = obj + (res, exc), unboundop = obj assert unboundop is None, unboundop - if excdata is not None: + if exc is not None: assert res is None, res - assert pickled assert exc_wrapper is not None - exc = pickle.loads(excdata) raise exc from exc_wrapper - return pickle.loads(res) if pickled else res + return res class BrokenInterpreterPool(_thread.BrokenThreadPool): |