aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/support/interpreters
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/support/interpreters')
-rw-r--r--Lib/test/support/interpreters/__init__.py257
-rw-r--r--Lib/test/support/interpreters/_crossinterp.py102
-rw-r--r--Lib/test/support/interpreters/channels.py282
-rw-r--r--Lib/test/support/interpreters/queues.py288
4 files changed, 0 insertions, 929 deletions
diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py
deleted file mode 100644
index 6d1b0690805..00000000000
--- a/Lib/test/support/interpreters/__init__.py
+++ /dev/null
@@ -1,257 +0,0 @@
-"""Subinterpreters High Level Module."""
-
-import threading
-import weakref
-import _interpreters
-
-# aliases:
-from _interpreters import (
- InterpreterError, InterpreterNotFoundError, NotShareableError,
- is_shareable,
-)
-
-
-__all__ = [
- 'get_current', 'get_main', 'create', 'list_all', 'is_shareable',
- 'Interpreter',
- 'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed',
- 'NotShareableError',
- 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull',
-]
-
-
-_queuemod = None
-
-def __getattr__(name):
- if name in ('Queue', 'QueueEmpty', 'QueueFull', 'create_queue'):
- global create_queue, Queue, QueueEmpty, QueueFull
- ns = globals()
- from .queues import (
- create as create_queue,
- Queue, QueueEmpty, QueueFull,
- )
- return ns[name]
- else:
- raise AttributeError(name)
-
-
-_EXEC_FAILURE_STR = """
-{superstr}
-
-Uncaught in the interpreter:
-
-{formatted}
-""".strip()
-
-class ExecutionFailed(InterpreterError):
- """An unhandled exception happened during execution.
-
- This is raised from Interpreter.exec() and Interpreter.call().
- """
-
- def __init__(self, excinfo):
- msg = excinfo.formatted
- if not msg:
- if excinfo.type and excinfo.msg:
- msg = f'{excinfo.type.__name__}: {excinfo.msg}'
- else:
- msg = excinfo.type.__name__ or excinfo.msg
- super().__init__(msg)
- self.excinfo = excinfo
-
- def __str__(self):
- try:
- formatted = self.excinfo.errdisplay
- except Exception:
- return super().__str__()
- else:
- return _EXEC_FAILURE_STR.format(
- superstr=super().__str__(),
- formatted=formatted,
- )
-
-
-def create():
- """Return a new (idle) Python interpreter."""
- id = _interpreters.create(reqrefs=True)
- return Interpreter(id, _ownsref=True)
-
-
-def list_all():
- """Return all existing interpreters."""
- return [Interpreter(id, _whence=whence)
- for id, whence in _interpreters.list_all(require_ready=True)]
-
-
-def get_current():
- """Return the currently running interpreter."""
- id, whence = _interpreters.get_current()
- return Interpreter(id, _whence=whence)
-
-
-def get_main():
- """Return the main interpreter."""
- id, whence = _interpreters.get_main()
- assert whence == _interpreters.WHENCE_RUNTIME, repr(whence)
- return Interpreter(id, _whence=whence)
-
-
-_known = weakref.WeakValueDictionary()
-
-class Interpreter:
- """A single Python interpreter.
-
- Attributes:
-
- "id" - the unique process-global ID number for the interpreter
- "whence" - indicates where the interpreter was created
-
- If the interpreter wasn't created by this module
- then any method that modifies the interpreter will fail,
- i.e. .close(), .prepare_main(), .exec(), and .call()
- """
-
- _WHENCE_TO_STR = {
- _interpreters.WHENCE_UNKNOWN: 'unknown',
- _interpreters.WHENCE_RUNTIME: 'runtime init',
- _interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API',
- _interpreters.WHENCE_CAPI: 'C-API',
- _interpreters.WHENCE_XI: 'cross-interpreter C-API',
- _interpreters.WHENCE_STDLIB: '_interpreters module',
- }
-
- def __new__(cls, id, /, _whence=None, _ownsref=None):
- # There is only one instance for any given ID.
- if not isinstance(id, int):
- raise TypeError(f'id must be an int, got {id!r}')
- id = int(id)
- if _whence is None:
- if _ownsref:
- _whence = _interpreters.WHENCE_STDLIB
- else:
- _whence = _interpreters.whence(id)
- assert _whence in cls._WHENCE_TO_STR, repr(_whence)
- if _ownsref is None:
- _ownsref = (_whence == _interpreters.WHENCE_STDLIB)
- try:
- self = _known[id]
- assert hasattr(self, '_ownsref')
- except KeyError:
- self = super().__new__(cls)
- _known[id] = self
- self._id = id
- self._whence = _whence
- self._ownsref = _ownsref
- if _ownsref:
- # This may raise InterpreterNotFoundError:
- _interpreters.incref(id)
- return self
-
- def __repr__(self):
- return f'{type(self).__name__}({self.id})'
-
- def __hash__(self):
- return hash(self._id)
-
- def __del__(self):
- self._decref()
-
- # for pickling:
- def __getnewargs__(self):
- return (self._id,)
-
- # for pickling:
- def __getstate__(self):
- return None
-
- def _decref(self):
- if not self._ownsref:
- return
- self._ownsref = False
- try:
- _interpreters.decref(self._id)
- except InterpreterNotFoundError:
- pass
-
- @property
- def id(self):
- return self._id
-
- @property
- def whence(self):
- return self._WHENCE_TO_STR[self._whence]
-
- def is_running(self):
- """Return whether or not the identified interpreter is running."""
- return _interpreters.is_running(self._id)
-
- # Everything past here is available only to interpreters created by
- # interpreters.create().
-
- def close(self):
- """Finalize and destroy the interpreter.
-
- Attempting to destroy the current interpreter results
- in an InterpreterError.
- """
- return _interpreters.destroy(self._id, restrict=True)
-
- def prepare_main(self, ns=None, /, **kwargs):
- """Bind the given values into the interpreter's __main__.
-
- The values must be shareable.
- """
- ns = dict(ns, **kwargs) if ns is not None else kwargs
- _interpreters.set___main___attrs(self._id, ns, restrict=True)
-
- def exec(self, code, /):
- """Run the given source code in the interpreter.
-
- This is essentially the same as calling the builtin "exec"
- with this interpreter, using the __dict__ of its __main__
- module as both globals and locals.
-
- There is no return value.
-
- If the code raises an unhandled exception then an ExecutionFailed
- exception is raised, which summarizes the unhandled exception.
- The actual exception is discarded because objects cannot be
- shared between interpreters.
-
- This blocks the current Python thread until done. During
- that time, the previous interpreter is allowed to run
- in other threads.
- """
- excinfo = _interpreters.exec(self._id, code, restrict=True)
- if excinfo is not None:
- raise ExecutionFailed(excinfo)
-
- def _call(self, callable, args, kwargs):
- res, excinfo = _interpreters.call(self._id, callable, args, kwargs, restrict=True)
- if excinfo is not None:
- raise ExecutionFailed(excinfo)
- return res
-
- def call(self, callable, /, *args, **kwargs):
- """Call the object in the interpreter with given args/kwargs.
-
- Nearly all callables, args, kwargs, and return values are
- supported. All "shareable" objects are supported, as are
- "stateless" functions (meaning non-closures that do not use
- any globals). This method will fall back to pickle.
-
- If the callable raises an exception then the error display
- (including full traceback) is sent back between the interpreters
- and an ExecutionFailed exception is raised, much like what
- happens with Interpreter.exec().
- """
- return self._call(callable, args, kwargs)
-
- def call_in_thread(self, callable, /, *args, **kwargs):
- """Return a new thread that calls the object in the interpreter.
-
- The return value and any raised exception are discarded.
- """
- t = threading.Thread(target=self._call, args=(callable, args, kwargs))
- t.start()
- return t
diff --git a/Lib/test/support/interpreters/_crossinterp.py b/Lib/test/support/interpreters/_crossinterp.py
deleted file mode 100644
index 544e197ba4c..00000000000
--- a/Lib/test/support/interpreters/_crossinterp.py
+++ /dev/null
@@ -1,102 +0,0 @@
-"""Common code between queues and channels."""
-
-
-class ItemInterpreterDestroyed(Exception):
- """Raised when trying to get an item whose interpreter was destroyed."""
-
-
-class classonly:
- """A non-data descriptor that makes a value only visible on the class.
-
- This is like the "classmethod" builtin, but does not show up on
- instances of the class. It may be used as a decorator.
- """
-
- def __init__(self, value):
- self.value = value
- self.getter = classmethod(value).__get__
- self.name = None
-
- def __set_name__(self, cls, name):
- if self.name is not None:
- raise TypeError('already used')
- self.name = name
-
- def __get__(self, obj, cls):
- if obj is not None:
- raise AttributeError(self.name)
- # called on the class
- return self.getter(None, cls)
-
-
-class UnboundItem:
- """Represents a cross-interpreter item no longer bound to an interpreter.
-
- An item is unbound when the interpreter that added it to the
- cross-interpreter container is destroyed.
- """
-
- __slots__ = ()
-
- @classonly
- def singleton(cls, kind, module, name='UNBOUND'):
- doc = cls.__doc__.replace('cross-interpreter container', kind)
- doc = doc.replace('cross-interpreter', kind)
- subclass = type(
- f'Unbound{kind.capitalize()}Item',
- (cls,),
- dict(
- _MODULE=module,
- _NAME=name,
- __doc__=doc,
- ),
- )
- return object.__new__(subclass)
-
- _MODULE = __name__
- _NAME = 'UNBOUND'
-
- def __new__(cls):
- raise Exception(f'use {cls._MODULE}.{cls._NAME}')
-
- def __repr__(self):
- return f'{self._MODULE}.{self._NAME}'
-# return f'interpreters.queues.UNBOUND'
-
-
-UNBOUND = object.__new__(UnboundItem)
-UNBOUND_ERROR = object()
-UNBOUND_REMOVE = object()
-
-_UNBOUND_CONSTANT_TO_FLAG = {
- UNBOUND_REMOVE: 1,
- UNBOUND_ERROR: 2,
- UNBOUND: 3,
-}
-_UNBOUND_FLAG_TO_CONSTANT = {v: k
- for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
-
-
-def serialize_unbound(unbound):
- op = unbound
- try:
- flag = _UNBOUND_CONSTANT_TO_FLAG[op]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
- return flag,
-
-
-def resolve_unbound(flag, exctype_destroyed):
- try:
- op = _UNBOUND_FLAG_TO_CONSTANT[flag]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
- if op is UNBOUND_REMOVE:
- # "remove" not possible here
- raise NotImplementedError
- elif op is UNBOUND_ERROR:
- raise exctype_destroyed("item's original interpreter destroyed")
- elif op is UNBOUND:
- return UNBOUND
- else:
- raise NotImplementedError(repr(op))
diff --git a/Lib/test/support/interpreters/channels.py b/Lib/test/support/interpreters/channels.py
deleted file mode 100644
index 1724759b75a..00000000000
--- a/Lib/test/support/interpreters/channels.py
+++ /dev/null
@@ -1,282 +0,0 @@
-"""Cross-interpreter Channels High Level Module."""
-
-import time
-import _interpchannels as _channels
-from . import _crossinterp
-
-# aliases:
-from _interpchannels import (
- ChannelError, ChannelNotFoundError, ChannelClosedError, # noqa: F401
- ChannelEmptyError, ChannelNotEmptyError, # noqa: F401
-)
-from ._crossinterp import (
- UNBOUND_ERROR, UNBOUND_REMOVE,
-)
-
-
-__all__ = [
- 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
- 'create', 'list_all',
- 'SendChannel', 'RecvChannel',
- 'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError',
- 'ItemInterpreterDestroyed',
-]
-
-
-class ItemInterpreterDestroyed(ChannelError,
- _crossinterp.ItemInterpreterDestroyed):
- """Raised from get() and get_nowait()."""
-
-
-UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
-
-
-def _serialize_unbound(unbound):
- if unbound is UNBOUND:
- unbound = _crossinterp.UNBOUND
- return _crossinterp.serialize_unbound(unbound)
-
-
-def _resolve_unbound(flag):
- resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
- if resolved is _crossinterp.UNBOUND:
- resolved = UNBOUND
- return resolved
-
-
-def create(*, unbounditems=UNBOUND):
- """Return (recv, send) for a new cross-interpreter channel.
-
- The channel may be used to pass data safely between interpreters.
-
- "unbounditems" sets the default for the send end of the channel.
- See SendChannel.send() for supported values. The default value
- is UNBOUND, which replaces the unbound item when received.
- """
- unbound = _serialize_unbound(unbounditems)
- unboundop, = unbound
- cid = _channels.create(unboundop, -1)
- recv, send = RecvChannel(cid), SendChannel(cid)
- send._set_unbound(unboundop, unbounditems)
- return recv, send
-
-
-def list_all():
- """Return a list of (recv, send) for all open channels."""
- channels = []
- for cid, unboundop, _ in _channels.list_all():
- chan = _, send = RecvChannel(cid), SendChannel(cid)
- if not hasattr(send, '_unboundop'):
- send._set_unbound(unboundop)
- else:
- assert send._unbound[0] == unboundop
- channels.append(chan)
- return channels
-
-
-class _ChannelEnd:
- """The base class for RecvChannel and SendChannel."""
-
- _end = None
-
- def __new__(cls, cid):
- self = super().__new__(cls)
- if self._end == 'send':
- cid = _channels._channel_id(cid, send=True, force=True)
- elif self._end == 'recv':
- cid = _channels._channel_id(cid, recv=True, force=True)
- else:
- raise NotImplementedError(self._end)
- self._id = cid
- return self
-
- def __repr__(self):
- return f'{type(self).__name__}(id={int(self._id)})'
-
- def __hash__(self):
- return hash(self._id)
-
- def __eq__(self, other):
- if isinstance(self, RecvChannel):
- if not isinstance(other, RecvChannel):
- return NotImplemented
- elif not isinstance(other, SendChannel):
- return NotImplemented
- return other._id == self._id
-
- # for pickling:
- def __getnewargs__(self):
- return (int(self._id),)
-
- # for pickling:
- def __getstate__(self):
- return None
-
- @property
- def id(self):
- return self._id
-
- @property
- def _info(self):
- return _channels.get_info(self._id)
-
- @property
- def is_closed(self):
- return self._info.closed
-
-
-_NOT_SET = object()
-
-
-class RecvChannel(_ChannelEnd):
- """The receiving end of a cross-interpreter channel."""
-
- _end = 'recv'
-
- def recv(self, timeout=None, *,
- _sentinel=object(),
- _delay=10 / 1000, # 10 milliseconds
- ):
- """Return the next object from the channel.
-
- This blocks until an object has been sent, if none have been
- sent already.
- """
- if timeout is not None:
- timeout = int(timeout)
- if timeout < 0:
- raise ValueError(f'timeout value must be non-negative')
- end = time.time() + timeout
- obj, unboundop = _channels.recv(self._id, _sentinel)
- while obj is _sentinel:
- time.sleep(_delay)
- if timeout is not None and time.time() >= end:
- raise TimeoutError
- obj, unboundop = _channels.recv(self._id, _sentinel)
- if unboundop is not None:
- assert obj is None, repr(obj)
- return _resolve_unbound(unboundop)
- return obj
-
- def recv_nowait(self, default=_NOT_SET):
- """Return the next object from the channel.
-
- If none have been sent then return the default if one
- is provided or fail with ChannelEmptyError. Otherwise this
- is the same as recv().
- """
- if default is _NOT_SET:
- obj, unboundop = _channels.recv(self._id)
- else:
- obj, unboundop = _channels.recv(self._id, default)
- if unboundop is not None:
- assert obj is None, repr(obj)
- return _resolve_unbound(unboundop)
- return obj
-
- def close(self):
- _channels.close(self._id, recv=True)
-
-
-class SendChannel(_ChannelEnd):
- """The sending end of a cross-interpreter channel."""
-
- _end = 'send'
-
-# def __new__(cls, cid, *, _unbound=None):
-# if _unbound is None:
-# try:
-# op = _channels.get_channel_defaults(cid)
-# _unbound = (op,)
-# except ChannelNotFoundError:
-# _unbound = _serialize_unbound(UNBOUND)
-# self = super().__new__(cls, cid)
-# self._unbound = _unbound
-# return self
-
- def _set_unbound(self, op, items=None):
- assert not hasattr(self, '_unbound')
- if items is None:
- items = _resolve_unbound(op)
- unbound = (op, items)
- self._unbound = unbound
- return unbound
-
- @property
- def unbounditems(self):
- try:
- _, items = self._unbound
- except AttributeError:
- op, _ = _channels.get_queue_defaults(self._id)
- _, items = self._set_unbound(op)
- return items
-
- @property
- def is_closed(self):
- info = self._info
- return info.closed or info.closing
-
- def send(self, obj, timeout=None, *,
- unbounditems=None,
- ):
- """Send the object (i.e. its data) to the channel's receiving end.
-
- This blocks until the object is received.
- """
- if unbounditems is None:
- unboundop = -1
- else:
- unboundop, = _serialize_unbound(unbounditems)
- _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)
-
- def send_nowait(self, obj, *,
- unbounditems=None,
- ):
- """Send the object to the channel's receiving end.
-
- If the object is immediately received then return True
- (else False). Otherwise this is the same as send().
- """
- if unbounditems is None:
- unboundop = -1
- else:
- unboundop, = _serialize_unbound(unbounditems)
- # XXX Note that at the moment channel_send() only ever returns
- # None. This should be fixed when channel_send_wait() is added.
- # See bpo-32604 and gh-19829.
- return _channels.send(self._id, obj, unboundop, blocking=False)
-
- def send_buffer(self, obj, timeout=None, *,
- unbounditems=None,
- ):
- """Send the object's buffer to the channel's receiving end.
-
- This blocks until the object is received.
- """
- if unbounditems is None:
- unboundop = -1
- else:
- unboundop, = _serialize_unbound(unbounditems)
- _channels.send_buffer(self._id, obj, unboundop,
- timeout=timeout, blocking=True)
-
- def send_buffer_nowait(self, obj, *,
- unbounditems=None,
- ):
- """Send the object's buffer to the channel's receiving end.
-
- If the object is immediately received then return True
- (else False). Otherwise this is the same as send().
- """
- if unbounditems is None:
- unboundop = -1
- else:
- unboundop, = _serialize_unbound(unbounditems)
- return _channels.send_buffer(self._id, obj, unboundop, blocking=False)
-
- def close(self):
- _channels.close(self._id, send=True)
-
-
-# XXX This is causing leaks (gh-110318):
-_channels._register_end_types(SendChannel, RecvChannel)
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py
deleted file mode 100644
index 99987f2f692..00000000000
--- a/Lib/test/support/interpreters/queues.py
+++ /dev/null
@@ -1,288 +0,0 @@
-"""Cross-interpreter Queues High Level Module."""
-
-import queue
-import time
-import weakref
-import _interpqueues as _queues
-from . import _crossinterp
-
-# aliases:
-from _interpqueues import (
- QueueError, QueueNotFoundError,
-)
-from ._crossinterp import (
- UNBOUND_ERROR, UNBOUND_REMOVE,
-)
-
-__all__ = [
- 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
- 'create', 'list_all',
- 'Queue',
- 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
- 'ItemInterpreterDestroyed',
-]
-
-
-class QueueEmpty(QueueError, queue.Empty):
- """Raised from get_nowait() when the queue is empty.
-
- It is also raised from get() if it times out.
- """
-
-
-class QueueFull(QueueError, queue.Full):
- """Raised from put_nowait() when the queue is full.
-
- It is also raised from put() if it times out.
- """
-
-
-class ItemInterpreterDestroyed(QueueError,
- _crossinterp.ItemInterpreterDestroyed):
- """Raised from get() and get_nowait()."""
-
-
-_SHARED_ONLY = 0
-_PICKLED = 1
-
-
-UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
-
-
-def _serialize_unbound(unbound):
- if unbound is UNBOUND:
- unbound = _crossinterp.UNBOUND
- return _crossinterp.serialize_unbound(unbound)
-
-
-def _resolve_unbound(flag):
- resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
- if resolved is _crossinterp.UNBOUND:
- resolved = UNBOUND
- return resolved
-
-
-def create(maxsize=0, *, unbounditems=UNBOUND):
- """Return a new cross-interpreter queue.
-
- The queue may be used to pass data safely between interpreters.
-
- "unbounditems" sets the default for Queue.put(); see that method for
- supported values. The default value is UNBOUND, which replaces
- the unbound item.
- """
- unbound = _serialize_unbound(unbounditems)
- unboundop, = unbound
- qid = _queues.create(maxsize, unboundop, -1)
- self = Queue(qid)
- self._set_unbound(unboundop, unbounditems)
- return self
-
-
-def list_all():
- """Return a list of all open queues."""
- queues = []
- for qid, unboundop, _ in _queues.list_all():
- self = Queue(qid)
- if not hasattr(self, '_unbound'):
- self._set_unbound(unboundop)
- else:
- assert self._unbound[0] == unboundop
- queues.append(self)
- return queues
-
-
-_known_queues = weakref.WeakValueDictionary()
-
-class Queue:
- """A cross-interpreter queue."""
-
- def __new__(cls, id, /):
- # There is only one instance for any given ID.
- if isinstance(id, int):
- id = int(id)
- else:
- raise TypeError(f'id must be an int, got {id!r}')
- try:
- self = _known_queues[id]
- except KeyError:
- self = super().__new__(cls)
- self._id = id
- _known_queues[id] = self
- _queues.bind(id)
- return self
-
- def __del__(self):
- try:
- _queues.release(self._id)
- except QueueNotFoundError:
- pass
- try:
- del _known_queues[self._id]
- except KeyError:
- pass
-
- def __repr__(self):
- return f'{type(self).__name__}({self.id})'
-
- def __hash__(self):
- return hash(self._id)
-
- # for pickling:
- def __getnewargs__(self):
- return (self._id,)
-
- # for pickling:
- def __getstate__(self):
- return None
-
- def _set_unbound(self, op, items=None):
- assert not hasattr(self, '_unbound')
- if items is None:
- items = _resolve_unbound(op)
- unbound = (op, items)
- self._unbound = unbound
- return unbound
-
- @property
- def id(self):
- return self._id
-
- @property
- def unbounditems(self):
- try:
- _, items = self._unbound
- except AttributeError:
- op, _ = _queues.get_queue_defaults(self._id)
- _, items = self._set_unbound(op)
- return items
-
- @property
- def maxsize(self):
- try:
- return self._maxsize
- except AttributeError:
- self._maxsize = _queues.get_maxsize(self._id)
- return self._maxsize
-
- def empty(self):
- return self.qsize() == 0
-
- def full(self):
- return _queues.is_full(self._id)
-
- def qsize(self):
- return _queues.get_count(self._id)
-
- def put(self, obj, timeout=None, *,
- unbounditems=None,
- _delay=10 / 1000, # 10 milliseconds
- ):
- """Add the object to the queue.
-
- This blocks while the queue is full.
-
- For most objects, the object received through Queue.get() will
- be a new one, equivalent to the original and not sharing any
- actual underlying data. The notable exceptions include
- cross-interpreter types (like Queue) and memoryview, where the
- underlying data is actually shared. Furthermore, some types
- can be sent through a queue more efficiently than others. This
- group includes various immutable types like int, str, bytes, and
- tuple (if the items are likewise efficiently shareable). See interpreters.is_shareable().
-
- "unbounditems" controls the behavior of Queue.get() for the given
- object if the current interpreter (calling put()) is later
- destroyed.
-
- If "unbounditems" is None (the default) then it uses the
- queue's default, set with create_queue(),
- which is usually UNBOUND.
-
- If "unbounditems" is UNBOUND_ERROR then get() will raise an
- ItemInterpreterDestroyed exception if the original interpreter
- has been destroyed. This does not otherwise affect the queue;
- the next call to put() will work like normal, returning the next
- item in the queue.
-
- If "unbounditems" is UNBOUND_REMOVE then the item will be removed
- from the queue as soon as the original interpreter is destroyed.
- Be aware that this will introduce an imbalance between put()
- and get() calls.
-
- If "unbounditems" is UNBOUND then it is returned by get() in place
- of the unbound item.
- """
- if unbounditems is None:
- unboundop = -1
- else:
- unboundop, = _serialize_unbound(unbounditems)
- if timeout is not None:
- timeout = int(timeout)
- if timeout < 0:
- raise ValueError(f'timeout value must be non-negative')
- end = time.time() + timeout
- while True:
- try:
- _queues.put(self._id, obj, unboundop)
- except QueueFull as exc:
- if timeout is not None and time.time() >= end:
- raise # re-raise
- time.sleep(_delay)
- else:
- break
-
- def put_nowait(self, obj, *, unbounditems=None):
- if unbounditems is None:
- unboundop = -1
- else:
- unboundop, = _serialize_unbound(unbounditems)
- _queues.put(self._id, obj, unboundop)
-
- def get(self, timeout=None, *,
- _delay=10 / 1000, # 10 milliseconds
- ):
- """Return the next object from the queue.
-
- This blocks while the queue is empty.
-
- If the next item's original interpreter has been destroyed
- then the "next object" is determined by the value of the
- "unbounditems" argument to put().
- """
- if timeout is not None:
- timeout = int(timeout)
- if timeout < 0:
- raise ValueError(f'timeout value must be non-negative')
- end = time.time() + timeout
- while True:
- try:
- obj, unboundop = _queues.get(self._id)
- except QueueEmpty as exc:
- if timeout is not None and time.time() >= end:
- raise # re-raise
- time.sleep(_delay)
- else:
- break
- if unboundop is not None:
- assert obj is None, repr(obj)
- return _resolve_unbound(unboundop)
- return obj
-
- def get_nowait(self):
- """Return the next object from the channel.
-
- If the queue is empty then raise QueueEmpty. Otherwise this
- is the same as get().
- """
- try:
- obj, unboundop = _queues.get(self._id)
- except QueueEmpty as exc:
- raise # re-raise
- if unboundop is not None:
- assert obj is None, repr(obj)
- return _resolve_unbound(unboundop)
- return obj
-
-
-_queues._register_heap_types(Queue, QueueEmpty, QueueFull)