diff options
Diffstat (limited to 'Lib/test/support/interpreters/channels.py')
-rw-r--r-- | Lib/test/support/interpreters/channels.py | 282 |
1 files changed, 0 insertions, 282 deletions
diff --git a/Lib/test/support/interpreters/channels.py b/Lib/test/support/interpreters/channels.py deleted file mode 100644 index 1724759b75a..00000000000 --- a/Lib/test/support/interpreters/channels.py +++ /dev/null @@ -1,282 +0,0 @@ -"""Cross-interpreter Channels High Level Module.""" - -import time -import _interpchannels as _channels -from . import _crossinterp - -# aliases: -from _interpchannels import ( - ChannelError, ChannelNotFoundError, ChannelClosedError, # noqa: F401 - ChannelEmptyError, ChannelNotEmptyError, # noqa: F401 -) -from ._crossinterp import ( - UNBOUND_ERROR, UNBOUND_REMOVE, -) - - -__all__ = [ - 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', - 'create', 'list_all', - 'SendChannel', 'RecvChannel', - 'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError', - 'ItemInterpreterDestroyed', -] - - -class ItemInterpreterDestroyed(ChannelError, - _crossinterp.ItemInterpreterDestroyed): - """Raised from get() and get_nowait().""" - - -UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) - - -def _serialize_unbound(unbound): - if unbound is UNBOUND: - unbound = _crossinterp.UNBOUND - return _crossinterp.serialize_unbound(unbound) - - -def _resolve_unbound(flag): - resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) - if resolved is _crossinterp.UNBOUND: - resolved = UNBOUND - return resolved - - -def create(*, unbounditems=UNBOUND): - """Return (recv, send) for a new cross-interpreter channel. - - The channel may be used to pass data safely between interpreters. - - "unbounditems" sets the default for the send end of the channel. - See SendChannel.send() for supported values. The default value - is UNBOUND, which replaces the unbound item when received. - """ - unbound = _serialize_unbound(unbounditems) - unboundop, = unbound - cid = _channels.create(unboundop, -1) - recv, send = RecvChannel(cid), SendChannel(cid) - send._set_unbound(unboundop, unbounditems) - return recv, send - - -def list_all(): - """Return a list of (recv, send) for all open channels.""" - channels = [] - for cid, unboundop, _ in _channels.list_all(): - chan = _, send = RecvChannel(cid), SendChannel(cid) - if not hasattr(send, '_unboundop'): - send._set_unbound(unboundop) - else: - assert send._unbound[0] == unboundop - channels.append(chan) - return channels - - -class _ChannelEnd: - """The base class for RecvChannel and SendChannel.""" - - _end = None - - def __new__(cls, cid): - self = super().__new__(cls) - if self._end == 'send': - cid = _channels._channel_id(cid, send=True, force=True) - elif self._end == 'recv': - cid = _channels._channel_id(cid, recv=True, force=True) - else: - raise NotImplementedError(self._end) - self._id = cid - return self - - def __repr__(self): - return f'{type(self).__name__}(id={int(self._id)})' - - def __hash__(self): - return hash(self._id) - - def __eq__(self, other): - if isinstance(self, RecvChannel): - if not isinstance(other, RecvChannel): - return NotImplemented - elif not isinstance(other, SendChannel): - return NotImplemented - return other._id == self._id - - # for pickling: - def __getnewargs__(self): - return (int(self._id),) - - # for pickling: - def __getstate__(self): - return None - - @property - def id(self): - return self._id - - @property - def _info(self): - return _channels.get_info(self._id) - - @property - def is_closed(self): - return self._info.closed - - -_NOT_SET = object() - - -class RecvChannel(_ChannelEnd): - """The receiving end of a cross-interpreter channel.""" - - _end = 'recv' - - def recv(self, timeout=None, *, - _sentinel=object(), - _delay=10 / 1000, # 10 milliseconds - ): - """Return the next object from the channel. - - This blocks until an object has been sent, if none have been - sent already. - """ - if timeout is not None: - timeout = int(timeout) - if timeout < 0: - raise ValueError(f'timeout value must be non-negative') - end = time.time() + timeout - obj, unboundop = _channels.recv(self._id, _sentinel) - while obj is _sentinel: - time.sleep(_delay) - if timeout is not None and time.time() >= end: - raise TimeoutError - obj, unboundop = _channels.recv(self._id, _sentinel) - if unboundop is not None: - assert obj is None, repr(obj) - return _resolve_unbound(unboundop) - return obj - - def recv_nowait(self, default=_NOT_SET): - """Return the next object from the channel. - - If none have been sent then return the default if one - is provided or fail with ChannelEmptyError. Otherwise this - is the same as recv(). - """ - if default is _NOT_SET: - obj, unboundop = _channels.recv(self._id) - else: - obj, unboundop = _channels.recv(self._id, default) - if unboundop is not None: - assert obj is None, repr(obj) - return _resolve_unbound(unboundop) - return obj - - def close(self): - _channels.close(self._id, recv=True) - - -class SendChannel(_ChannelEnd): - """The sending end of a cross-interpreter channel.""" - - _end = 'send' - -# def __new__(cls, cid, *, _unbound=None): -# if _unbound is None: -# try: -# op = _channels.get_channel_defaults(cid) -# _unbound = (op,) -# except ChannelNotFoundError: -# _unbound = _serialize_unbound(UNBOUND) -# self = super().__new__(cls, cid) -# self._unbound = _unbound -# return self - - def _set_unbound(self, op, items=None): - assert not hasattr(self, '_unbound') - if items is None: - items = _resolve_unbound(op) - unbound = (op, items) - self._unbound = unbound - return unbound - - @property - def unbounditems(self): - try: - _, items = self._unbound - except AttributeError: - op, _ = _channels.get_queue_defaults(self._id) - _, items = self._set_unbound(op) - return items - - @property - def is_closed(self): - info = self._info - return info.closed or info.closing - - def send(self, obj, timeout=None, *, - unbounditems=None, - ): - """Send the object (i.e. its data) to the channel's receiving end. - - This blocks until the object is received. - """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True) - - def send_nowait(self, obj, *, - unbounditems=None, - ): - """Send the object to the channel's receiving end. - - If the object is immediately received then return True - (else False). Otherwise this is the same as send(). - """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - # XXX Note that at the moment channel_send() only ever returns - # None. This should be fixed when channel_send_wait() is added. - # See bpo-32604 and gh-19829. - return _channels.send(self._id, obj, unboundop, blocking=False) - - def send_buffer(self, obj, timeout=None, *, - unbounditems=None, - ): - """Send the object's buffer to the channel's receiving end. - - This blocks until the object is received. - """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - _channels.send_buffer(self._id, obj, unboundop, - timeout=timeout, blocking=True) - - def send_buffer_nowait(self, obj, *, - unbounditems=None, - ): - """Send the object's buffer to the channel's receiving end. - - If the object is immediately received then return True - (else False). Otherwise this is the same as send(). - """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) - return _channels.send_buffer(self._id, obj, unboundop, blocking=False) - - def close(self): - _channels.close(self._id, send=True) - - -# XXX This is causing leaks (gh-110318): -_channels._register_end_types(SendChannel, RecvChannel) |