diff options
author | Eric Snow <ericsnowcurrently@gmail.com> | 2023-12-12 10:43:30 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-12 17:43:30 +0000 |
commit | a49b427b0265c415d9089da0be39f4b5ccd1f15f (patch) | |
tree | a7ae1247798124110a002bb5a9b088cefa66fad6 /Lib/test/support/interpreters/queues.py | |
parent | cde141717578f22947553db776980aa3e8801353 (diff) | |
download | cpython-a49b427b0265c415d9089da0be39f4b5ccd1f15f.tar.gz cpython-a49b427b0265c415d9089da0be39f4b5ccd1f15f.zip |
gh-76785: More Fixes for test.support.interpreters (gh-113012)
This brings the module (along with the associated extension modules) mostly in sync with PEP 734. There are only a few small things to wrap up.
Diffstat (limited to 'Lib/test/support/interpreters/queues.py')
-rw-r--r-- | Lib/test/support/interpreters/queues.py | 158 |
1 files changed, 87 insertions, 71 deletions
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index ed6b0d551dd..aead0c40ca9 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -3,13 +3,11 @@ import queue import time import weakref -import _xxinterpchannels as _channels -import _xxinterpchannels as _queues +import _xxinterpqueues as _queues # aliases: -from _xxinterpchannels import ( - ChannelError as QueueError, - ChannelNotFoundError as QueueNotFoundError, +from _xxinterpqueues import ( + QueueError, QueueNotFoundError, ) __all__ = [ @@ -19,14 +17,27 @@ __all__ = [ ] +class QueueEmpty(_queues.QueueEmpty, queue.Empty): + """Raised from get_nowait() when the queue is empty. + + It is also raised from get() if it times out. + """ + + +class QueueFull(_queues.QueueFull, queue.Full): + """Raised from put_nowait() when the queue is full. + + It is also raised from put() if it times out. + """ + + def create(maxsize=0): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. """ - # XXX honor maxsize - qid = _queues.create() - return Queue._with_maxsize(qid, maxsize) + qid = _queues.create(maxsize) + return Queue(qid) def list_all(): @@ -35,53 +46,37 @@ def list_all(): for qid in _queues.list_all()] -class QueueEmpty(queue.Empty): - """Raised from get_nowait() when the queue is empty. - - It is also raised from get() if it times out. - """ - - -class QueueFull(queue.Full): - """Raised from put_nowait() when the queue is full. - - It is also raised from put() if it times out. - """ - _known_queues = weakref.WeakValueDictionary() class Queue: """A cross-interpreter queue.""" - @classmethod - def _with_maxsize(cls, id, maxsize): - if not isinstance(maxsize, int): - raise TypeError(f'maxsize must be an int, got {maxsize!r}') - elif maxsize < 0: - maxsize = 0 - else: - maxsize = int(maxsize) - self = cls(id) - self._maxsize = maxsize - return self - def __new__(cls, id, /): # There is only one instance for any given ID. if isinstance(id, int): - id = _channels._channel_id(id, force=False) - elif not isinstance(id, _channels.ChannelID): + id = int(id) + else: raise TypeError(f'id must be an int, got {id!r}') - key = int(id) try: - self = _known_queues[key] + self = _known_queues[id] except KeyError: self = super().__new__(cls) self._id = id - self._maxsize = 0 - _known_queues[key] = self + _known_queues[id] = self + _queues.bind(id) return self + def __del__(self): + try: + _queues.release(self._id) + except QueueNotFoundError: + pass + try: + del _known_queues[self._id] + except KeyError: + pass + def __repr__(self): return f'{type(self).__name__}({self.id})' @@ -90,39 +85,58 @@ class Queue: @property def id(self): - return int(self._id) + return self._id @property def maxsize(self): - return self._maxsize - - @property - def _info(self): - return _channels.get_info(self._id) + try: + return self._maxsize + except AttributeError: + self._maxsize = _queues.get_maxsize(self._id) + return self._maxsize def empty(self): - return self._info.count == 0 + return self.qsize() == 0 def full(self): - if self._maxsize <= 0: - return False - return self._info.count >= self._maxsize + return _queues.is_full(self._id) def qsize(self): - return self._info.count + return _queues.get_count(self._id) - def put(self, obj, timeout=None): - # XXX block if full - _channels.send(self._id, obj, blocking=False) + def put(self, obj, timeout=None, *, + _delay=10 / 1000, # 10 milliseconds + ): + """Add the object to the queue. + + This blocks while the queue is full. + """ + if timeout is not None: + timeout = int(timeout) + if timeout < 0: + raise ValueError(f'timeout value must be non-negative') + end = time.time() + timeout + while True: + try: + _queues.put(self._id, obj) + except _queues.QueueFull as exc: + if timeout is not None and time.time() >= end: + exc.__class__ = QueueFull + raise # re-raise + time.sleep(_delay) + else: + break def put_nowait(self, obj): - # XXX raise QueueFull if full - return _channels.send(self._id, obj, blocking=False) + try: + return _queues.put(self._id, obj) + except _queues.QueueFull as exc: + exc.__class__ = QueueFull + raise # re-raise def get(self, timeout=None, *, - _sentinel=object(), - _delay=10 / 1000, # 10 milliseconds - ): + _delay=10 / 1000, # 10 milliseconds + ): """Return the next object from the queue. This blocks while the queue is empty. @@ -132,25 +146,27 @@ class Queue: if timeout < 0: raise ValueError(f'timeout value must be non-negative') end = time.time() + timeout - obj = _channels.recv(self._id, _sentinel) - while obj is _sentinel: - time.sleep(_delay) - if timeout is not None and time.time() >= end: - raise QueueEmpty - obj = _channels.recv(self._id, _sentinel) + while True: + try: + return _queues.get(self._id) + except _queues.QueueEmpty as exc: + if timeout is not None and time.time() >= end: + exc.__class__ = QueueEmpty + raise # re-raise + time.sleep(_delay) return obj - def get_nowait(self, *, _sentinel=object()): + def get_nowait(self): """Return the next object from the channel. If the queue is empty then raise QueueEmpty. Otherwise this is the same as get(). """ - obj = _channels.recv(self._id, _sentinel) - if obj is _sentinel: - raise QueueEmpty - return obj + try: + return _queues.get(self._id) + except _queues.QueueEmpty as exc: + exc.__class__ = QueueEmpty + raise # re-raise -# XXX add this: -#_channels._register_queue_type(Queue) +_queues._register_queue_type(Queue) |