Diffstat (limited to 'Lib/concurrent')
-rw-r--r--   Lib/concurrent/futures/interpreter.py          2
-rw-r--r--   Lib/concurrent/futures/process.py              5
-rw-r--r--   Lib/concurrent/interpreters/__init__.py      246
-rw-r--r--   Lib/concurrent/interpreters/_crossinterp.py  102
-rw-r--r--   Lib/concurrent/interpreters/_queues.py       288
5 files changed, 642 insertions(+), 1 deletion(-)
diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py
index a2c4fbfd3fb..f12b4ac33cd 100644
--- a/Lib/concurrent/futures/interpreter.py
+++ b/Lib/concurrent/futures/interpreter.py
@@ -167,7 +167,7 @@ class WorkerContext(_thread.WorkerContext):
except _interpqueues.QueueError:
continue
except ModuleNotFoundError:
- # interpreters.queues doesn't exist, which means
+ # interpreters._queues doesn't exist, which means
# QueueEmpty doesn't. Act as though it does.
continue
else:
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 76b7b2abe83..a14650bf5fa 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -755,6 +755,11 @@ class ProcessPoolExecutor(_base.Executor):
self._executor_manager_thread_wakeup
def _adjust_process_count(self):
+ # gh-132969: avoid error when state is reset and executor is still running,
+ # which will happen when shutdown(wait=False) is called.
+ if self._processes is None:
+ return
+
# if there's an idle process, we don't need to spawn a new one.
if self._idle_worker_semaphore.acquire(blocking=False):
return
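For context, a minimal sketch of the shutdown(wait=False) scenario the new guard addresses; the worker count and the submitted call are illustrative and not part of the patch:

    from concurrent.futures import ProcessPoolExecutor

    def main():
        executor = ProcessPoolExecutor(max_workers=2)
        executor.submit(pow, 2, 10)   # spawns a worker and wakes the manager thread
        # shutdown(wait=False) returns immediately and resets internal state,
        # including setting self._processes to None, while workers may still
        # be starting up.
        executor.shutdown(wait=False)
        # Before this change, a racing call to _adjust_process_count() could
        # then trip over the None value; the added check turns it into a no-op.

    if __name__ == '__main__':
        main()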
diff --git a/Lib/concurrent/interpreters/__init__.py b/Lib/concurrent/interpreters/__init__.py
new file mode 100644
index 00000000000..0fd661249a2
--- /dev/null
+++ b/Lib/concurrent/interpreters/__init__.py
@@ -0,0 +1,246 @@
+"""Subinterpreters High Level Module."""
+
+import threading
+import weakref
+import _interpreters
+
+# aliases:
+from _interpreters import (
+ InterpreterError, InterpreterNotFoundError, NotShareableError,
+ is_shareable,
+)
+from ._queues import (
+ create as create_queue,
+ Queue, QueueEmpty, QueueFull,
+)
+
+
+__all__ = [
+ 'get_current', 'get_main', 'create', 'list_all', 'is_shareable',
+ 'Interpreter',
+ 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed',
+ 'NotShareableError',
+ 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull',
+]
+
+
+_EXEC_FAILURE_STR = """
+{superstr}
+
+Uncaught in the interpreter:
+
+{formatted}
+""".strip()
+
+class ExecutionFailed(InterpreterError):
+ """An unhandled exception happened during execution.
+
+ This is raised from Interpreter.exec() and Interpreter.call().
+ """
+
+ def __init__(self, excinfo):
+ msg = excinfo.formatted
+ if not msg:
+ if excinfo.type and excinfo.msg:
+ msg = f'{excinfo.type.__name__}: {excinfo.msg}'
+ else:
+ msg = excinfo.type.__name__ or excinfo.msg
+ super().__init__(msg)
+ self.excinfo = excinfo
+
+ def __str__(self):
+ try:
+ formatted = self.excinfo.errdisplay
+ except Exception:
+ return super().__str__()
+ else:
+ return _EXEC_FAILURE_STR.format(
+ superstr=super().__str__(),
+ formatted=formatted,
+ )
+
+
+def create():
+ """Return a new (idle) Python interpreter."""
+ id = _interpreters.create(reqrefs=True)
+ return Interpreter(id, _ownsref=True)
+
+
+def list_all():
+ """Return all existing interpreters."""
+ return [Interpreter(id, _whence=whence)
+ for id, whence in _interpreters.list_all(require_ready=True)]
+
+
+def get_current():
+ """Return the currently running interpreter."""
+ id, whence = _interpreters.get_current()
+ return Interpreter(id, _whence=whence)
+
+
+def get_main():
+ """Return the main interpreter."""
+ id, whence = _interpreters.get_main()
+ assert whence == _interpreters.WHENCE_RUNTIME, repr(whence)
+ return Interpreter(id, _whence=whence)
+
+
+_known = weakref.WeakValueDictionary()
+
+class Interpreter:
+ """A single Python interpreter.
+
+ Attributes:
+
+ "id" - the unique process-global ID number for the interpreter
+ "whence" - indicates where the interpreter was created
+
+ If the interpreter wasn't created by this module
+ then any method that modifies the interpreter will fail,
+ i.e. .close(), .prepare_main(), .exec(), and .call()
+ """
+
+ _WHENCE_TO_STR = {
+ _interpreters.WHENCE_UNKNOWN: 'unknown',
+ _interpreters.WHENCE_RUNTIME: 'runtime init',
+ _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API',
+ _interpreters.WHENCE_CAPI: 'C-API',
+ _interpreters.WHENCE_XI: 'cross-interpreter C-API',
+ _interpreters.WHENCE_STDLIB: '_interpreters module',
+ }
+
+ def __new__(cls, id, /, _whence=None, _ownsref=None):
+ # There is only one instance for any given ID.
+ if not isinstance(id, int):
+ raise TypeError(f'id must be an int, got {id!r}')
+ id = int(id)
+ if _whence is None:
+ if _ownsref:
+ _whence = _interpreters.WHENCE_STDLIB
+ else:
+ _whence = _interpreters.whence(id)
+ assert _whence in cls._WHENCE_TO_STR, repr(_whence)
+ if _ownsref is None:
+ _ownsref = (_whence == _interpreters.WHENCE_STDLIB)
+ try:
+ self = _known[id]
+ assert hasattr(self, '_ownsref')
+ except KeyError:
+ self = super().__new__(cls)
+ _known[id] = self
+ self._id = id
+ self._whence = _whence
+ self._ownsref = _ownsref
+ if _ownsref:
+ # This may raise InterpreterNotFoundError:
+ _interpreters.incref(id)
+ return self
+
+ def __repr__(self):
+ return f'{type(self).__name__}({self.id})'
+
+ def __hash__(self):
+ return hash(self._id)
+
+ def __del__(self):
+ self._decref()
+
+ # for pickling:
+ def __getnewargs__(self):
+ return (self._id,)
+
+ # for pickling:
+ def __getstate__(self):
+ return None
+
+ def _decref(self):
+ if not self._ownsref:
+ return
+ self._ownsref = False
+ try:
+ _interpreters.decref(self._id)
+ except InterpreterNotFoundError:
+ pass
+
+ @property
+ def id(self):
+ return self._id
+
+ @property
+ def whence(self):
+ return self._WHENCE_TO_STR[self._whence]
+
+ def is_running(self):
+ """Return whether or not the identified interpreter is running."""
+ return _interpreters.is_running(self._id)
+
+ # Everything past here is available only to interpreters created by
+ # interpreters.create().
+
+ def close(self):
+ """Finalize and destroy the interpreter.
+
+ Attempting to destroy the current interpreter results
+ in an InterpreterError.
+ """
+ return _interpreters.destroy(self._id, restrict=True)
+
+ def prepare_main(self, ns=None, /, **kwargs):
+ """Bind the given values into the interpreter's __main__.
+
+ The values must be shareable.
+ """
+ ns = dict(ns, **kwargs) if ns is not None else kwargs
+ _interpreters.set___main___attrs(self._id, ns, restrict=True)
+
+ def exec(self, code, /):
+ """Run the given source code in the interpreter.
+
+ This is essentially the same as calling the builtin "exec"
+ with this interpreter, using the __dict__ of its __main__
+ module as both globals and locals.
+
+ There is no return value.
+
+ If the code raises an unhandled exception then an ExecutionFailed
+ exception is raised, which summarizes the unhandled exception.
+ The actual exception is discarded because objects cannot be
+ shared between interpreters.
+
+ This blocks the current Python thread until done. During
+ that time, the calling interpreter (the previously current
+ one) may still run in other threads.
+ """
+ excinfo = _interpreters.exec(self._id, code, restrict=True)
+ if excinfo is not None:
+ raise ExecutionFailed(excinfo)
+
+ def _call(self, callable, args, kwargs):
+ res, excinfo = _interpreters.call(self._id, callable, args, kwargs, restrict=True)
+ if excinfo is not None:
+ raise ExecutionFailed(excinfo)
+ return res
+
+ def call(self, callable, /, *args, **kwargs):
+ """Call the object in the interpreter with given args/kwargs.
+
+ Nearly all callables, args, kwargs, and return values are
+ supported. All "shareable" objects are supported, as are
+ "stateless" functions (meaning non-closures that do not use
+ any globals). This method will fall back to pickle.
+
+ If the callable raises an exception then the error display
+ (including full traceback) is sent back between the interpreters
+ and an ExecutionFailed exception is raised, much like what
+ happens with Interpreter.exec().
+ """
+ return self._call(callable, args, kwargs)
+
+ def call_in_thread(self, callable, /, *args, **kwargs):
+ """Return a new thread that calls the object in the interpreter.
+
+ The return value and any raised exception are discarded.
+ """
+ t = threading.Thread(target=self._call, args=(callable, args, kwargs))
+ t.start()
+ return t
diff --git a/Lib/concurrent/interpreters/_crossinterp.py b/Lib/concurrent/interpreters/_crossinterp.py
new file mode 100644
index 00000000000..f47eb693ac8
--- /dev/null
+++ b/Lib/concurrent/interpreters/_crossinterp.py
@@ -0,0 +1,102 @@
+"""Common code between queues and channels."""
+
+
+class ItemInterpreterDestroyed(Exception):
+ """Raised when trying to get an item whose interpreter was destroyed."""
+
+
+class classonly:
+ """A non-data descriptor that makes a value only visible on the class.
+
+ This is like the "classmethod" builtin, but does not show up on
+ instances of the class. It may be used as a decorator.
+ """
+
+ def __init__(self, value):
+ self.value = value
+ self.getter = classmethod(value).__get__
+ self.name = None
+
+ def __set_name__(self, cls, name):
+ if self.name is not None:
+ raise TypeError('already used')
+ self.name = name
+
+ def __get__(self, obj, cls):
+ if obj is not None:
+ raise AttributeError(self.name)
+ # called on the class
+ return self.getter(None, cls)
+
+
+class UnboundItem:
+ """Represents a cross-interpreter item no longer bound to an interpreter.
+
+ An item is unbound when the interpreter that added it to the
+ cross-interpreter container is destroyed.
+ """
+
+ __slots__ = ()
+
+ @classonly
+ def singleton(cls, kind, module, name='UNBOUND'):
+ doc = cls.__doc__.replace('cross-interpreter container', kind)
+ doc = doc.replace('cross-interpreter', kind)
+ subclass = type(
+ f'Unbound{kind.capitalize()}Item',
+ (cls,),
+ dict(
+ _MODULE=module,
+ _NAME=name,
+ __doc__=doc,
+ ),
+ )
+ return object.__new__(subclass)
+
+ _MODULE = __name__
+ _NAME = 'UNBOUND'
+
+ def __new__(cls):
+ raise Exception(f'use {cls._MODULE}.{cls._NAME}')
+
+ def __repr__(self):
+ return f'{self._MODULE}.{self._NAME}'
+# return f'interpreters._queues.UNBOUND'
+
+
+UNBOUND = object.__new__(UnboundItem)
+UNBOUND_ERROR = object()
+UNBOUND_REMOVE = object()
+
+_UNBOUND_CONSTANT_TO_FLAG = {
+ UNBOUND_REMOVE: 1,
+ UNBOUND_ERROR: 2,
+ UNBOUND: 3,
+}
+_UNBOUND_FLAG_TO_CONSTANT = {v: k
+ for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
+
+
+def serialize_unbound(unbound):
+ op = unbound
+ try:
+ flag = _UNBOUND_CONSTANT_TO_FLAG[op]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
+ return flag,
+
+
+def resolve_unbound(flag, exctype_destroyed):
+ try:
+ op = _UNBOUND_FLAG_TO_CONSTANT[flag]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
+ if op is UNBOUND_REMOVE:
+ # "remove" not possible here
+ raise NotImplementedError
+ elif op is UNBOUND_ERROR:
+ raise exctype_destroyed("item's original interpreter destroyed")
+ elif op is UNBOUND:
+ return UNBOUND
+ else:
+ raise NotImplementedError(repr(op))
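A small sketch of how these helpers are meant to be combined by a container module such as _queues below. It imports the private helper directly and defines an illustrative exception class, purely for demonstration:

    from concurrent.interpreters import _crossinterp

    class DemoItemDestroyed(_crossinterp.ItemInterpreterDestroyed):
        """Illustrative container-specific subclass."""

    # UNBOUND round-trips through its flag value.
    flag, = _crossinterp.serialize_unbound(_crossinterp.UNBOUND)          # -> 3
    assert _crossinterp.resolve_unbound(flag, DemoItemDestroyed) is _crossinterp.UNBOUND

    # UNBOUND_ERROR resolves by raising the container's exception type.
    flag, = _crossinterp.serialize_unbound(_crossinterp.UNBOUND_ERROR)    # -> 2
    try:
        _crossinterp.resolve_unbound(flag, DemoItemDestroyed)
    except DemoItemDestroyed as exc:
        print(exc)   # "item's original interpreter destroyed"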
diff --git a/Lib/concurrent/interpreters/_queues.py b/Lib/concurrent/interpreters/_queues.py
new file mode 100644
index 00000000000..99987f2f692
--- /dev/null
+++ b/Lib/concurrent/interpreters/_queues.py
@@ -0,0 +1,288 @@
+"""Cross-interpreter Queues High Level Module."""
+
+import queue
+import time
+import weakref
+import _interpqueues as _queues
+from . import _crossinterp
+
+# aliases:
+from _interpqueues import (
+ QueueError, QueueNotFoundError,
+)
+from ._crossinterp import (
+ UNBOUND_ERROR, UNBOUND_REMOVE,
+)
+
+__all__ = [
+ 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
+ 'create', 'list_all',
+ 'Queue',
+ 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
+ 'ItemInterpreterDestroyed',
+]
+
+
+class QueueEmpty(QueueError, queue.Empty):
+ """Raised from get_nowait() when the queue is empty.
+
+ It is also raised from get() if it times out.
+ """
+
+
+class QueueFull(QueueError, queue.Full):
+ """Raised from put_nowait() when the queue is full.
+
+ It is also raised from put() if it times out.
+ """
+
+
+class ItemInterpreterDestroyed(QueueError,
+ _crossinterp.ItemInterpreterDestroyed):
+ """Raised from get() and get_nowait()."""
+
+
+_SHARED_ONLY = 0
+_PICKLED = 1
+
+
+UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
+
+
+def _serialize_unbound(unbound):
+ if unbound is UNBOUND:
+ unbound = _crossinterp.UNBOUND
+ return _crossinterp.serialize_unbound(unbound)
+
+
+def _resolve_unbound(flag):
+ resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
+ if resolved is _crossinterp.UNBOUND:
+ resolved = UNBOUND
+ return resolved
+
+
+def create(maxsize=0, *, unbounditems=UNBOUND):
+ """Return a new cross-interpreter queue.
+
+ The queue may be used to pass data safely between interpreters.
+
+ "unbounditems" sets the default for Queue.put(); see that method for
+ supported values. The default value is UNBOUND, which replaces
+ the unbound item.
+ """
+ unbound = _serialize_unbound(unbounditems)
+ unboundop, = unbound
+ qid = _queues.create(maxsize, unboundop, -1)
+ self = Queue(qid)
+ self._set_unbound(unboundop, unbounditems)
+ return self
+
+
+def list_all():
+ """Return a list of all open queues."""
+ queues = []
+ for qid, unboundop, _ in _queues.list_all():
+ self = Queue(qid)
+ if not hasattr(self, '_unbound'):
+ self._set_unbound(unboundop)
+ else:
+ assert self._unbound[0] == unboundop
+ queues.append(self)
+ return queues
+
+
+_known_queues = weakref.WeakValueDictionary()
+
+class Queue:
+ """A cross-interpreter queue."""
+
+ def __new__(cls, id, /):
+ # There is only one instance for any given ID.
+ if isinstance(id, int):
+ id = int(id)
+ else:
+ raise TypeError(f'id must be an int, got {id!r}')
+ try:
+ self = _known_queues[id]
+ except KeyError:
+ self = super().__new__(cls)
+ self._id = id
+ _known_queues[id] = self
+ _queues.bind(id)
+ return self
+
+ def __del__(self):
+ try:
+ _queues.release(self._id)
+ except QueueNotFoundError:
+ pass
+ try:
+ del _known_queues[self._id]
+ except KeyError:
+ pass
+
+ def __repr__(self):
+ return f'{type(self).__name__}({self.id})'
+
+ def __hash__(self):
+ return hash(self._id)
+
+ # for pickling:
+ def __getnewargs__(self):
+ return (self._id,)
+
+ # for pickling:
+ def __getstate__(self):
+ return None
+
+ def _set_unbound(self, op, items=None):
+ assert not hasattr(self, '_unbound')
+ if items is None:
+ items = _resolve_unbound(op)
+ unbound = (op, items)
+ self._unbound = unbound
+ return unbound
+
+ @property
+ def id(self):
+ return self._id
+
+ @property
+ def unbounditems(self):
+ try:
+ _, items = self._unbound
+ except AttributeError:
+ op, _ = _queues.get_queue_defaults(self._id)
+ _, items = self._set_unbound(op)
+ return items
+
+ @property
+ def maxsize(self):
+ try:
+ return self._maxsize
+ except AttributeError:
+ self._maxsize = _queues.get_maxsize(self._id)
+ return self._maxsize
+
+ def empty(self):
+ return self.qsize() == 0
+
+ def full(self):
+ return _queues.is_full(self._id)
+
+ def qsize(self):
+ return _queues.get_count(self._id)
+
+ def put(self, obj, timeout=None, *,
+ unbounditems=None,
+ _delay=10 / 1000, # 10 milliseconds
+ ):
+ """Add the object to the queue.
+
+ This blocks while the queue is full.
+
+ For most objects, the object received through Queue.get() will
+ be a new one, equivalent to the original and not sharing any
+ actual underlying data. The notable exceptions include
+ cross-interpreter types (like Queue) and memoryview, where the
+ underlying data is actually shared. Furthermore, some types
+ can be sent through a queue more efficiently than others. This
+ group includes various immutable types like int, str, bytes, and
+ tuple (if the items are likewise efficiently shareable). See interpreters.is_shareable().
+
+ "unbounditems" controls the behavior of Queue.get() for the given
+ object if the current interpreter (calling put()) is later
+ destroyed.
+
+ If "unbounditems" is None (the default) then it uses the
+ queue's default, set with create_queue(),
+ which is usually UNBOUND.
+
+ If "unbounditems" is UNBOUND_ERROR then get() will raise an
+ ItemInterpreterDestroyed exception if the original interpreter
+ has been destroyed. This does not otherwise affect the queue;
+ the next call to get() will work like normal, returning the next
+ item in the queue.
+
+ If "unbounditems" is UNBOUND_REMOVE then the item will be removed
+ from the queue as soon as the original interpreter is destroyed.
+ Be aware that this will introduce an imbalance between put()
+ and get() calls.
+
+ If "unbounditems" is UNBOUND then it is returned by get() in place
+ of the unbound item.
+ """
+ if unbounditems is None:
+ unboundop = -1
+ else:
+ unboundop, = _serialize_unbound(unbounditems)
+ if timeout is not None:
+ timeout = int(timeout)
+ if timeout < 0:
+ raise ValueError('timeout value must be non-negative')
+ end = time.time() + timeout
+ while True:
+ try:
+ _queues.put(self._id, obj, unboundop)
+ except QueueFull as exc:
+ if timeout is not None and time.time() >= end:
+ raise # re-raise
+ time.sleep(_delay)
+ else:
+ break
+
+ def put_nowait(self, obj, *, unbounditems=None):
+ if unbounditems is None:
+ unboundop = -1
+ else:
+ unboundop, = _serialize_unbound(unbounditems)
+ _queues.put(self._id, obj, unboundop)
+
+ def get(self, timeout=None, *,
+ _delay=10 / 1000, # 10 milliseconds
+ ):
+ """Return the next object from the queue.
+
+ This blocks while the queue is empty.
+
+ If the next item's original interpreter has been destroyed
+ then the "next object" is determined by the value of the
+ "unbounditems" argument to put().
+ """
+ if timeout is not None:
+ timeout = int(timeout)
+ if timeout < 0:
+ raise ValueError('timeout value must be non-negative')
+ end = time.time() + timeout
+ while True:
+ try:
+ obj, unboundop = _queues.get(self._id)
+ except QueueEmpty as exc:
+ if timeout is not None and time.time() >= end:
+ raise # re-raise
+ time.sleep(_delay)
+ else:
+ break
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
+ return obj
+
+ def get_nowait(self):
+ """Return the next object from the channel.
+
+ If the queue is empty then raise QueueEmpty. Otherwise this
+ is the same as get().
+ """
+ try:
+ obj, unboundop = _queues.get(self._id)
+ except QueueEmpty as exc:
+ raise # re-raise
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
+ return obj
+
+
+_queues._register_heap_types(Queue, QueueEmpty, QueueFull)
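Finally, a sketch of the queue module in use alongside the interpreters API added earlier in this change; the object values and timeout are illustrative:

    from concurrent import interpreters

    queue = interpreters.create_queue(maxsize=2)   # wraps _queues.create()
    interp = interpreters.create()

    # Queue objects are themselves shareable, so the same queue can be bound
    # into the subinterpreter's __main__ and used from both sides.
    interp.prepare_main(queue=queue)
    interp.exec('queue.put("hello from the subinterpreter")')

    print(queue.get(timeout=5))                    # -> hello from the subinterpreter
    assert queue.empty()

    interp.close()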