diff options
Diffstat (limited to 'Lib/test/support/interpreters')
-rw-r--r-- | Lib/test/support/interpreters/__init__.py | 257 | ||||
-rw-r--r-- | Lib/test/support/interpreters/_crossinterp.py | 102 | ||||
-rw-r--r-- | Lib/test/support/interpreters/channels.py | 282 | ||||
-rw-r--r-- | Lib/test/support/interpreters/queues.py | 288 |
4 files changed, 0 insertions, 929 deletions
diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py deleted file mode 100644 index 6d1b0690805..00000000000 --- a/Lib/test/support/interpreters/__init__.py +++ /dev/null @@ -1,257 +0,0 @@ -"""Subinterpreters High Level Module.""" - -import threading -import weakref -import _interpreters - -# aliases: -from _interpreters import ( - InterpreterError, InterpreterNotFoundError, NotShareableError, - is_shareable, -) - - -__all__ = [ - 'get_current', 'get_main', 'create', 'list_all', 'is_shareable', - 'Interpreter', - 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed', - 'NotShareableError', - 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', -] - - -_queuemod = None - -def __getattr__(name): - if name in ('Queue', 'QueueEmpty', 'QueueFull', 'create_queue'): - global create_queue, Queue, QueueEmpty, QueueFull - ns = globals() - from .queues import ( - create as create_queue, - Queue, QueueEmpty, QueueFull, - ) - return ns[name] - else: - raise AttributeError(name) - - -_EXEC_FAILURE_STR = """ -{superstr} - -Uncaught in the interpreter: - -{formatted} -""".strip() - -class ExecutionFailed(InterpreterError): - """An unhandled exception happened during execution. - - This is raised from Interpreter.exec() and Interpreter.call(). - """ - - def __init__(self, excinfo): - msg = excinfo.formatted - if not msg: - if excinfo.type and excinfo.msg: - msg = f'{excinfo.type.__name__}: {excinfo.msg}' - else: - msg = excinfo.type.__name__ or excinfo.msg - super().__init__(msg) - self.excinfo = excinfo - - def __str__(self): - try: - formatted = self.excinfo.errdisplay - except Exception: - return super().__str__() - else: - return _EXEC_FAILURE_STR.format( - superstr=super().__str__(), - formatted=formatted, - ) - - -def create(): - """Return a new (idle) Python interpreter.""" - id = _interpreters.create(reqrefs=True) - return Interpreter(id, _ownsref=True) - - -def list_all(): - """Return all existing interpreters.""" - return [Interpreter(id, _whence=whence) - for id, whence in _interpreters.list_all(require_ready=True)] - - -def get_current(): - """Return the currently running interpreter.""" - id, whence = _interpreters.get_current() - return Interpreter(id, _whence=whence) - - -def get_main(): - """Return the main interpreter.""" - id, whence = _interpreters.get_main() - assert whence == _interpreters.WHENCE_RUNTIME, repr(whence) - return Interpreter(id, _whence=whence) - - -_known = weakref.WeakValueDictionary() - -class Interpreter: - """A single Python interpreter. - - Attributes: - - "id" - the unique process-global ID number for the interpreter - "whence" - indicates where the interpreter was created - - If the interpreter wasn't created by this module - then any method that modifies the interpreter will fail, - i.e. .close(), .prepare_main(), .exec(), and .call() - """ - - _WHENCE_TO_STR = { - _interpreters.WHENCE_UNKNOWN: 'unknown', - _interpreters.WHENCE_RUNTIME: 'runtime init', - _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API', - _interpreters.WHENCE_CAPI: 'C-API', - _interpreters.WHENCE_XI: 'cross-interpreter C-API', - _interpreters.WHENCE_STDLIB: '_interpreters module', - } - - def __new__(cls, id, /, _whence=None, _ownsref=None): - # There is only one instance for any given ID. - if not isinstance(id, int): - raise TypeError(f'id must be an int, got {id!r}') - id = int(id) - if _whence is None: - if _ownsref: - _whence = _interpreters.WHENCE_STDLIB - else: - _whence = _interpreters.whence(id) - assert _whence in cls._WHENCE_TO_STR, repr(_whence) - if _ownsref is None: - _ownsref = (_whence == _interpreters.WHENCE_STDLIB) - try: - self = _known[id] - assert hasattr(self, '_ownsref') - except KeyError: - self = super().__new__(cls) - _known[id] = self - self._id = id - self._whence = _whence - self._ownsref = _ownsref - if _ownsref: - # This may raise InterpreterNotFoundError: - _interpreters.incref(id) - return self - - def __repr__(self): - return f'{type(self).__name__}({self.id})' - - def __hash__(self): - return hash(self._id) - - def __del__(self): - self._decref() - - # for pickling: - def __getnewargs__(self): - return (self._id,) - - # for pickling: - def __getstate__(self): - return None - - def _decref(self): - if not self._ownsref: - return - self._ownsref = False - try: - _interpreters.decref(self._id) - except InterpreterNotFoundError: - pass - - @property - def id(self): - return self._id - - @property - def whence(self): - return self._WHENCE_TO_STR[self._whence] - - def is_running(self): - """Return whether or not the identified interpreter is running.""" - return _interpreters.is_running(self._id) - - # Everything past here is available only to interpreters created by - # interpreters.create(). - - def close(self): - """Finalize and destroy the interpreter. - - Attempting to destroy the current interpreter results - in an InterpreterError. - """ - return _interpreters.destroy(self._id, restrict=True) - - def prepare_main(self, ns=None, /, **kwargs): - """Bind the given values into the interpreter's __main__. - - The values must be shareable. - """ - ns = dict(ns, **kwargs) if ns is not None else kwargs - _interpreters.set___main___attrs(self._id, ns, restrict=True) - - def exec(self, code, /): - """Run the given source code in the interpreter. - - This is essentially the same as calling the builtin "exec" - with this interpreter, using the __dict__ of its __main__ - module as both globals and locals. - - There is no return value. - - If the code raises an unhandled exception then an ExecutionFailed - exception is raised, which summarizes the unhandled exception. - The actual exception is discarded because objects cannot be - shared between interpreters. - - This blocks the current Python thread until done. During - that time, the previous interpreter is allowed to run - in other threads. - """ - excinfo = _interpreters.exec(self._id, code, restrict=True) - if excinfo is not None: - raise ExecutionFailed(excinfo) - - def _call(self, callable, args, kwargs): - res, excinfo = _interpreters.call(self._id, callable, args, kwargs, restrict=True) - if excinfo is not None: - raise ExecutionFailed(excinfo) - return res - - def call(self, callable, /, *args, **kwargs): - """Call the object in the interpreter with given args/kwargs. - - Nearly all callables, args, kwargs, and return values are - supported. All "shareable" objects are supported, as are - "stateless" functions (meaning non-closures that do not use - any globals). This method will fall back to pickle. - - If the callable raises an exception then the error display - (including full traceback) is sent back between the interpreters - and an ExecutionFailed exception is raised, much like what - happens with Interpreter.exec(). - """ - return self._call(callable, args, kwargs) - - def call_in_thread(self, callable, /, *args, **kwargs): - """Return a new thread that calls the object in the interpreter. - - The return value and any raised exception are discarded. - """ - t = threading.Thread(target=self._call, args=(callable, args, kwargs)) - t.start() - return t diff --git a/Lib/test/support/interpreters/_crossinterp.py b/Lib/test/support/interpreters/_crossinterp.py deleted file mode 100644 index 544e197ba4c..00000000000 --- a/Lib/test/support/interpreters/_crossinterp.py +++ /dev/null @@ -1,102 +0,0 @@ -"""Common code between queues and channels.""" - - -class ItemInterpreterDestroyed(Exception): - """Raised when trying to get an item whose interpreter was destroyed.""" - - -class classonly: - """A non-data descriptor that makes a value only visible on the class. - - This is like the "classmethod" builtin, but does not show up on - instances of the class. It may be used as a decorator. - """ - - def __init__(self, value): - self.value = value - self.getter = classmethod(value).__get__ - self.name = None - - def __set_name__(self, cls, name): - if self.name is not None: - raise TypeError('already used') - self.name = name - - def __get__(self, obj, cls): - if obj is not None: - raise AttributeError(self.name) - # called on the class - return self.getter(None, cls) - - -class UnboundItem: - """Represents a cross-interpreter item no longer bound to an interpreter. - - An item is unbound when the interpreter that added it to the - cross-interpreter container is destroyed. - """ - - __slots__ = () - - @classonly - def singleton(cls, kind, module, name='UNBOUND'): - doc = cls.__doc__.replace('cross-interpreter container', kind) - doc = doc.replace('cross-interpreter', kind) - subclass = type( - f'Unbound{kind.capitalize()}Item', - (cls,), - dict( - _MODULE=module, - _NAME=name, - __doc__=doc, - ), - ) - return object.__new__(subclass) - - _MODULE = __name__ - _NAME = 'UNBOUND' - - def __new__(cls): - raise Exception(f'use {cls._MODULE}.{cls._NAME}') - - def __repr__(self): - return f'{self._MODULE}.{self._NAME}' -# return f'interpreters.queues.UNBOUND' - - -UNBOUND = object.__new__(UnboundItem) -UNBOUND_ERROR = object() -UNBOUND_REMOVE = object() - -_UNBOUND_CONSTANT_TO_FLAG = { - UNBOUND_REMOVE: 1, - UNBOUND_ERROR: 2, - UNBOUND: 3, -} -_UNBOUND_FLAG_TO_CONSTANT = {v: k - for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} - - -def serialize_unbound(unbound): - op = unbound - try: - flag = _UNBOUND_CONSTANT_TO_FLAG[op] - except KeyError: - raise NotImplementedError(f'unsupported unbound replacement op {op!r}') - return flag, - - -def resolve_unbound(flag, exctype_destroyed): - try: - op = _UNBOUND_FLAG_TO_CONSTANT[flag] - except KeyError: - raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') - if op is UNBOUND_REMOVE: - # "remove" not possible here - raise NotImplementedError - elif op is UNBOUND_ERROR: - raise exctype_destroyed("item's original interpreter destroyed") - elif op is UNBOUND: - return UNBOUND - else: - raise NotImplementedError(repr(op)) diff --git a/Lib/test/support/interpreters/channels.py b/Lib/test/support/interpreters/channels.py deleted file mode 100644 index 1724759b75a..00000000000 --- a/Lib/test/support/interpreters/channels.py +++ /dev/null @@ -1,282 +0,0 @@ -"""Cross-interpreter Channels High Level Module.""" - -import time -import _interpchannels as _channels -from . import _crossinterp - -# aliases: -from _interpchannels import ( - ChannelError, ChannelNotFoundError, ChannelClosedError, # noqa: F401 - ChannelEmptyError, ChannelNotEmptyError, # noqa: F401 -) -from ._crossinterp import ( - UNBOUND_ERROR, UNBOUND_REMOVE, -) - - -__all__ = [ - 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', - 'create', 'list_all', - 'SendChannel', 'RecvChannel', - 'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError', - 'ItemInterpreterDestroyed', -] - - -class ItemInterpreterDestroyed(ChannelError, - _crossinterp.ItemInterpreterDestroyed): - """Raised from get() and get_nowait().""" - - -UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) - - -def _serialize_unbound(unbound): - if unbound is UNBOUND: - unbound = _crossinterp.UNBOUND - return _crossinterp.serialize_unbound(unbound) - - -def _resolve_unbound(flag): - resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) - if resolved is _crossinterp.UNBOUND: - resolved = UNBOUND - return resolved - - -def create(*, unbounditems=UNBOUND): - """Return (recv, send) for a new cross-interpreter channel. - - The channel may be used to pass data safely between interpreters. - - "unbounditems" sets the default for the send end of the channel. - See SendChannel.send() for supported values. The default value - is UNBOUND, which replaces the unbound item when received. - """ - unbound = _serialize_unbound(unbounditems) - unboundop, = unbound - cid = _channels.create(unboundop, -1) - recv, send = RecvChannel(cid), SendChannel(cid) - send._set_unbound(unboundop, unbounditems) - return recv, send - - -def list_all(): - """Return a list of (recv, send) for all open channels.""" - channels = [] - for cid, unboundop, _ in _channels.list_all(): - chan = _, send = RecvChannel(cid), SendChannel(cid) - if not hasattr(send, '_unboundop'): - send._set_unbound(unboundop) - else: - assert send._unbound[0] == unboundop - channels.append(chan) - return channels - - -class _ChannelEnd: - """The base class for RecvChannel and SendChannel.""" - - _end = None - - def __new__(cls, cid): - self = super().__new__(cls) - if self._end == 'send': - cid = _channels._channel_id(cid, send=True, force=True) - elif self._end == 'recv': - cid = _channels._channel_id(cid, recv=True, force=True) - else: - raise NotImplementedError(self._end) - self._id = cid - return self - - def __repr__(self): - return f'{type(self).__name__}(id={int(self._id)})' - - def __hash__(self): - return hash(self._id) - - def __eq__(self, other): - if isinstance(self, RecvChannel): - if not isinstance(other, RecvChannel): - return NotImplemented - elif not isinstance(other, SendChannel): - return NotImplemented - return other._id == self._id - - # for pickling: - def __getnewargs__(self): - return (int(self._id),) - - # for pickling: - def __getstate__(self): - return None - - @property - def id(self): - return self._id - - @property - def _info(self): - return _channels.get_info(self._id) - - @property - def is_closed(self): - return self._info.closed - - -_NOT_SET = object() - - -class RecvChannel(_ChannelEnd): - """The receiving end of a cross-interpreter channel.""" - - _end = 'recv' - - def recv(self, timeout=None, *, - _sentinel=object(), - _delay=10 / 1000, # 10 milliseconds - ): - """Return the next object from the channel. - - This blocks until an object has been sent, if none have been - sent already. - """ - if timeout is not None: - timeout = int(timeout) - if timeout < 0: - raise ValueError(f'timeout value must be non-negative') - end = time.time() + timeout - obj, unboundop = _channels.recv(self._id, _sentinel) - while obj is _sentinel: - time.sleep(_delay) - if timeout is not None and time.time() >= end: - raise TimeoutError - obj, unboundop = _channels.recv(self._id, _sentinel) - if unboundop is not None: - assert obj is None, repr(obj) - return _resolve_unbound(unboundop) - return obj - - def recv_nowait(self, default=_NOT_SET): - """Return the next object from the channel. - - If none have been sent then return the default if one - is provided or fail with ChannelEmptyError. Otherwise this - is the same as recv(). - """ - if default is _NOT_SET: - obj, unboundop = _channels.recv(self._id) - else: - obj, unboundop = _channels.recv(self._id, default) - if unboundop is not None: - assert obj is None, repr(obj) - return _resolve_unbound(unboundop) - return obj - - def close(self): - _channels.close(self._id, recv=True) - - -class SendChannel(_ChannelEnd): - """The sending end of a cross-interpreter channel.""" - - _end = 'send' - -# def __new__(cls, cid, *, _unbound=None): -# if _unbound is None: -# try: -# op = _channels.get_channel_defaults(cid) -# _unbound = (op,) -# except ChannelNotFoundError: -# _unbound = _serialize_unbound(UNBOUND) -# self = super().__new__(cls, cid) -# self._unbound = _unbound -# return self - - def _set_unbound(self, op, items=None): - assert not hasattr(self, '_unbound') - if items is None: - items = _resolve_unbound(op) - unbound = (op, items) - self._unbound = unbound - return unbound - - @property - def unbounditems(self): - try: - _, items = self._unbound - except AttributeError: - op, _ = _channels.get_queue_defaults(self._id) - _, items = self._set_unbound(op) - return items - - @property - def is_closed(self): - info = self._info - return info.closed or info.closing - - def send(self, obj, timeout=None, *, - unbounditems=None, - ): - """Send the object (i.e. its data) to the channel's receiving end. - - This blocks until the object is received. - """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True) - - def send_nowait(self, obj, *, - unbounditems=None, - ): - """Send the object to the channel's receiving end. - - If the object is immediately received then return True - (else False). Otherwise this is the same as send(). - """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - # XXX Note that at the moment channel_send() only ever returns - # None. This should be fixed when channel_send_wait() is added. - # See bpo-32604 and gh-19829. - return _channels.send(self._id, obj, unboundop, blocking=False) - - def send_buffer(self, obj, timeout=None, *, - unbounditems=None, - ): - """Send the object's buffer to the channel's receiving end. - - This blocks until the object is received. - """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - _channels.send_buffer(self._id, obj, unboundop, - timeout=timeout, blocking=True) - - def send_buffer_nowait(self, obj, *, - unbounditems=None, - ): - """Send the object's buffer to the channel's receiving end. - - If the object is immediately received then return True - (else False). Otherwise this is the same as send(). - """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - return _channels.send_buffer(self._id, obj, unboundop, blocking=False) - - def close(self): - _channels.close(self._id, send=True) - - -# XXX This is causing leaks (gh-110318): -_channels._register_end_types(SendChannel, RecvChannel) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py deleted file mode 100644 index 99987f2f692..00000000000 --- a/Lib/test/support/interpreters/queues.py +++ /dev/null @@ -1,288 +0,0 @@ -"""Cross-interpreter Queues High Level Module.""" - -import queue -import time -import weakref -import _interpqueues as _queues -from . import _crossinterp - -# aliases: -from _interpqueues import ( - QueueError, QueueNotFoundError, -) -from ._crossinterp import ( - UNBOUND_ERROR, UNBOUND_REMOVE, -) - -__all__ = [ - 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', - 'create', 'list_all', - 'Queue', - 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', - 'ItemInterpreterDestroyed', -] - - -class QueueEmpty(QueueError, queue.Empty): - """Raised from get_nowait() when the queue is empty. - - It is also raised from get() if it times out. - """ - - -class QueueFull(QueueError, queue.Full): - """Raised from put_nowait() when the queue is full. - - It is also raised from put() if it times out. - """ - - -class ItemInterpreterDestroyed(QueueError, - _crossinterp.ItemInterpreterDestroyed): - """Raised from get() and get_nowait().""" - - -_SHARED_ONLY = 0 -_PICKLED = 1 - - -UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) - - -def _serialize_unbound(unbound): - if unbound is UNBOUND: - unbound = _crossinterp.UNBOUND - return _crossinterp.serialize_unbound(unbound) - - -def _resolve_unbound(flag): - resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) - if resolved is _crossinterp.UNBOUND: - resolved = UNBOUND - return resolved - - -def create(maxsize=0, *, unbounditems=UNBOUND): - """Return a new cross-interpreter queue. - - The queue may be used to pass data safely between interpreters. - - "unbounditems" sets the default for Queue.put(); see that method for - supported values. The default value is UNBOUND, which replaces - the unbound item. - """ - unbound = _serialize_unbound(unbounditems) - unboundop, = unbound - qid = _queues.create(maxsize, unboundop, -1) - self = Queue(qid) - self._set_unbound(unboundop, unbounditems) - return self - - -def list_all(): - """Return a list of all open queues.""" - queues = [] - for qid, unboundop, _ in _queues.list_all(): - self = Queue(qid) - if not hasattr(self, '_unbound'): - self._set_unbound(unboundop) - else: - assert self._unbound[0] == unboundop - queues.append(self) - return queues - - -_known_queues = weakref.WeakValueDictionary() - -class Queue: - """A cross-interpreter queue.""" - - def __new__(cls, id, /): - # There is only one instance for any given ID. - if isinstance(id, int): - id = int(id) - else: - raise TypeError(f'id must be an int, got {id!r}') - try: - self = _known_queues[id] - except KeyError: - self = super().__new__(cls) - self._id = id - _known_queues[id] = self - _queues.bind(id) - return self - - def __del__(self): - try: - _queues.release(self._id) - except QueueNotFoundError: - pass - try: - del _known_queues[self._id] - except KeyError: - pass - - def __repr__(self): - return f'{type(self).__name__}({self.id})' - - def __hash__(self): - return hash(self._id) - - # for pickling: - def __getnewargs__(self): - return (self._id,) - - # for pickling: - def __getstate__(self): - return None - - def _set_unbound(self, op, items=None): - assert not hasattr(self, '_unbound') - if items is None: - items = _resolve_unbound(op) - unbound = (op, items) - self._unbound = unbound - return unbound - - @property - def id(self): - return self._id - - @property - def unbounditems(self): - try: - _, items = self._unbound - except AttributeError: - op, _ = _queues.get_queue_defaults(self._id) - _, items = self._set_unbound(op) - return items - - @property - def maxsize(self): - try: - return self._maxsize - except AttributeError: - self._maxsize = _queues.get_maxsize(self._id) - return self._maxsize - - def empty(self): - return self.qsize() == 0 - - def full(self): - return _queues.is_full(self._id) - - def qsize(self): - return _queues.get_count(self._id) - - def put(self, obj, timeout=None, *, - unbounditems=None, - _delay=10 / 1000, # 10 milliseconds - ): - """Add the object to the queue. - - This blocks while the queue is full. - - For most objects, the object received through Queue.get() will - be a new one, equivalent to the original and not sharing any - actual underlying data. The notable exceptions include - cross-interpreter types (like Queue) and memoryview, where the - underlying data is actually shared. Furthermore, some types - can be sent through a queue more efficiently than others. This - group includes various immutable types like int, str, bytes, and - tuple (if the items are likewise efficiently shareable). See interpreters.is_shareable(). - - "unbounditems" controls the behavior of Queue.get() for the given - object if the current interpreter (calling put()) is later - destroyed. - - If "unbounditems" is None (the default) then it uses the - queue's default, set with create_queue(), - which is usually UNBOUND. - - If "unbounditems" is UNBOUND_ERROR then get() will raise an - ItemInterpreterDestroyed exception if the original interpreter - has been destroyed. This does not otherwise affect the queue; - the next call to put() will work like normal, returning the next - item in the queue. - - If "unbounditems" is UNBOUND_REMOVE then the item will be removed - from the queue as soon as the original interpreter is destroyed. - Be aware that this will introduce an imbalance between put() - and get() calls. - - If "unbounditems" is UNBOUND then it is returned by get() in place - of the unbound item. - """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - if timeout is not None: - timeout = int(timeout) - if timeout < 0: - raise ValueError(f'timeout value must be non-negative') - end = time.time() + timeout - while True: - try: - _queues.put(self._id, obj, unboundop) - except QueueFull as exc: - if timeout is not None and time.time() >= end: - raise # re-raise - time.sleep(_delay) - else: - break - - def put_nowait(self, obj, *, unbounditems=None): - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - _queues.put(self._id, obj, unboundop) - - def get(self, timeout=None, *, - _delay=10 / 1000, # 10 milliseconds - ): - """Return the next object from the queue. - - This blocks while the queue is empty. - - If the next item's original interpreter has been destroyed - then the "next object" is determined by the value of the - "unbounditems" argument to put(). - """ - if timeout is not None: - timeout = int(timeout) - if timeout < 0: - raise ValueError(f'timeout value must be non-negative') - end = time.time() + timeout - while True: - try: - obj, unboundop = _queues.get(self._id) - except QueueEmpty as exc: - if timeout is not None and time.time() >= end: - raise # re-raise - time.sleep(_delay) - else: - break - if unboundop is not None: - assert obj is None, repr(obj) - return _resolve_unbound(unboundop) - return obj - - def get_nowait(self): - """Return the next object from the channel. - - If the queue is empty then raise QueueEmpty. Otherwise this - is the same as get(). - """ - try: - obj, unboundop = _queues.get(self._id) - except QueueEmpty as exc: - raise # re-raise - if unboundop is not None: - assert obj is None, repr(obj) - return _resolve_unbound(unboundop) - return obj - - -_queues._register_heap_types(Queue, QueueEmpty, QueueFull) |