diff options
Diffstat (limited to 'Lib/test/support/interpreters/queues.py')
-rw-r--r-- | Lib/test/support/interpreters/queues.py | 313 |
1 files changed, 0 insertions, 313 deletions
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py deleted file mode 100644 index deb8e8613af..00000000000 --- a/Lib/test/support/interpreters/queues.py +++ /dev/null @@ -1,313 +0,0 @@ -"""Cross-interpreter Queues High Level Module.""" - -import pickle -import queue -import time -import weakref -import _interpqueues as _queues -from . import _crossinterp - -# aliases: -from _interpqueues import ( - QueueError, QueueNotFoundError, -) -from ._crossinterp import ( - UNBOUND_ERROR, UNBOUND_REMOVE, -) - -__all__ = [ - 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', - 'create', 'list_all', - 'Queue', - 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', - 'ItemInterpreterDestroyed', -] - - -class QueueEmpty(QueueError, queue.Empty): - """Raised from get_nowait() when the queue is empty. - - It is also raised from get() if it times out. - """ - - -class QueueFull(QueueError, queue.Full): - """Raised from put_nowait() when the queue is full. - - It is also raised from put() if it times out. - """ - - -class ItemInterpreterDestroyed(QueueError, - _crossinterp.ItemInterpreterDestroyed): - """Raised from get() and get_nowait().""" - - -_SHARED_ONLY = 0 -_PICKLED = 1 - - -UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) - - -def _serialize_unbound(unbound): - if unbound is UNBOUND: - unbound = _crossinterp.UNBOUND - return _crossinterp.serialize_unbound(unbound) - - -def _resolve_unbound(flag): - resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) - if resolved is _crossinterp.UNBOUND: - resolved = UNBOUND - return resolved - - -def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): - """Return a new cross-interpreter queue. - - The queue may be used to pass data safely between interpreters. - - "syncobj" sets the default for Queue.put() - and Queue.put_nowait(). - - "unbounditems" likewise sets the default. See Queue.put() for - supported values. The default value is UNBOUND, which replaces - the unbound item. - """ - fmt = _SHARED_ONLY if syncobj else _PICKLED - unbound = _serialize_unbound(unbounditems) - unboundop, = unbound - qid = _queues.create(maxsize, fmt, unboundop) - return Queue(qid, _fmt=fmt, _unbound=unbound) - - -def list_all(): - """Return a list of all open queues.""" - return [Queue(qid, _fmt=fmt, _unbound=(unboundop,)) - for qid, fmt, unboundop in _queues.list_all()] - - -_known_queues = weakref.WeakValueDictionary() - -class Queue: - """A cross-interpreter queue.""" - - def __new__(cls, id, /, *, _fmt=None, _unbound=None): - # There is only one instance for any given ID. - if isinstance(id, int): - id = int(id) - else: - raise TypeError(f'id must be an int, got {id!r}') - if _fmt is None: - if _unbound is None: - _fmt, op = _queues.get_queue_defaults(id) - _unbound = (op,) - else: - _fmt, _ = _queues.get_queue_defaults(id) - elif _unbound is None: - _, op = _queues.get_queue_defaults(id) - _unbound = (op,) - try: - self = _known_queues[id] - except KeyError: - self = super().__new__(cls) - self._id = id - self._fmt = _fmt - self._unbound = _unbound - _known_queues[id] = self - _queues.bind(id) - return self - - def __del__(self): - try: - _queues.release(self._id) - except QueueNotFoundError: - pass - try: - del _known_queues[self._id] - except KeyError: - pass - - def __repr__(self): - return f'{type(self).__name__}({self.id})' - - def __hash__(self): - return hash(self._id) - - # for pickling: - def __getnewargs__(self): - return (self._id,) - - # for pickling: - def __getstate__(self): - return None - - @property - def id(self): - return self._id - - @property - def maxsize(self): - try: - return self._maxsize - except AttributeError: - self._maxsize = _queues.get_maxsize(self._id) - return self._maxsize - - def empty(self): - return self.qsize() == 0 - - def full(self): - return _queues.is_full(self._id) - - def qsize(self): - return _queues.get_count(self._id) - - def put(self, obj, timeout=None, *, - syncobj=None, - unbound=None, - _delay=10 / 1000, # 10 milliseconds - ): - """Add the object to the queue. - - This blocks while the queue is full. - - If "syncobj" is None (the default) then it uses the - queue's default, set with create_queue(). - - If "syncobj" is false then all objects are supported, - at the expense of worse performance. - - If "syncobj" is true then the object must be "shareable". - Examples of "shareable" objects include the builtin singletons, - str, and memoryview. One benefit is that such objects are - passed through the queue efficiently. - - The key difference, though, is conceptual: the corresponding - object returned from Queue.get() will be strictly equivalent - to the given obj. In other words, the two objects will be - effectively indistinguishable from each other, even if the - object is mutable. The received object may actually be the - same object, or a copy (immutable values only), or a proxy. - Regardless, the received object should be treated as though - the original has been shared directly, whether or not it - actually is. That's a slightly different and stronger promise - than just (initial) equality, which is all "syncobj=False" - can promise. - - "unbound" controls the behavior of Queue.get() for the given - object if the current interpreter (calling put()) is later - destroyed. - - If "unbound" is None (the default) then it uses the - queue's default, set with create_queue(), - which is usually UNBOUND. - - If "unbound" is UNBOUND_ERROR then get() will raise an - ItemInterpreterDestroyed exception if the original interpreter - has been destroyed. This does not otherwise affect the queue; - the next call to put() will work like normal, returning the next - item in the queue. - - If "unbound" is UNBOUND_REMOVE then the item will be removed - from the queue as soon as the original interpreter is destroyed. - Be aware that this will introduce an imbalance between put() - and get() calls. - - If "unbound" is UNBOUND then it is returned by get() in place - of the unbound item. - """ - if syncobj is None: - fmt = self._fmt - else: - fmt = _SHARED_ONLY if syncobj else _PICKLED - if unbound is None: - unboundop, = self._unbound - else: - unboundop, = _serialize_unbound(unbound) - if timeout is not None: - timeout = int(timeout) - if timeout < 0: - raise ValueError(f'timeout value must be non-negative') - end = time.time() + timeout - if fmt is _PICKLED: - obj = pickle.dumps(obj) - while True: - try: - _queues.put(self._id, obj, fmt, unboundop) - except QueueFull as exc: - if timeout is not None and time.time() >= end: - raise # re-raise - time.sleep(_delay) - else: - break - - def put_nowait(self, obj, *, syncobj=None, unbound=None): - if syncobj is None: - fmt = self._fmt - else: - fmt = _SHARED_ONLY if syncobj else _PICKLED - if unbound is None: - unboundop, = self._unbound - else: - unboundop, = _serialize_unbound(unbound) - if fmt is _PICKLED: - obj = pickle.dumps(obj) - _queues.put(self._id, obj, fmt, unboundop) - - def get(self, timeout=None, *, - _delay=10 / 1000, # 10 milliseconds - ): - """Return the next object from the queue. - - This blocks while the queue is empty. - - If the next item's original interpreter has been destroyed - then the "next object" is determined by the value of the - "unbound" argument to put(). - """ - if timeout is not None: - timeout = int(timeout) - if timeout < 0: - raise ValueError(f'timeout value must be non-negative') - end = time.time() + timeout - while True: - try: - obj, fmt, unboundop = _queues.get(self._id) - except QueueEmpty as exc: - if timeout is not None and time.time() >= end: - raise # re-raise - time.sleep(_delay) - else: - break - if unboundop is not None: - assert obj is None, repr(obj) - return _resolve_unbound(unboundop) - if fmt == _PICKLED: - obj = pickle.loads(obj) - else: - assert fmt == _SHARED_ONLY - return obj - - def get_nowait(self): - """Return the next object from the channel. - - If the queue is empty then raise QueueEmpty. Otherwise this - is the same as get(). - """ - try: - obj, fmt, unboundop = _queues.get(self._id) - except QueueEmpty as exc: - raise # re-raise - if unboundop is not None: - assert obj is None, repr(obj) - return _resolve_unbound(unboundop) - if fmt == _PICKLED: - obj = pickle.loads(obj) - else: - assert fmt == _SHARED_ONLY - return obj - - -_queues._register_heap_types(Queue, QueueEmpty, QueueFull) |