diff options
Diffstat (limited to 'Lib/concurrent')
-rw-r--r-- | Lib/concurrent/futures/interpreter.py | 2 | ||||
-rw-r--r-- | Lib/concurrent/futures/process.py | 5 | ||||
-rw-r--r-- | Lib/concurrent/interpreters/__init__.py | 246 | ||||
-rw-r--r-- | Lib/concurrent/interpreters/_crossinterp.py | 102 | ||||
-rw-r--r-- | Lib/concurrent/interpreters/_queues.py | 288 |
5 files changed, 642 insertions, 1 deletions
diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py index a2c4fbfd3fb..f12b4ac33cd 100644 --- a/Lib/concurrent/futures/interpreter.py +++ b/Lib/concurrent/futures/interpreter.py @@ -167,7 +167,7 @@ class WorkerContext(_thread.WorkerContext): except _interpqueues.QueueError: continue except ModuleNotFoundError: - # interpreters.queues doesn't exist, which means + # interpreters._queues doesn't exist, which means # QueueEmpty doesn't. Act as though it does. continue else: diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 76b7b2abe83..a14650bf5fa 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -755,6 +755,11 @@ class ProcessPoolExecutor(_base.Executor): self._executor_manager_thread_wakeup def _adjust_process_count(self): + # gh-132969: avoid error when state is reset and executor is still running, + # which will happen when shutdown(wait=False) is called. + if self._processes is None: + return + # if there's an idle process, we don't need to spawn a new one. if self._idle_worker_semaphore.acquire(blocking=False): return diff --git a/Lib/concurrent/interpreters/__init__.py b/Lib/concurrent/interpreters/__init__.py new file mode 100644 index 00000000000..0fd661249a2 --- /dev/null +++ b/Lib/concurrent/interpreters/__init__.py @@ -0,0 +1,246 @@ +"""Subinterpreters High Level Module.""" + +import threading +import weakref +import _interpreters + +# aliases: +from _interpreters import ( + InterpreterError, InterpreterNotFoundError, NotShareableError, + is_shareable, +) +from ._queues import ( + create as create_queue, + Queue, QueueEmpty, QueueFull, +) + + +__all__ = [ + 'get_current', 'get_main', 'create', 'list_all', 'is_shareable', + 'Interpreter', + 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed', + 'NotShareableError', + 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', +] + + +_EXEC_FAILURE_STR = """ +{superstr} + +Uncaught in the interpreter: + +{formatted} +""".strip() + +class ExecutionFailed(InterpreterError): + """An unhandled exception happened during execution. + + This is raised from Interpreter.exec() and Interpreter.call(). + """ + + def __init__(self, excinfo): + msg = excinfo.formatted + if not msg: + if excinfo.type and excinfo.msg: + msg = f'{excinfo.type.__name__}: {excinfo.msg}' + else: + msg = excinfo.type.__name__ or excinfo.msg + super().__init__(msg) + self.excinfo = excinfo + + def __str__(self): + try: + formatted = self.excinfo.errdisplay + except Exception: + return super().__str__() + else: + return _EXEC_FAILURE_STR.format( + superstr=super().__str__(), + formatted=formatted, + ) + + +def create(): + """Return a new (idle) Python interpreter.""" + id = _interpreters.create(reqrefs=True) + return Interpreter(id, _ownsref=True) + + +def list_all(): + """Return all existing interpreters.""" + return [Interpreter(id, _whence=whence) + for id, whence in _interpreters.list_all(require_ready=True)] + + +def get_current(): + """Return the currently running interpreter.""" + id, whence = _interpreters.get_current() + return Interpreter(id, _whence=whence) + + +def get_main(): + """Return the main interpreter.""" + id, whence = _interpreters.get_main() + assert whence == _interpreters.WHENCE_RUNTIME, repr(whence) + return Interpreter(id, _whence=whence) + + +_known = weakref.WeakValueDictionary() + +class Interpreter: + """A single Python interpreter. + + Attributes: + + "id" - the unique process-global ID number for the interpreter + "whence" - indicates where the interpreter was created + + If the interpreter wasn't created by this module + then any method that modifies the interpreter will fail, + i.e. .close(), .prepare_main(), .exec(), and .call() + """ + + _WHENCE_TO_STR = { + _interpreters.WHENCE_UNKNOWN: 'unknown', + _interpreters.WHENCE_RUNTIME: 'runtime init', + _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API', + _interpreters.WHENCE_CAPI: 'C-API', + _interpreters.WHENCE_XI: 'cross-interpreter C-API', + _interpreters.WHENCE_STDLIB: '_interpreters module', + } + + def __new__(cls, id, /, _whence=None, _ownsref=None): + # There is only one instance for any given ID. + if not isinstance(id, int): + raise TypeError(f'id must be an int, got {id!r}') + id = int(id) + if _whence is None: + if _ownsref: + _whence = _interpreters.WHENCE_STDLIB + else: + _whence = _interpreters.whence(id) + assert _whence in cls._WHENCE_TO_STR, repr(_whence) + if _ownsref is None: + _ownsref = (_whence == _interpreters.WHENCE_STDLIB) + try: + self = _known[id] + assert hasattr(self, '_ownsref') + except KeyError: + self = super().__new__(cls) + _known[id] = self + self._id = id + self._whence = _whence + self._ownsref = _ownsref + if _ownsref: + # This may raise InterpreterNotFoundError: + _interpreters.incref(id) + return self + + def __repr__(self): + return f'{type(self).__name__}({self.id})' + + def __hash__(self): + return hash(self._id) + + def __del__(self): + self._decref() + + # for pickling: + def __getnewargs__(self): + return (self._id,) + + # for pickling: + def __getstate__(self): + return None + + def _decref(self): + if not self._ownsref: + return + self._ownsref = False + try: + _interpreters.decref(self._id) + except InterpreterNotFoundError: + pass + + @property + def id(self): + return self._id + + @property + def whence(self): + return self._WHENCE_TO_STR[self._whence] + + def is_running(self): + """Return whether or not the identified interpreter is running.""" + return _interpreters.is_running(self._id) + + # Everything past here is available only to interpreters created by + # interpreters.create(). + + def close(self): + """Finalize and destroy the interpreter. + + Attempting to destroy the current interpreter results + in an InterpreterError. + """ + return _interpreters.destroy(self._id, restrict=True) + + def prepare_main(self, ns=None, /, **kwargs): + """Bind the given values into the interpreter's __main__. + + The values must be shareable. + """ + ns = dict(ns, **kwargs) if ns is not None else kwargs + _interpreters.set___main___attrs(self._id, ns, restrict=True) + + def exec(self, code, /): + """Run the given source code in the interpreter. + + This is essentially the same as calling the builtin "exec" + with this interpreter, using the __dict__ of its __main__ + module as both globals and locals. + + There is no return value. + + If the code raises an unhandled exception then an ExecutionFailed + exception is raised, which summarizes the unhandled exception. + The actual exception is discarded because objects cannot be + shared between interpreters. + + This blocks the current Python thread until done. During + that time, the previous interpreter is allowed to run + in other threads. + """ + excinfo = _interpreters.exec(self._id, code, restrict=True) + if excinfo is not None: + raise ExecutionFailed(excinfo) + + def _call(self, callable, args, kwargs): + res, excinfo = _interpreters.call(self._id, callable, args, kwargs, restrict=True) + if excinfo is not None: + raise ExecutionFailed(excinfo) + return res + + def call(self, callable, /, *args, **kwargs): + """Call the object in the interpreter with given args/kwargs. + + Nearly all callables, args, kwargs, and return values are + supported. All "shareable" objects are supported, as are + "stateless" functions (meaning non-closures that do not use + any globals). This method will fall back to pickle. + + If the callable raises an exception then the error display + (including full traceback) is sent back between the interpreters + and an ExecutionFailed exception is raised, much like what + happens with Interpreter.exec(). + """ + return self._call(callable, args, kwargs) + + def call_in_thread(self, callable, /, *args, **kwargs): + """Return a new thread that calls the object in the interpreter. + + The return value and any raised exception are discarded. + """ + t = threading.Thread(target=self._call, args=(callable, args, kwargs)) + t.start() + return t diff --git a/Lib/concurrent/interpreters/_crossinterp.py b/Lib/concurrent/interpreters/_crossinterp.py new file mode 100644 index 00000000000..f47eb693ac8 --- /dev/null +++ b/Lib/concurrent/interpreters/_crossinterp.py @@ -0,0 +1,102 @@ +"""Common code between queues and channels.""" + + +class ItemInterpreterDestroyed(Exception): + """Raised when trying to get an item whose interpreter was destroyed.""" + + +class classonly: + """A non-data descriptor that makes a value only visible on the class. + + This is like the "classmethod" builtin, but does not show up on + instances of the class. It may be used as a decorator. + """ + + def __init__(self, value): + self.value = value + self.getter = classmethod(value).__get__ + self.name = None + + def __set_name__(self, cls, name): + if self.name is not None: + raise TypeError('already used') + self.name = name + + def __get__(self, obj, cls): + if obj is not None: + raise AttributeError(self.name) + # called on the class + return self.getter(None, cls) + + +class UnboundItem: + """Represents a cross-interpreter item no longer bound to an interpreter. + + An item is unbound when the interpreter that added it to the + cross-interpreter container is destroyed. + """ + + __slots__ = () + + @classonly + def singleton(cls, kind, module, name='UNBOUND'): + doc = cls.__doc__.replace('cross-interpreter container', kind) + doc = doc.replace('cross-interpreter', kind) + subclass = type( + f'Unbound{kind.capitalize()}Item', + (cls,), + dict( + _MODULE=module, + _NAME=name, + __doc__=doc, + ), + ) + return object.__new__(subclass) + + _MODULE = __name__ + _NAME = 'UNBOUND' + + def __new__(cls): + raise Exception(f'use {cls._MODULE}.{cls._NAME}') + + def __repr__(self): + return f'{self._MODULE}.{self._NAME}' +# return f'interpreters._queues.UNBOUND' + + +UNBOUND = object.__new__(UnboundItem) +UNBOUND_ERROR = object() +UNBOUND_REMOVE = object() + +_UNBOUND_CONSTANT_TO_FLAG = { + UNBOUND_REMOVE: 1, + UNBOUND_ERROR: 2, + UNBOUND: 3, +} +_UNBOUND_FLAG_TO_CONSTANT = {v: k + for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} + + +def serialize_unbound(unbound): + op = unbound + try: + flag = _UNBOUND_CONSTANT_TO_FLAG[op] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {op!r}') + return flag, + + +def resolve_unbound(flag, exctype_destroyed): + try: + op = _UNBOUND_FLAG_TO_CONSTANT[flag] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') + if op is UNBOUND_REMOVE: + # "remove" not possible here + raise NotImplementedError + elif op is UNBOUND_ERROR: + raise exctype_destroyed("item's original interpreter destroyed") + elif op is UNBOUND: + return UNBOUND + else: + raise NotImplementedError(repr(op)) diff --git a/Lib/concurrent/interpreters/_queues.py b/Lib/concurrent/interpreters/_queues.py new file mode 100644 index 00000000000..99987f2f692 --- /dev/null +++ b/Lib/concurrent/interpreters/_queues.py @@ -0,0 +1,288 @@ +"""Cross-interpreter Queues High Level Module.""" + +import queue +import time +import weakref +import _interpqueues as _queues +from . import _crossinterp + +# aliases: +from _interpqueues import ( + QueueError, QueueNotFoundError, +) +from ._crossinterp import ( + UNBOUND_ERROR, UNBOUND_REMOVE, +) + +__all__ = [ + 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', + 'create', 'list_all', + 'Queue', + 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', + 'ItemInterpreterDestroyed', +] + + +class QueueEmpty(QueueError, queue.Empty): + """Raised from get_nowait() when the queue is empty. + + It is also raised from get() if it times out. + """ + + +class QueueFull(QueueError, queue.Full): + """Raised from put_nowait() when the queue is full. + + It is also raised from put() if it times out. + """ + + +class ItemInterpreterDestroyed(QueueError, + _crossinterp.ItemInterpreterDestroyed): + """Raised from get() and get_nowait().""" + + +_SHARED_ONLY = 0 +_PICKLED = 1 + + +UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) + + +def _serialize_unbound(unbound): + if unbound is UNBOUND: + unbound = _crossinterp.UNBOUND + return _crossinterp.serialize_unbound(unbound) + + +def _resolve_unbound(flag): + resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) + if resolved is _crossinterp.UNBOUND: + resolved = UNBOUND + return resolved + + +def create(maxsize=0, *, unbounditems=UNBOUND): + """Return a new cross-interpreter queue. + + The queue may be used to pass data safely between interpreters. + + "unbounditems" sets the default for Queue.put(); see that method for + supported values. The default value is UNBOUND, which replaces + the unbound item. + """ + unbound = _serialize_unbound(unbounditems) + unboundop, = unbound + qid = _queues.create(maxsize, unboundop, -1) + self = Queue(qid) + self._set_unbound(unboundop, unbounditems) + return self + + +def list_all(): + """Return a list of all open queues.""" + queues = [] + for qid, unboundop, _ in _queues.list_all(): + self = Queue(qid) + if not hasattr(self, '_unbound'): + self._set_unbound(unboundop) + else: + assert self._unbound[0] == unboundop + queues.append(self) + return queues + + +_known_queues = weakref.WeakValueDictionary() + +class Queue: + """A cross-interpreter queue.""" + + def __new__(cls, id, /): + # There is only one instance for any given ID. + if isinstance(id, int): + id = int(id) + else: + raise TypeError(f'id must be an int, got {id!r}') + try: + self = _known_queues[id] + except KeyError: + self = super().__new__(cls) + self._id = id + _known_queues[id] = self + _queues.bind(id) + return self + + def __del__(self): + try: + _queues.release(self._id) + except QueueNotFoundError: + pass + try: + del _known_queues[self._id] + except KeyError: + pass + + def __repr__(self): + return f'{type(self).__name__}({self.id})' + + def __hash__(self): + return hash(self._id) + + # for pickling: + def __getnewargs__(self): + return (self._id,) + + # for pickling: + def __getstate__(self): + return None + + def _set_unbound(self, op, items=None): + assert not hasattr(self, '_unbound') + if items is None: + items = _resolve_unbound(op) + unbound = (op, items) + self._unbound = unbound + return unbound + + @property + def id(self): + return self._id + + @property + def unbounditems(self): + try: + _, items = self._unbound + except AttributeError: + op, _ = _queues.get_queue_defaults(self._id) + _, items = self._set_unbound(op) + return items + + @property + def maxsize(self): + try: + return self._maxsize + except AttributeError: + self._maxsize = _queues.get_maxsize(self._id) + return self._maxsize + + def empty(self): + return self.qsize() == 0 + + def full(self): + return _queues.is_full(self._id) + + def qsize(self): + return _queues.get_count(self._id) + + def put(self, obj, timeout=None, *, + unbounditems=None, + _delay=10 / 1000, # 10 milliseconds + ): + """Add the object to the queue. + + This blocks while the queue is full. + + For most objects, the object received through Queue.get() will + be a new one, equivalent to the original and not sharing any + actual underlying data. The notable exceptions include + cross-interpreter types (like Queue) and memoryview, where the + underlying data is actually shared. Furthermore, some types + can be sent through a queue more efficiently than others. This + group includes various immutable types like int, str, bytes, and + tuple (if the items are likewise efficiently shareable). See interpreters.is_shareable(). + + "unbounditems" controls the behavior of Queue.get() for the given + object if the current interpreter (calling put()) is later + destroyed. + + If "unbounditems" is None (the default) then it uses the + queue's default, set with create_queue(), + which is usually UNBOUND. + + If "unbounditems" is UNBOUND_ERROR then get() will raise an + ItemInterpreterDestroyed exception if the original interpreter + has been destroyed. This does not otherwise affect the queue; + the next call to put() will work like normal, returning the next + item in the queue. + + If "unbounditems" is UNBOUND_REMOVE then the item will be removed + from the queue as soon as the original interpreter is destroyed. + Be aware that this will introduce an imbalance between put() + and get() calls. + + If "unbounditems" is UNBOUND then it is returned by get() in place + of the unbound item. + """ + if unbounditems is None: + unboundop = -1 + else: + unboundop, = _serialize_unbound(unbounditems) + if timeout is not None: + timeout = int(timeout) + if timeout < 0: + raise ValueError(f'timeout value must be non-negative') + end = time.time() + timeout + while True: + try: + _queues.put(self._id, obj, unboundop) + except QueueFull as exc: + if timeout is not None and time.time() >= end: + raise # re-raise + time.sleep(_delay) + else: + break + + def put_nowait(self, obj, *, unbounditems=None): + if unbounditems is None: + unboundop = -1 + else: + unboundop, = _serialize_unbound(unbounditems) + _queues.put(self._id, obj, unboundop) + + def get(self, timeout=None, *, + _delay=10 / 1000, # 10 milliseconds + ): + """Return the next object from the queue. + + This blocks while the queue is empty. + + If the next item's original interpreter has been destroyed + then the "next object" is determined by the value of the + "unbounditems" argument to put(). + """ + if timeout is not None: + timeout = int(timeout) + if timeout < 0: + raise ValueError(f'timeout value must be non-negative') + end = time.time() + timeout + while True: + try: + obj, unboundop = _queues.get(self._id) + except QueueEmpty as exc: + if timeout is not None and time.time() >= end: + raise # re-raise + time.sleep(_delay) + else: + break + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) + return obj + + def get_nowait(self): + """Return the next object from the channel. + + If the queue is empty then raise QueueEmpty. Otherwise this + is the same as get(). + """ + try: + obj, unboundop = _queues.get(self._id) + except QueueEmpty as exc: + raise # re-raise + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) + return obj + + +_queues._register_heap_types(Queue, QueueEmpty, QueueFull) |