aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/support/interpreters/queues.py
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2023-12-12 10:43:30 -0700
committerGitHub <noreply@github.com>2023-12-12 17:43:30 +0000
commita49b427b0265c415d9089da0be39f4b5ccd1f15f (patch)
treea7ae1247798124110a002bb5a9b088cefa66fad6 /Lib/test/support/interpreters/queues.py
parentcde141717578f22947553db776980aa3e8801353 (diff)
downloadcpython-a49b427b0265c415d9089da0be39f4b5ccd1f15f.tar.gz
cpython-a49b427b0265c415d9089da0be39f4b5ccd1f15f.zip
gh-76785: More Fixes for test.support.interpreters (gh-113012)
This brings the module (along with the associated extension modules) mostly in sync with PEP 734. There are only a few small things to wrap up.
Diffstat (limited to 'Lib/test/support/interpreters/queues.py')
-rw-r--r--Lib/test/support/interpreters/queues.py158
1 files changed, 87 insertions, 71 deletions
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py
index ed6b0d551dd..aead0c40ca9 100644
--- a/Lib/test/support/interpreters/queues.py
+++ b/Lib/test/support/interpreters/queues.py
@@ -3,13 +3,11 @@
import queue
import time
import weakref
-import _xxinterpchannels as _channels
-import _xxinterpchannels as _queues
+import _xxinterpqueues as _queues
# aliases:
-from _xxinterpchannels import (
- ChannelError as QueueError,
- ChannelNotFoundError as QueueNotFoundError,
+from _xxinterpqueues import (
+ QueueError, QueueNotFoundError,
)
__all__ = [
@@ -19,14 +17,27 @@ __all__ = [
]
+class QueueEmpty(_queues.QueueEmpty, queue.Empty):
+ """Raised from get_nowait() when the queue is empty.
+
+ It is also raised from get() if it times out.
+ """
+
+
+class QueueFull(_queues.QueueFull, queue.Full):
+ """Raised from put_nowait() when the queue is full.
+
+ It is also raised from put() if it times out.
+ """
+
+
def create(maxsize=0):
"""Return a new cross-interpreter queue.
The queue may be used to pass data safely between interpreters.
"""
- # XXX honor maxsize
- qid = _queues.create()
- return Queue._with_maxsize(qid, maxsize)
+ qid = _queues.create(maxsize)
+ return Queue(qid)
def list_all():
@@ -35,53 +46,37 @@ def list_all():
for qid in _queues.list_all()]
-class QueueEmpty(queue.Empty):
- """Raised from get_nowait() when the queue is empty.
-
- It is also raised from get() if it times out.
- """
-
-
-class QueueFull(queue.Full):
- """Raised from put_nowait() when the queue is full.
-
- It is also raised from put() if it times out.
- """
-
_known_queues = weakref.WeakValueDictionary()
class Queue:
"""A cross-interpreter queue."""
- @classmethod
- def _with_maxsize(cls, id, maxsize):
- if not isinstance(maxsize, int):
- raise TypeError(f'maxsize must be an int, got {maxsize!r}')
- elif maxsize < 0:
- maxsize = 0
- else:
- maxsize = int(maxsize)
- self = cls(id)
- self._maxsize = maxsize
- return self
-
def __new__(cls, id, /):
# There is only one instance for any given ID.
if isinstance(id, int):
- id = _channels._channel_id(id, force=False)
- elif not isinstance(id, _channels.ChannelID):
+ id = int(id)
+ else:
raise TypeError(f'id must be an int, got {id!r}')
- key = int(id)
try:
- self = _known_queues[key]
+ self = _known_queues[id]
except KeyError:
self = super().__new__(cls)
self._id = id
- self._maxsize = 0
- _known_queues[key] = self
+ _known_queues[id] = self
+ _queues.bind(id)
return self
+ def __del__(self):
+ try:
+ _queues.release(self._id)
+ except QueueNotFoundError:
+ pass
+ try:
+ del _known_queues[self._id]
+ except KeyError:
+ pass
+
def __repr__(self):
return f'{type(self).__name__}({self.id})'
@@ -90,39 +85,58 @@ class Queue:
@property
def id(self):
- return int(self._id)
+ return self._id
@property
def maxsize(self):
- return self._maxsize
-
- @property
- def _info(self):
- return _channels.get_info(self._id)
+ try:
+ return self._maxsize
+ except AttributeError:
+ self._maxsize = _queues.get_maxsize(self._id)
+ return self._maxsize
def empty(self):
- return self._info.count == 0
+ return self.qsize() == 0
def full(self):
- if self._maxsize <= 0:
- return False
- return self._info.count >= self._maxsize
+ return _queues.is_full(self._id)
def qsize(self):
- return self._info.count
+ return _queues.get_count(self._id)
- def put(self, obj, timeout=None):
- # XXX block if full
- _channels.send(self._id, obj, blocking=False)
+ def put(self, obj, timeout=None, *,
+ _delay=10 / 1000, # 10 milliseconds
+ ):
+ """Add the object to the queue.
+
+ This blocks while the queue is full.
+ """
+ if timeout is not None:
+ timeout = int(timeout)
+ if timeout < 0:
+ raise ValueError(f'timeout value must be non-negative')
+ end = time.time() + timeout
+ while True:
+ try:
+ _queues.put(self._id, obj)
+ except _queues.QueueFull as exc:
+ if timeout is not None and time.time() >= end:
+ exc.__class__ = QueueFull
+ raise # re-raise
+ time.sleep(_delay)
+ else:
+ break
def put_nowait(self, obj):
- # XXX raise QueueFull if full
- return _channels.send(self._id, obj, blocking=False)
+ try:
+ return _queues.put(self._id, obj)
+ except _queues.QueueFull as exc:
+ exc.__class__ = QueueFull
+ raise # re-raise
def get(self, timeout=None, *,
- _sentinel=object(),
- _delay=10 / 1000, # 10 milliseconds
- ):
+ _delay=10 / 1000, # 10 milliseconds
+ ):
"""Return the next object from the queue.
This blocks while the queue is empty.
@@ -132,25 +146,27 @@ class Queue:
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
- obj = _channels.recv(self._id, _sentinel)
- while obj is _sentinel:
- time.sleep(_delay)
- if timeout is not None and time.time() >= end:
- raise QueueEmpty
- obj = _channels.recv(self._id, _sentinel)
+ while True:
+ try:
+ return _queues.get(self._id)
+ except _queues.QueueEmpty as exc:
+ if timeout is not None and time.time() >= end:
+ exc.__class__ = QueueEmpty
+ raise # re-raise
+ time.sleep(_delay)
return obj
- def get_nowait(self, *, _sentinel=object()):
+ def get_nowait(self):
"""Return the next object from the channel.
If the queue is empty then raise QueueEmpty. Otherwise this
is the same as get().
"""
- obj = _channels.recv(self._id, _sentinel)
- if obj is _sentinel:
- raise QueueEmpty
- return obj
+ try:
+ return _queues.get(self._id)
+ except _queues.QueueEmpty as exc:
+ exc.__class__ = QueueEmpty
+ raise # re-raise
-# XXX add this:
-#_channels._register_queue_type(Queue)
+_queues._register_queue_type(Queue)