aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2024-07-15 13:43:59 -0600
committerGitHub <noreply@github.com>2024-07-15 19:43:59 +0000
commit8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a (patch)
tree517e2f87dc7a278d390cfd51c91a03a694c6c7f2
parentfd085a411ed2ccc9bde2338cf50068bc7f213ece (diff)
downloadcpython-8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a.tar.gz
cpython-8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a.zip
gh-76785: Expand How Interpreter Channels Handle Interpreter Finalization (gh-121805)
See 6b98b274b6 for an explanation of the problem and solution. Here I've applied the solution to channels.
-rw-r--r--Lib/test/support/interpreters/_crossinterp.py102
-rw-r--r--Lib/test/support/interpreters/channels.py110
-rw-r--r--Lib/test/support/interpreters/queues.py60
-rw-r--r--Lib/test/test__interpchannels.py275
-rw-r--r--Lib/test/test_interpreters/test_channels.py222
-rw-r--r--Lib/test/test_interpreters/test_queues.py4
-rw-r--r--Modules/_interpchannelsmodule.c344
-rw-r--r--Modules/_interpqueuesmodule.c42
-rw-r--r--Modules/_interpreters_common.h45
9 files changed, 898 insertions, 306 deletions
diff --git a/Lib/test/support/interpreters/_crossinterp.py b/Lib/test/support/interpreters/_crossinterp.py
new file mode 100644
index 00000000000..544e197ba4c
--- /dev/null
+++ b/Lib/test/support/interpreters/_crossinterp.py
@@ -0,0 +1,102 @@
+"""Common code between queues and channels."""
+
+
+class ItemInterpreterDestroyed(Exception):
+ """Raised when trying to get an item whose interpreter was destroyed."""
+
+
+class classonly:
+ """A non-data descriptor that makes a value only visible on the class.
+
+ This is like the "classmethod" builtin, but does not show up on
+ instances of the class. It may be used as a decorator.
+ """
+
+ def __init__(self, value):
+ self.value = value
+ self.getter = classmethod(value).__get__
+ self.name = None
+
+ def __set_name__(self, cls, name):
+ if self.name is not None:
+ raise TypeError('already used')
+ self.name = name
+
+ def __get__(self, obj, cls):
+ if obj is not None:
+ raise AttributeError(self.name)
+ # called on the class
+ return self.getter(None, cls)
+
+
+class UnboundItem:
+ """Represents a cross-interpreter item no longer bound to an interpreter.
+
+ An item is unbound when the interpreter that added it to the
+ cross-interpreter container is destroyed.
+ """
+
+ __slots__ = ()
+
+ @classonly
+ def singleton(cls, kind, module, name='UNBOUND'):
+ doc = cls.__doc__.replace('cross-interpreter container', kind)
+ doc = doc.replace('cross-interpreter', kind)
+ subclass = type(
+ f'Unbound{kind.capitalize()}Item',
+ (cls,),
+ dict(
+ _MODULE=module,
+ _NAME=name,
+ __doc__=doc,
+ ),
+ )
+ return object.__new__(subclass)
+
+ _MODULE = __name__
+ _NAME = 'UNBOUND'
+
+ def __new__(cls):
+ raise Exception(f'use {cls._MODULE}.{cls._NAME}')
+
+ def __repr__(self):
+ return f'{self._MODULE}.{self._NAME}'
+# return f'interpreters.queues.UNBOUND'
+
+
+UNBOUND = object.__new__(UnboundItem)
+UNBOUND_ERROR = object()
+UNBOUND_REMOVE = object()
+
+_UNBOUND_CONSTANT_TO_FLAG = {
+ UNBOUND_REMOVE: 1,
+ UNBOUND_ERROR: 2,
+ UNBOUND: 3,
+}
+_UNBOUND_FLAG_TO_CONSTANT = {v: k
+ for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
+
+
+def serialize_unbound(unbound):
+ op = unbound
+ try:
+ flag = _UNBOUND_CONSTANT_TO_FLAG[op]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
+ return flag,
+
+
+def resolve_unbound(flag, exctype_destroyed):
+ try:
+ op = _UNBOUND_FLAG_TO_CONSTANT[flag]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
+ if op is UNBOUND_REMOVE:
+ # "remove" not possible here
+ raise NotImplementedError
+ elif op is UNBOUND_ERROR:
+ raise exctype_destroyed("item's original interpreter destroyed")
+ elif op is UNBOUND:
+ return UNBOUND
+ else:
+ raise NotImplementedError(repr(op))
diff --git a/Lib/test/support/interpreters/channels.py b/Lib/test/support/interpreters/channels.py
index fbae7e634cf..d2bd93d77f7 100644
--- a/Lib/test/support/interpreters/channels.py
+++ b/Lib/test/support/interpreters/channels.py
@@ -2,35 +2,68 @@
import time
import _interpchannels as _channels
+from . import _crossinterp
# aliases:
from _interpchannels import (
ChannelError, ChannelNotFoundError, ChannelClosedError,
ChannelEmptyError, ChannelNotEmptyError,
)
+from ._crossinterp import (
+ UNBOUND_ERROR, UNBOUND_REMOVE,
+)
__all__ = [
+ 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
'create', 'list_all',
'SendChannel', 'RecvChannel',
'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError',
+ 'ItemInterpreterDestroyed',
]
-def create():
+class ItemInterpreterDestroyed(ChannelError,
+ _crossinterp.ItemInterpreterDestroyed):
+ """Raised from get() and get_nowait()."""
+
+
+UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
+
+
+def _serialize_unbound(unbound):
+ if unbound is UNBOUND:
+ unbound = _crossinterp.UNBOUND
+ return _crossinterp.serialize_unbound(unbound)
+
+
+def _resolve_unbound(flag):
+ resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
+ if resolved is _crossinterp.UNBOUND:
+ resolved = UNBOUND
+ return resolved
+
+
+def create(*, unbounditems=UNBOUND):
"""Return (recv, send) for a new cross-interpreter channel.
The channel may be used to pass data safely between interpreters.
+
+ "unbounditems" sets the default for the send end of the channel.
+ See SendChannel.send() for supported values. The default value
+ is UNBOUND, which replaces the unbound item when received.
"""
- cid = _channels.create()
- recv, send = RecvChannel(cid), SendChannel(cid)
+ unbound = _serialize_unbound(unbounditems)
+ unboundop, = unbound
+ cid = _channels.create(unboundop)
+ recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
return recv, send
def list_all():
"""Return a list of (recv, send) for all open channels."""
- return [(RecvChannel(cid), SendChannel(cid))
- for cid in _channels.list_all()]
+ return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
+ for cid, unbound in _channels.list_all()]
class _ChannelEnd:
@@ -106,12 +139,15 @@ class RecvChannel(_ChannelEnd):
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
- obj = _channels.recv(self._id, _sentinel)
+ obj, unboundop = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
if timeout is not None and time.time() >= end:
raise TimeoutError
- obj = _channels.recv(self._id, _sentinel)
+ obj, unboundop = _channels.recv(self._id, _sentinel)
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
return obj
def recv_nowait(self, default=_NOT_SET):
@@ -122,9 +158,13 @@ class RecvChannel(_ChannelEnd):
is the same as recv().
"""
if default is _NOT_SET:
- return _channels.recv(self._id)
+ obj, unboundop = _channels.recv(self._id)
else:
- return _channels.recv(self._id, default)
+ obj, unboundop = _channels.recv(self._id, default)
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
+ return obj
def close(self):
_channels.close(self._id, recv=True)
@@ -135,43 +175,79 @@ class SendChannel(_ChannelEnd):
_end = 'send'
+ def __new__(cls, cid, *, _unbound=None):
+ if _unbound is None:
+ try:
+ op = _channels.get_channel_defaults(cid)
+ _unbound = (op,)
+ except ChannelNotFoundError:
+ _unbound = _serialize_unbound(UNBOUND)
+ self = super().__new__(cls, cid)
+ self._unbound = _unbound
+ return self
+
@property
def is_closed(self):
info = self._info
return info.closed or info.closing
- def send(self, obj, timeout=None):
+ def send(self, obj, timeout=None, *,
+ unbound=None,
+ ):
"""Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received.
"""
- _channels.send(self._id, obj, timeout=timeout, blocking=True)
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
+ _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)
- def send_nowait(self, obj):
+ def send_nowait(self, obj, *,
+ unbound=None,
+ ):
"""Send the object to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
- return _channels.send(self._id, obj, blocking=False)
+ return _channels.send(self._id, obj, unboundop, blocking=False)
- def send_buffer(self, obj, timeout=None):
+ def send_buffer(self, obj, timeout=None, *,
+ unbound=None,
+ ):
"""Send the object's buffer to the channel's receiving end.
This blocks until the object is received.
"""
- _channels.send_buffer(self._id, obj, timeout=timeout, blocking=True)
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
+ _channels.send_buffer(self._id, obj, unboundop,
+ timeout=timeout, blocking=True)
- def send_buffer_nowait(self, obj):
+ def send_buffer_nowait(self, obj, *,
+ unbound=None,
+ ):
"""Send the object's buffer to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
- return _channels.send_buffer(self._id, obj, blocking=False)
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
+ return _channels.send_buffer(self._id, obj, unboundop, blocking=False)
def close(self):
_channels.close(self._id, send=True)
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py
index 402ceffd1bb..deb8e8613af 100644
--- a/Lib/test/support/interpreters/queues.py
+++ b/Lib/test/support/interpreters/queues.py
@@ -5,11 +5,15 @@ import queue
import time
import weakref
import _interpqueues as _queues
+from . import _crossinterp
# aliases:
from _interpqueues import (
QueueError, QueueNotFoundError,
)
+from ._crossinterp import (
+ UNBOUND_ERROR, UNBOUND_REMOVE,
+)
__all__ = [
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
@@ -34,7 +38,8 @@ class QueueFull(QueueError, queue.Full):
"""
-class ItemInterpreterDestroyed(QueueError):
+class ItemInterpreterDestroyed(QueueError,
+ _crossinterp.ItemInterpreterDestroyed):
"""Raised from get() and get_nowait()."""
@@ -42,57 +47,20 @@ _SHARED_ONLY = 0
_PICKLED = 1
-class UnboundItem:
- """Represents a Queue item no longer bound to an interpreter.
-
- An item is unbound when the interpreter that added it to the queue
- is destroyed.
- """
-
- __slots__ = ()
-
- def __new__(cls):
- return UNBOUND
-
- def __repr__(self):
- return f'interpreters.queues.UNBOUND'
-
-
-UNBOUND = object.__new__(UnboundItem)
-UNBOUND_ERROR = object()
-UNBOUND_REMOVE = object()
+UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
-_UNBOUND_CONSTANT_TO_FLAG = {
- UNBOUND_REMOVE: 1,
- UNBOUND_ERROR: 2,
- UNBOUND: 3,
-}
-_UNBOUND_FLAG_TO_CONSTANT = {v: k
- for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
def _serialize_unbound(unbound):
- op = unbound
- try:
- flag = _UNBOUND_CONSTANT_TO_FLAG[op]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
- return flag,
+ if unbound is UNBOUND:
+ unbound = _crossinterp.UNBOUND
+ return _crossinterp.serialize_unbound(unbound)
def _resolve_unbound(flag):
- try:
- op = _UNBOUND_FLAG_TO_CONSTANT[flag]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
- if op is UNBOUND_REMOVE:
- # "remove" not possible here
- raise NotImplementedError
- elif op is UNBOUND_ERROR:
- raise ItemInterpreterDestroyed("item's original interpreter destroyed")
- elif op is UNBOUND:
- return UNBOUND
- else:
- raise NotImplementedError(repr(op))
+ resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
+ if resolved is _crossinterp.UNBOUND:
+ resolved = UNBOUND
+ return resolved
def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
diff --git a/Lib/test/test__interpchannels.py b/Lib/test/test__interpchannels.py
index b76c58917c0..4a7f04b9df9 100644
--- a/Lib/test/test__interpchannels.py
+++ b/Lib/test/test__interpchannels.py
@@ -8,6 +8,8 @@ import unittest
from test.support import import_helper
+_channels = import_helper.import_module('_interpchannels')
+from test.support.interpreters import _crossinterp
from test.test__interpreters import (
_interpreters,
_run_output,
@@ -15,7 +17,7 @@ from test.test__interpreters import (
)
-_channels = import_helper.import_module('_interpchannels')
+REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
# Additional tests are found in Lib/test/test_interpreters/test_channels.py.
@@ -29,9 +31,19 @@ _channels = import_helper.import_module('_interpchannels')
def recv_wait(cid):
while True:
try:
- return _channels.recv(cid)
+ obj, unboundop = _channels.recv(cid)
except _channels.ChannelEmptyError:
time.sleep(0.1)
+ else:
+ assert unboundop is None, repr(unboundop)
+ return obj
+
+
+def recv_nowait(cid, *args, unbound=False):
+ obj, unboundop = _channels.recv(cid, *args)
+ assert (unboundop is None) != unbound, repr(unboundop)
+ return obj
+
#@contextmanager
#def run_threaded(id, source, **shared):
@@ -212,7 +224,7 @@ def _run_action(cid, action, end, state):
else:
raise Exception('expected ChannelEmptyError')
else:
- _channels.recv(cid)
+ recv_nowait(cid)
return state.decr()
else:
raise ValueError(end)
@@ -235,7 +247,7 @@ def _run_action(cid, action, end, state):
def clean_up_channels():
- for cid in _channels.list_all():
+ for cid, _ in _channels.list_all():
try:
_channels.destroy(cid)
except _channels.ChannelNotFoundError:
@@ -297,7 +309,7 @@ class ChannelIDTests(TestBase):
_channels._channel_id(10, send=False, recv=False)
def test_does_not_exist(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(_channels.ChannelNotFoundError):
_channels._channel_id(int(cid) + 1) # unforced
@@ -319,9 +331,9 @@ class ChannelIDTests(TestBase):
self.assertEqual(repr(cid), 'ChannelID(10)')
def test_equality(self):
- cid1 = _channels.create()
+ cid1 = _channels.create(REPLACE)
cid2 = _channels._channel_id(int(cid1))
- cid3 = _channels.create()
+ cid3 = _channels.create(REPLACE)
self.assertTrue(cid1 == cid1)
self.assertTrue(cid1 == cid2)
@@ -341,11 +353,11 @@ class ChannelIDTests(TestBase):
self.assertTrue(cid1 != cid3)
def test_shareable(self):
- chan = _channels.create()
+ chan = _channels.create(REPLACE)
- obj = _channels.create()
+ obj = _channels.create(REPLACE)
_channels.send(chan, obj, blocking=False)
- got = _channels.recv(chan)
+ got = recv_nowait(chan)
self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
@@ -356,15 +368,15 @@ class ChannelIDTests(TestBase):
class ChannelTests(TestBase):
def test_create_cid(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
self.assertIsInstance(cid, _channels.ChannelID)
def test_sequential_ids(self):
- before = _channels.list_all()
- id1 = _channels.create()
- id2 = _channels.create()
- id3 = _channels.create()
- after = _channels.list_all()
+ before = [cid for cid, _ in _channels.list_all()]
+ id1 = _channels.create(REPLACE)
+ id2 = _channels.create(REPLACE)
+ id3 = _channels.create(REPLACE)
+ after = [cid for cid, _ in _channels.list_all()]
self.assertEqual(id2, int(id1) + 1)
self.assertEqual(id3, int(id2) + 1)
@@ -374,7 +386,7 @@ class ChannelTests(TestBase):
id1 = _interpreters.create()
out = _run_output(id1, dedent("""
import _interpchannels as _channels
- cid = _channels.create()
+ cid = _channels.create(3)
print(cid)
"""))
cid1 = int(out.strip())
@@ -382,7 +394,7 @@ class ChannelTests(TestBase):
id2 = _interpreters.create()
out = _run_output(id2, dedent("""
import _interpchannels as _channels
- cid = _channels.create()
+ cid = _channels.create(3)
print(cid)
"""))
cid2 = int(out.strip())
@@ -392,7 +404,7 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_none(self):
"""Test listing interpreters for a channel with no associations."""
# Test for channel with no associated _interpreters.
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
send_interps = _channels.list_interpreters(cid, send=True)
recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(send_interps, [])
@@ -401,7 +413,7 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_basic(self):
"""Test basic listing channel _interpreters."""
interp0, *_ = _interpreters.get_main()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, "send", blocking=False)
# Test for a channel that has one end associated to an interpreter.
send_interps = _channels.list_interpreters(cid, send=True)
@@ -412,7 +424,7 @@ class ChannelTests(TestBase):
interp1 = _interpreters.create()
_run_output(interp1, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
# Test for channel that has both ends associated to an interpreter.
send_interps = _channels.list_interpreters(cid, send=True)
@@ -426,7 +438,7 @@ class ChannelTests(TestBase):
interp1 = _interpreters.create()
interp2 = _interpreters.create()
interp3 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f"""
@@ -435,11 +447,11 @@ class ChannelTests(TestBase):
"""))
_run_output(interp2, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
_run_output(interp3, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
send_interps = _channels.list_interpreters(cid, send=True)
recv_interps = _channels.list_interpreters(cid, send=False)
@@ -450,11 +462,11 @@ class ChannelTests(TestBase):
"""Test listing channel interpreters with a destroyed interpreter."""
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
# Should be one interpreter associated with each end.
send_interps = _channels.list_interpreters(cid, send=True)
@@ -476,16 +488,16 @@ class ChannelTests(TestBase):
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
interp2 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, "data", blocking=False)
_run_output(interp1, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
_channels.send(cid, "data", blocking=False)
_run_output(interp2, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
# Check the setup.
send_interps = _channels.list_interpreters(cid, send=True)
@@ -516,7 +528,7 @@ class ChannelTests(TestBase):
"""Test listing channel interpreters with a closed channel."""
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
# Put something in the channel so that it's not empty.
_channels.send(cid, "send", blocking=False)
@@ -538,7 +550,7 @@ class ChannelTests(TestBase):
"""Test listing channel interpreters with a channel's send end closed."""
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
# Put something in the channel so that it's not empty.
_channels.send(cid, "send", blocking=False)
@@ -570,7 +582,7 @@ class ChannelTests(TestBase):
_channels.list_interpreters(cid, send=False)
def test_allowed_types(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
objects = [
None,
'spam',
@@ -580,7 +592,7 @@ class ChannelTests(TestBase):
for obj in objects:
with self.subTest(obj):
_channels.send(cid, obj, blocking=False)
- got = _channels.recv(cid)
+ got = recv_nowait(cid)
self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
@@ -589,7 +601,7 @@ class ChannelTests(TestBase):
# XXX What about between interpreters?
def test_run_string_arg_unresolved(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
interp = _interpreters.create()
_interpreters.set___main___attrs(interp, dict(cid=cid.send))
@@ -598,7 +610,7 @@ class ChannelTests(TestBase):
print(cid.end)
_channels.send(cid, b'spam', blocking=False)
"""))
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
@@ -608,7 +620,7 @@ class ChannelTests(TestBase):
# Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
cid = _channels._channel_id(cid, _resolve=True)
interp = _interpreters.create()
@@ -618,7 +630,7 @@ class ChannelTests(TestBase):
_channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
@@ -627,10 +639,10 @@ class ChannelTests(TestBase):
# send/recv
def test_send_recv_main(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
orig = b'spam'
_channels.send(cid, orig, blocking=False)
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, orig)
self.assertIsNot(obj, orig)
@@ -639,27 +651,27 @@ class ChannelTests(TestBase):
id1 = _interpreters.create()
out = _run_output(id1, dedent("""
import _interpchannels as _channels
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
orig = b'spam'
_channels.send(cid, orig, blocking=False)
- obj = _channels.recv(cid)
+ obj, _ = _channels.recv(cid)
assert obj is not orig
assert obj == orig
"""))
def test_send_recv_different_interpreters(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
id1 = _interpreters.create()
out = _run_output(id1, dedent(f"""
import _interpchannels as _channels
_channels.send({cid}, b'spam', blocking=False)
"""))
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, b'spam')
def test_send_recv_different_threads(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
obj = recv_wait(cid)
@@ -674,7 +686,7 @@ class ChannelTests(TestBase):
self.assertEqual(obj, b'spam')
def test_send_recv_different_interpreters_and_threads(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
id1 = _interpreters.create()
out = None
@@ -685,7 +697,7 @@ class ChannelTests(TestBase):
import _interpchannels as _channels
while True:
try:
- obj = _channels.recv({cid})
+ obj, _ = _channels.recv({cid})
break
except _channels.ChannelEmptyError:
time.sleep(0.1)
@@ -710,23 +722,23 @@ class ChannelTests(TestBase):
_channels.recv(10)
def test_recv_empty(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(_channels.ChannelEmptyError):
_channels.recv(cid)
def test_recv_default(self):
default = object()
- cid = _channels.create()
- obj1 = _channels.recv(cid, default)
+ cid = _channels.create(REPLACE)
+ obj1 = recv_nowait(cid, default)
_channels.send(cid, None, blocking=False)
_channels.send(cid, 1, blocking=False)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'eggs', blocking=False)
- obj2 = _channels.recv(cid, default)
- obj3 = _channels.recv(cid, default)
- obj4 = _channels.recv(cid)
- obj5 = _channels.recv(cid, default)
- obj6 = _channels.recv(cid, default)
+ obj2 = recv_nowait(cid, default)
+ obj3 = recv_nowait(cid, default)
+ obj4 = recv_nowait(cid)
+ obj5 = recv_nowait(cid, default)
+ obj6 = recv_nowait(cid, default)
self.assertIs(obj1, default)
self.assertIs(obj2, None)
@@ -737,7 +749,7 @@ class ChannelTests(TestBase):
def test_recv_sending_interp_destroyed(self):
with self.subTest('closed'):
- cid1 = _channels.create()
+ cid1 = _channels.create(REPLACE)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels
@@ -750,7 +762,7 @@ class ChannelTests(TestBase):
_channels.recv(cid1)
del cid1
with self.subTest('still open'):
- cid2 = _channels.create()
+ cid2 = _channels.create(REPLACE)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels
@@ -759,7 +771,8 @@ class ChannelTests(TestBase):
_channels.send(cid2, b'eggs', blocking=False)
_interpreters.destroy(interp)
- _channels.recv(cid2)
+ recv_nowait(cid2, unbound=True)
+ recv_nowait(cid2, unbound=False)
with self.assertRaisesRegex(RuntimeError,
f'channel {cid2} is empty'):
_channels.recv(cid2)
@@ -770,9 +783,9 @@ class ChannelTests(TestBase):
def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send_buffer(cid, buf, blocking=False)
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertIsNot(obj, buf)
self.assertIsInstance(obj, memoryview)
@@ -794,12 +807,12 @@ class ChannelTests(TestBase):
else:
send = _channels.send
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
try:
started = time.monotonic()
send(cid, obj, blocking=False)
stopped = time.monotonic()
- _channels.recv(cid)
+ recv_nowait(cid)
finally:
_channels.destroy(cid)
delay = stopped - started # seconds
@@ -813,7 +826,7 @@ class ChannelTests(TestBase):
received = None
obj = b'spam'
wait = self.build_send_waiter(obj)
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
nonlocal received
wait()
@@ -829,7 +842,7 @@ class ChannelTests(TestBase):
received = None
obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
nonlocal received
wait()
@@ -844,7 +857,7 @@ class ChannelTests(TestBase):
def test_send_blocking_no_wait(self):
received = None
obj = b'spam'
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
nonlocal received
received = recv_wait(cid)
@@ -858,7 +871,7 @@ class ChannelTests(TestBase):
def test_send_buffer_blocking_no_wait(self):
received = None
obj = bytearray(b'spam')
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
nonlocal received
received = recv_wait(cid)
@@ -873,20 +886,20 @@ class ChannelTests(TestBase):
obj = b'spam'
with self.subTest('non-blocking with timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(ValueError):
_channels.send(cid, obj, blocking=False, timeout=0.1)
with self.subTest('timeout hit'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(TimeoutError):
_channels.send(cid, obj, blocking=True, timeout=0.1)
with self.assertRaises(_channels.ChannelEmptyError):
- received = _channels.recv(cid)
+ received = recv_nowait(cid)
print(repr(received))
with self.subTest('timeout not hit'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
recv_wait(cid)
t = threading.Thread(target=f)
@@ -910,20 +923,20 @@ class ChannelTests(TestBase):
obj = bytearray(b'spam')
with self.subTest('non-blocking with timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(ValueError):
_channels.send_buffer(cid, obj, blocking=False, timeout=0.1)
with self.subTest('timeout hit'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(TimeoutError):
_channels.send_buffer(cid, obj, blocking=True, timeout=0.1)
with self.assertRaises(_channels.ChannelEmptyError):
- received = _channels.recv(cid)
+ received = recv_nowait(cid)
print(repr(received))
with self.subTest('timeout not hit'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
recv_wait(cid)
t = threading.Thread(target=f)
@@ -936,7 +949,7 @@ class ChannelTests(TestBase):
wait = self.build_send_waiter(obj)
with self.subTest('without timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
wait()
_channels.close(cid, force=True)
@@ -947,7 +960,7 @@ class ChannelTests(TestBase):
t.join()
with self.subTest('with timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
wait()
_channels.close(cid, force=True)
@@ -974,7 +987,7 @@ class ChannelTests(TestBase):
wait = self.build_send_waiter(obj, buffer=True)
with self.subTest('without timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
wait()
_channels.close(cid, force=True)
@@ -985,7 +998,7 @@ class ChannelTests(TestBase):
t.join()
with self.subTest('with timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
wait()
_channels.close(cid, force=True)
@@ -999,9 +1012,9 @@ class ChannelTests(TestBase):
# close
def test_close_single_user(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.close(cid)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1010,7 +1023,7 @@ class ChannelTests(TestBase):
_channels.recv(cid)
def test_close_multiple_users(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
id1 = _interpreters.create()
id2 = _interpreters.create()
_interpreters.run_string(id1, dedent(f"""
@@ -1034,9 +1047,9 @@ class ChannelTests(TestBase):
self.assertEqual(excsnap.type.__name__, 'ChannelClosedError')
def test_close_multiple_times(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.close(cid)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1051,9 +1064,9 @@ class ChannelTests(TestBase):
]
for send, recv in tests:
with self.subTest((send, recv)):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.close(cid, send=send, recv=recv)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1062,56 +1075,56 @@ class ChannelTests(TestBase):
_channels.recv(cid)
def test_close_defaults_with_unused_items(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
with self.assertRaises(_channels.ChannelNotEmptyError):
_channels.close(cid)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.send(cid, b'eggs', blocking=False)
def test_close_recv_with_unused_items_unforced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
with self.assertRaises(_channels.ChannelNotEmptyError):
_channels.close(cid, recv=True)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.send(cid, b'eggs', blocking=False)
- _channels.recv(cid)
- _channels.recv(cid)
+ recv_nowait(cid)
+ recv_nowait(cid)
_channels.close(cid, recv=True)
def test_close_send_with_unused_items_unforced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.close(cid, send=True)
with self.assertRaises(_channels.ChannelClosedError):
_channels.send(cid, b'eggs')
- _channels.recv(cid)
- _channels.recv(cid)
+ recv_nowait(cid)
+ recv_nowait(cid)
with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid)
def test_close_both_with_unused_items_unforced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
with self.assertRaises(_channels.ChannelNotEmptyError):
_channels.close(cid, recv=True, send=True)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.send(cid, b'eggs', blocking=False)
- _channels.recv(cid)
- _channels.recv(cid)
+ recv_nowait(cid)
+ recv_nowait(cid)
_channels.close(cid, recv=True)
def test_close_recv_with_unused_items_forced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.close(cid, recv=True, force=True)
@@ -1122,7 +1135,7 @@ class ChannelTests(TestBase):
_channels.recv(cid)
def test_close_send_with_unused_items_forced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.close(cid, send=True, force=True)
@@ -1133,7 +1146,7 @@ class ChannelTests(TestBase):
_channels.recv(cid)
def test_close_both_with_unused_items_forced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.close(cid, send=True, recv=True, force=True)
@@ -1144,7 +1157,7 @@ class ChannelTests(TestBase):
_channels.recv(cid)
def test_close_never_used(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.close(cid)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1153,7 +1166,7 @@ class ChannelTests(TestBase):
_channels.recv(cid)
def test_close_by_unassociated_interp(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
@@ -1166,11 +1179,11 @@ class ChannelTests(TestBase):
_channels.close(cid)
def test_close_used_multiple_times_by_single_user(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.close(cid, force=True)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1179,7 +1192,7 @@ class ChannelTests(TestBase):
_channels.recv(cid)
def test_channel_list_interpreters_invalid_channel(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
# Test for invalid channel ID.
with self.assertRaises(_channels.ChannelNotFoundError):
_channels.list_interpreters(1000, send=True)
@@ -1191,7 +1204,7 @@ class ChannelTests(TestBase):
def test_channel_list_interpreters_invalid_args(self):
# Tests for invalid arguments passed to the API.
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(TypeError):
_channels.list_interpreters(cid)
@@ -1240,9 +1253,9 @@ class ChannelReleaseTests(TestBase):
"""
def test_single_user(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.release(cid, send=True, recv=True)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1251,7 +1264,7 @@ class ChannelReleaseTests(TestBase):
_channels.recv(cid)
def test_multiple_users(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
id1 = _interpreters.create()
id2 = _interpreters.create()
_interpreters.run_string(id1, dedent(f"""
@@ -1260,7 +1273,7 @@ class ChannelReleaseTests(TestBase):
"""))
out = _run_output(id2, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ obj, _ = _channels.recv({cid})
_channels.release({cid})
print(repr(obj))
"""))
@@ -1271,9 +1284,9 @@ class ChannelReleaseTests(TestBase):
self.assertEqual(out.strip(), "b'spam'")
def test_no_kwargs(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.release(cid)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1282,16 +1295,16 @@ class ChannelReleaseTests(TestBase):
_channels.recv(cid)
def test_multiple_times(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.release(cid, send=True, recv=True)
with self.assertRaises(_channels.ChannelClosedError):
_channels.release(cid, send=True, recv=True)
def test_with_unused_items(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.release(cid, send=True, recv=True)
@@ -1300,7 +1313,7 @@ class ChannelReleaseTests(TestBase):
_channels.recv(cid)
def test_never_used(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.release(cid)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1309,14 +1322,14 @@ class ChannelReleaseTests(TestBase):
_channels.recv(cid)
def test_by_unassociated_interp(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels
_channels.release({cid})
"""))
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
_channels.release(cid)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1325,7 +1338,7 @@ class ChannelReleaseTests(TestBase):
def test_close_if_unassociated(self):
# XXX Something's not right with this test...
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels
@@ -1338,21 +1351,21 @@ class ChannelReleaseTests(TestBase):
def test_partially(self):
# XXX Is partial close too weird/confusing?
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, None, blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.send(cid, b'spam', blocking=False)
_channels.release(cid, send=True)
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, b'spam')
def test_used_multiple_times_by_single_user(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.release(cid, send=True, recv=True)
with self.assertRaises(_channels.ChannelClosedError):
@@ -1428,9 +1441,9 @@ class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
def _new_channel(self, creator):
if creator.name == 'main':
- return _channels.create()
+ return _channels.create(REPLACE)
else:
- ch = _channels.create()
+ ch = _channels.create(REPLACE)
run_interp(creator.id, f"""
import _interpreters
cid = _xxsubchannels.create()
@@ -1439,7 +1452,7 @@ class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
_xxsubchannels.send({ch}, int(cid), blocking=False)
del _interpreters
""")
- self._cid = _channels.recv(ch)
+ self._cid = recv_nowait(ch)
return self._cid
def _get_interpreter(self, interp):
@@ -1657,7 +1670,7 @@ class ExhaustiveChannelTests(TestBase):
)
fix.record_action(action, result)
else:
- _cid = _channels.create()
+ _cid = _channels.create(REPLACE)
run_interp(interp.id, f"""
result = helpers.run_action(
{fix.cid},
@@ -1670,8 +1683,8 @@ class ExhaustiveChannelTests(TestBase):
_channels.send({_cid}, b'X' if result.closed else b'', blocking=False)
""")
result = ChannelState(
- pending=int.from_bytes(_channels.recv(_cid), 'little'),
- closed=bool(_channels.recv(_cid)),
+ pending=int.from_bytes(recv_nowait(_cid), 'little'),
+ closed=bool(recv_nowait(_cid)),
)
fix.record_action(action, result)
@@ -1729,7 +1742,7 @@ class ExhaustiveChannelTests(TestBase):
self.assertTrue(fix.state.closed)
for _ in range(fix.state.pending):
- _channels.recv(fix.cid)
+ recv_nowait(fix.cid)
self._assert_closed_in_interp(fix)
for interp in ('same', 'other'):
diff --git a/Lib/test/test_interpreters/test_channels.py b/Lib/test/test_interpreters/test_channels.py
index 6c37754142e..eada18f99d0 100644
--- a/Lib/test/test_interpreters/test_channels.py
+++ b/Lib/test/test_interpreters/test_channels.py
@@ -372,6 +372,228 @@ class TestSendRecv(TestBase):
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)
+ def test_send_cleared_with_subinterpreter(self):
+ def common(rch, sch, unbound=None, presize=0):
+ if not unbound:
+ extraargs = ''
+ elif unbound is channels.UNBOUND:
+ extraargs = ', unbound=channels.UNBOUND'
+ elif unbound is channels.UNBOUND_ERROR:
+ extraargs = ', unbound=channels.UNBOUND_ERROR'
+ elif unbound is channels.UNBOUND_REMOVE:
+ extraargs = ', unbound=channels.UNBOUND_REMOVE'
+ else:
+ raise NotImplementedError(repr(unbound))
+ interp = interpreters.create()
+
+ _run_output(interp, dedent(f"""
+ from test.support.interpreters import channels
+ sch = channels.SendChannel({sch.id})
+ obj1 = b'spam'
+ obj2 = b'eggs'
+ sch.send_nowait(obj1{extraargs})
+ sch.send_nowait(obj2{extraargs})
+ """))
+ self.assertEqual(
+ _channels.get_count(rch.id),
+ presize + 2,
+ )
+
+ if presize == 0:
+ obj1 = rch.recv()
+ self.assertEqual(obj1, b'spam')
+ self.assertEqual(
+ _channels.get_count(rch.id),
+ presize + 1,
+ )
+
+ return interp
+
+ with self.subTest('default'): # UNBOUND
+ rch, sch = channels.create()
+ interp = common(rch, sch)
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 1)
+ obj1 = rch.recv()
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ self.assertIs(obj1, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ with self.subTest('UNBOUND'):
+ rch, sch = channels.create()
+ interp = common(rch, sch, channels.UNBOUND)
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 1)
+ obj1 = rch.recv()
+ self.assertIs(obj1, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ with self.subTest('UNBOUND_ERROR'):
+ rch, sch = channels.create()
+ interp = common(rch, sch, channels.UNBOUND_ERROR)
+
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 1)
+ with self.assertRaises(channels.ItemInterpreterDestroyed):
+ rch.recv()
+
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ with self.subTest('UNBOUND_REMOVE'):
+ rch, sch = channels.create()
+
+ interp = common(rch, sch, channels.UNBOUND_REMOVE)
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ sch.send_nowait(b'ham', unbound=channels.UNBOUND_REMOVE)
+ self.assertEqual(_channels.get_count(rch.id), 1)
+ interp = common(rch, sch, channels.UNBOUND_REMOVE, 1)
+ self.assertEqual(_channels.get_count(rch.id), 3)
+ sch.send_nowait(42, unbound=channels.UNBOUND_REMOVE)
+ self.assertEqual(_channels.get_count(rch.id), 4)
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 2)
+ obj1 = rch.recv()
+ obj2 = rch.recv()
+ self.assertEqual(obj1, b'ham')
+ self.assertEqual(obj2, 42)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ def test_send_cleared_with_subinterpreter_mixed(self):
+ rch, sch = channels.create()
+ interp = interpreters.create()
+
+ # If we don't associate the main interpreter with the channel
+ # then the channel will be automatically closed when interp
+ # is destroyed.
+ sch.send_nowait(None)
+ rch.recv()
+ self.assertEqual(_channels.get_count(rch.id), 0)
+
+ _run_output(interp, dedent(f"""
+ from test.support.interpreters import channels
+ sch = channels.SendChannel({sch.id})
+ sch.send_nowait(1, unbound=channels.UNBOUND)
+ sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
+ sch.send_nowait(3)
+ sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(5, unbound=channels.UNBOUND)
+ """))
+ self.assertEqual(_channels.get_count(rch.id), 5)
+
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 4)
+
+ obj1 = rch.recv()
+ self.assertIs(obj1, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 3)
+
+ with self.assertRaises(channels.ItemInterpreterDestroyed):
+ rch.recv()
+ self.assertEqual(_channels.get_count(rch.id), 2)
+
+ obj2 = rch.recv()
+ self.assertIs(obj2, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 1)
+
+ obj3 = rch.recv()
+ self.assertIs(obj3, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+
+ def test_send_cleared_with_subinterpreter_multiple(self):
+ rch, sch = channels.create()
+ interp1 = interpreters.create()
+ interp2 = interpreters.create()
+
+ sch.send_nowait(1)
+ _run_output(interp1, dedent(f"""
+ from test.support.interpreters import channels
+ rch = channels.RecvChannel({rch.id})
+ sch = channels.SendChannel({sch.id})
+ obj1 = rch.recv()
+ sch.send_nowait(2, unbound=channels.UNBOUND)
+ sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
+ """))
+ _run_output(interp2, dedent(f"""
+ from test.support.interpreters import channels
+ rch = channels.RecvChannel({rch.id})
+ sch = channels.SendChannel({sch.id})
+ obj2 = rch.recv()
+ obj1 = rch.recv()
+ """))
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ sch.send_nowait(3)
+ _run_output(interp1, dedent("""
+ sch.send_nowait(4, unbound=channels.UNBOUND)
+ # interp closed here
+ sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(6, unbound=channels.UNBOUND)
+ """))
+ _run_output(interp2, dedent("""
+ sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
+ # interp closed here
+ sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
+ sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(8, unbound=channels.UNBOUND)
+ """))
+ _run_output(interp1, dedent("""
+ sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(10, unbound=channels.UNBOUND)
+ """))
+ self.assertEqual(_channels.get_count(rch.id), 10)
+
+ obj3 = rch.recv()
+ self.assertEqual(obj3, 3)
+ self.assertEqual(_channels.get_count(rch.id), 9)
+
+ obj4 = rch.recv()
+ self.assertEqual(obj4, 4)
+ self.assertEqual(_channels.get_count(rch.id), 8)
+
+ del interp1
+ self.assertEqual(_channels.get_count(rch.id), 6)
+
+ # obj5 was removed
+
+ obj6 = rch.recv()
+ self.assertIs(obj6, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 5)
+
+ obj7 = rch.recv()
+ self.assertEqual(obj7, 7)
+ self.assertEqual(_channels.get_count(rch.id), 4)
+
+ del interp2
+ self.assertEqual(_channels.get_count(rch.id), 3)
+
+ # obj1
+ with self.assertRaises(channels.ItemInterpreterDestroyed):
+ rch.recv()
+ self.assertEqual(_channels.get_count(rch.id), 2)
+
+ # obj2 was removed
+
+ obj8 = rch.recv()
+ self.assertIs(obj8, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 1)
+
+ # obj9 was removed
+
+ obj10 = rch.recv()
+ self.assertIs(obj10, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+
if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py
index 30d58a5b291..18f83d097eb 100644
--- a/Lib/test/test_interpreters/test_queues.py
+++ b/Lib/test/test_interpreters/test_queues.py
@@ -8,11 +8,11 @@ from test.support import import_helper, Py_DEBUG
# Raise SkipTest if subinterpreters not supported.
_queues = import_helper.import_module('_interpqueues')
from test.support import interpreters
-from test.support.interpreters import queues
+from test.support.interpreters import queues, _crossinterp
from .utils import _run_output, TestBase as _TestBase
-REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND]
+REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
def get_num_queues():
diff --git a/Modules/_interpchannelsmodule.c b/Modules/_interpchannelsmodule.c
index f0447475c49..a8b4a8d76b0 100644
--- a/Modules/_interpchannelsmodule.c
+++ b/Modules/_interpchannelsmodule.c
@@ -18,7 +18,9 @@
#endif
#define REGISTERS_HEAP_TYPES
+#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
+#undef HAS_UNBOUND_ITEMS
#undef REGISTERS_HEAP_TYPES
@@ -511,8 +513,14 @@ _waiting_finish_releasing(_waiting_t *waiting)
struct _channelitem;
typedef struct _channelitem {
+ /* The interpreter that added the item to the queue.
+ The actual bound interpid is found in item->data.
+ This is necessary because item->data might be NULL,
+ meaning the interpreter has been destroyed. */
+ int64_t interpid;
_PyCrossInterpreterData *data;
_waiting_t *waiting;
+ int unboundop;
struct _channelitem *next;
} _channelitem;
@@ -524,11 +532,22 @@ _channelitem_ID(_channelitem *item)
static void
_channelitem_init(_channelitem *item,
- _PyCrossInterpreterData *data, _waiting_t *waiting)
+ int64_t interpid, _PyCrossInterpreterData *data,
+ _waiting_t *waiting, int unboundop)
{
+ if (interpid < 0) {
+ interpid = _get_interpid(data);
+ }
+ else {
+ assert(data == NULL
+ || _PyCrossInterpreterData_INTERPID(data) < 0
+ || interpid == _PyCrossInterpreterData_INTERPID(data));
+ }
*item = (_channelitem){
+ .interpid = interpid,
.data = data,
.waiting = waiting,
+ .unboundop = unboundop,
};
if (waiting != NULL) {
waiting->itemid = _channelitem_ID(item);
@@ -536,17 +555,15 @@ _channelitem_init(_channelitem *item,
}
static void
-_channelitem_clear(_channelitem *item)
+_channelitem_clear_data(_channelitem *item, int removed)
{
- item->next = NULL;
-
if (item->data != NULL) {
// It was allocated in channel_send().
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
item->data = NULL;
}
- if (item->waiting != NULL) {
+ if (item->waiting != NULL && removed) {
if (item->waiting->status == WAITING_ACQUIRED) {
_waiting_release(item->waiting, 0);
}
@@ -554,15 +571,23 @@ _channelitem_clear(_channelitem *item)
}
}
+static void
+_channelitem_clear(_channelitem *item)
+{
+ item->next = NULL;
+ _channelitem_clear_data(item, 1);
+}
+
static _channelitem *
-_channelitem_new(_PyCrossInterpreterData *data, _waiting_t *waiting)
+_channelitem_new(int64_t interpid, _PyCrossInterpreterData *data,
+ _waiting_t *waiting, int unboundop)
{
_channelitem *item = GLOBAL_MALLOC(_channelitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
}
- _channelitem_init(item, data, waiting);
+ _channelitem_init(item, interpid, data, waiting, unboundop);
return item;
}
@@ -585,17 +610,48 @@ _channelitem_free_all(_channelitem *item)
static void
_channelitem_popped(_channelitem *item,
- _PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
+ _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
+ int *p_unboundop)
{
assert(item->waiting == NULL || item->waiting->status == WAITING_ACQUIRED);
*p_data = item->data;
*p_waiting = item->waiting;
+ *p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _channelitem_clear().
item->data = NULL;
item->waiting = NULL;
_channelitem_free(item);
}
+static int
+_channelitem_clear_interpreter(_channelitem *item)
+{
+ assert(item->interpid >= 0);
+ if (item->data == NULL) {
+ // Its interpreter was already cleared (or it was never bound).
+ // For UNBOUND_REMOVE it should have been freed at that time.
+ assert(item->unboundop != UNBOUND_REMOVE);
+ return 0;
+ }
+ assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
+
+ switch (item->unboundop) {
+ case UNBOUND_REMOVE:
+ // The caller must free/clear it.
+ return 1;
+ case UNBOUND_ERROR:
+ case UNBOUND_REPLACE:
+ // We won't need the cross-interpreter data later
+ // so we completely throw it away.
+ _channelitem_clear_data(item, 0);
+ return 0;
+ default:
+ Py_FatalError("not reachable");
+ return -1;
+ }
+}
+
+
typedef struct _channelqueue {
int64_t count;
_channelitem *first;
@@ -634,9 +690,10 @@ _channelqueue_free(_channelqueue *queue)
static int
_channelqueue_put(_channelqueue *queue,
- _PyCrossInterpreterData *data, _waiting_t *waiting)
+ int64_t interpid, _PyCrossInterpreterData *data,
+ _waiting_t *waiting, int unboundop)
{
- _channelitem *item = _channelitem_new(data, waiting);
+ _channelitem *item = _channelitem_new(interpid, data, waiting, unboundop);
if (item == NULL) {
return -1;
}
@@ -659,7 +716,8 @@ _channelqueue_put(_channelqueue *queue,
static int
_channelqueue_get(_channelqueue *queue,
- _PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
+ _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
+ int *p_unboundop)
{
_channelitem *item = queue->first;
if (item == NULL) {
@@ -671,7 +729,7 @@ _channelqueue_get(_channelqueue *queue,
}
queue->count -= 1;
- _channelitem_popped(item, p_data, p_waiting);
+ _channelitem_popped(item, p_data, p_waiting, p_unboundop);
return 0;
}
@@ -737,7 +795,8 @@ _channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid,
}
queue->count -= 1;
- _channelitem_popped(item, p_data, p_waiting);
+ int unboundop;
+ _channelitem_popped(item, p_data, p_waiting, &unboundop);
}
static void
@@ -748,14 +807,17 @@ _channelqueue_clear_interpreter(_channelqueue *queue, int64_t interpid)
while (next != NULL) {
_channelitem *item = next;
next = item->next;
- if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) {
+ int remove = (item->interpid == interpid)
+ ? _channelitem_clear_interpreter(item)
+ : 0;
+ if (remove) {
+ _channelitem_free(item);
if (prev == NULL) {
- queue->first = item->next;
+ queue->first = next;
}
else {
- prev->next = item->next;
+ prev->next = next;
}
- _channelitem_free(item);
queue->count -= 1;
}
else {
@@ -1018,12 +1080,15 @@ typedef struct _channel {
PyThread_type_lock mutex;
_channelqueue *queue;
_channelends *ends;
+ struct {
+ int unboundop;
+ } defaults;
int open;
struct _channel_closing *closing;
} _channel_state;
static _channel_state *
-_channel_new(PyThread_type_lock mutex)
+_channel_new(PyThread_type_lock mutex, int unboundop)
{
_channel_state *chan = GLOBAL_MALLOC(_channel_state);
if (chan == NULL) {
@@ -1041,6 +1106,7 @@ _channel_new(PyThread_type_lock mutex)
GLOBAL_FREE(chan);
return NULL;
}
+ chan->defaults.unboundop = unboundop;
chan->open = 1;
chan->closing = NULL;
return chan;
@@ -1061,7 +1127,8 @@ _channel_free(_channel_state *chan)
static int
_channel_add(_channel_state *chan, int64_t interpid,
- _PyCrossInterpreterData *data, _waiting_t *waiting)
+ _PyCrossInterpreterData *data, _waiting_t *waiting,
+ int unboundop)
{
int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@@ -1075,7 +1142,7 @@ _channel_add(_channel_state *chan, int64_t interpid,
goto done;
}
- if (_channelqueue_put(chan->queue, data, waiting) != 0) {
+ if (_channelqueue_put(chan->queue, interpid, data, waiting, unboundop) != 0) {
goto done;
}
// Any errors past this point must cause a _waiting_release() call.
@@ -1088,7 +1155,8 @@ done:
static int
_channel_next(_channel_state *chan, int64_t interpid,
- _PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
+ _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
+ int *p_unboundop)
{
int err = 0;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@@ -1102,11 +1170,15 @@ _channel_next(_channel_state *chan, int64_t interpid,
goto done;
}
- int empty = _channelqueue_get(chan->queue, p_data, p_waiting);
- assert(empty == 0 || empty == ERR_CHANNEL_EMPTY);
+ int empty = _channelqueue_get(chan->queue, p_data, p_waiting, p_unboundop);
assert(!PyErr_Occurred());
- if (empty && chan->closing != NULL) {
- chan->open = 0;
+ if (empty) {
+ assert(empty == ERR_CHANNEL_EMPTY);
+ if (chan->closing != NULL) {
+ chan->open = 0;
+ }
+ err = ERR_CHANNEL_EMPTY;
+ goto done;
}
done:
@@ -1528,18 +1600,27 @@ done:
PyThread_release_lock(channels->mutex);
}
-static int64_t *
+struct channel_id_and_info {
+ int64_t id;
+ int unboundop;
+};
+
+static struct channel_id_and_info *
_channels_list_all(_channels *channels, int64_t *count)
{
- int64_t *cids = NULL;
+ struct channel_id_and_info *cids = NULL;
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
+ struct channel_id_and_info *ids =
+ PyMem_NEW(struct channel_id_and_info, (Py_ssize_t)(channels->numopen));
if (ids == NULL) {
goto done;
}
_channelref *ref = channels->head;
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
- ids[i] = ref->cid;
+ ids[i] = (struct channel_id_and_info){
+ .id = ref->cid,
+ .unboundop = ref->chan->defaults.unboundop,
+ };
}
*count = channels->numopen;
@@ -1624,13 +1705,13 @@ _channel_finish_closing(_channel_state *chan) {
// Create a new channel.
static int64_t
-channel_create(_channels *channels)
+channel_create(_channels *channels, int unboundop)
{
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_CHANNEL_MUTEX_INIT;
}
- _channel_state *chan = _channel_new(mutex);
+ _channel_state *chan = _channel_new(mutex, unboundop);
if (chan == NULL) {
PyThread_free_lock(mutex);
return -1;
@@ -1662,7 +1743,7 @@ channel_destroy(_channels *channels, int64_t cid)
// Optionally request to be notified when it is received.
static int
channel_send(_channels *channels, int64_t cid, PyObject *obj,
- _waiting_t *waiting)
+ _waiting_t *waiting, int unboundop)
{
PyInterpreterState *interp = _get_current_interp();
if (interp == NULL) {
@@ -1698,7 +1779,7 @@ channel_send(_channels *channels, int64_t cid, PyObject *obj,
}
// Add the data to the channel.
- int res = _channel_add(chan, interpid, data, waiting);
+ int res = _channel_add(chan, interpid, data, waiting, unboundop);
PyThread_release_lock(mutex);
if (res != 0) {
// We may chain an exception here:
@@ -1735,7 +1816,7 @@ channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
// Like channel_send(), but strictly wait for the object to be received.
static int
channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
- PY_TIMEOUT_T timeout)
+ int unboundop, PY_TIMEOUT_T timeout)
{
// We use a stack variable here, so we must ensure that &waiting
// is not held by any channel item at the point this function exits.
@@ -1746,7 +1827,7 @@ channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
}
/* Queue up the object. */
- int res = channel_send(channels, cid, obj, &waiting);
+ int res = channel_send(channels, cid, obj, &waiting, unboundop);
if (res < 0) {
assert(waiting.status == WAITING_NO_STATUS);
goto finally;
@@ -1788,7 +1869,7 @@ finally:
// The current interpreter gets associated with the recv end of the channel.
// XXX Support a "wait" mutex?
static int
-channel_recv(_channels *channels, int64_t cid, PyObject **res)
+channel_recv(_channels *channels, int64_t cid, PyObject **res, int *p_unboundop)
{
int err;
*res = NULL;
@@ -1816,13 +1897,15 @@ channel_recv(_channels *channels, int64_t cid, PyObject **res)
// Pop off the next item from the channel.
_PyCrossInterpreterData *data = NULL;
_waiting_t *waiting = NULL;
- err = _channel_next(chan, interpid, &data, &waiting);
+ err = _channel_next(chan, interpid, &data, &waiting, p_unboundop);
PyThread_release_lock(mutex);
if (err != 0) {
return err;
}
else if (data == NULL) {
+ // The item was unbound.
assert(!PyErr_Occurred());
+ *res = NULL;
return 0;
}
@@ -1915,6 +1998,23 @@ channel_is_associated(_channels *channels, int64_t cid, int64_t interpid,
return (end != NULL && end->open);
}
+static int
+_channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count)
+{
+ PyThread_type_lock mutex = NULL;
+ _channel_state *chan = NULL;
+ int err = _channels_lookup(channels, cid, &mutex, &chan);
+ if (err != 0) {
+ return err;
+ }
+ assert(chan != NULL);
+ int64_t count = chan->queue->count;
+ PyThread_release_lock(mutex);
+
+ *p_count = (Py_ssize_t)count;
+ return 0;
+}
+
/* channel info */
@@ -2767,9 +2867,22 @@ clear_interpreter(void *data)
static PyObject *
-channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored))
+channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
- int64_t cid = channel_create(&_globals.channels);
+ static char *kwlist[] = {"unboundop", NULL};
+ int unboundop;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "i:create", kwlist,
+ &unboundop))
+ {
+ return NULL;
+ }
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
+ return NULL;
+ }
+
+ int64_t cid = channel_create(&_globals.channels, unboundop);
if (cid < 0) {
(void)handle_channel_error(-1, self, cid);
return NULL;
@@ -2796,7 +2909,7 @@ channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored))
}
PyDoc_STRVAR(channelsmod_create_doc,
-"channel_create() -> cid\n\
+"channel_create(unboundop) -> cid\n\
\n\
Create a new cross-interpreter channel and return a unique generated ID.");
@@ -2831,7 +2944,8 @@ static PyObject *
channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{
int64_t count = 0;
- int64_t *cids = _channels_list_all(&_globals.channels, &count);
+ struct channel_id_and_info *cids =
+ _channels_list_all(&_globals.channels, &count);
if (cids == NULL) {
if (count == 0) {
return PyList_New(0);
@@ -2848,19 +2962,26 @@ channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
ids = NULL;
goto finally;
}
- int64_t *cur = cids;
+ struct channel_id_and_info *cur = cids;
for (int64_t i=0; i < count; cur++, i++) {
PyObject *cidobj = NULL;
- int err = newchannelid(state->ChannelIDType, *cur, 0,
+ int err = newchannelid(state->ChannelIDType, cur->id, 0,
&_globals.channels, 0, 0,
(channelid **)&cidobj);
- if (handle_channel_error(err, self, *cur)) {
+ if (handle_channel_error(err, self, cur->id)) {
assert(cidobj == NULL);
Py_SETREF(ids, NULL);
break;
}
assert(cidobj != NULL);
- PyList_SET_ITEM(ids, (Py_ssize_t)i, cidobj);
+
+ PyObject *item = Py_BuildValue("Oi", cidobj, cur->unboundop);
+ Py_DECREF(cidobj);
+ if (item == NULL) {
+ Py_SETREF(ids, NULL);
+ break;
+ }
+ PyList_SET_ITEM(ids, (Py_ssize_t)i, item);
}
finally:
@@ -2942,16 +3063,24 @@ receive end.");
static PyObject *
channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
+ static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
+ NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
+ int unboundop = UNBOUND_REPLACE;
int blocking = 1;
PyObject *timeout_obj = NULL;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$pO:channel_send", kwlist,
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|i$pO:channel_send", kwlist,
channel_id_converter, &cid_data, &obj,
- &blocking, &timeout_obj)) {
+ &unboundop, &blocking, &timeout_obj))
+ {
+ return NULL;
+ }
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
return NULL;
}
@@ -2964,10 +3093,10 @@ channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = channel_send_wait(&_globals.channels, cid, obj, timeout);
+ err = channel_send_wait(&_globals.channels, cid, obj, unboundop, timeout);
}
else {
- err = channel_send(&_globals.channels, cid, obj, NULL);
+ err = channel_send(&_globals.channels, cid, obj, NULL, unboundop);
}
if (handle_channel_error(err, self, cid)) {
return NULL;
@@ -2985,17 +3114,24 @@ By default this waits for the object to be received.");
static PyObject *
channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
+ static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
+ NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
+ int unboundop = UNBOUND_REPLACE;
int blocking = 1;
PyObject *timeout_obj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&O|$pO:channel_send_buffer", kwlist,
+ "O&O|i$pO:channel_send_buffer", kwlist,
channel_id_converter, &cid_data, &obj,
- &blocking, &timeout_obj)) {
+ &unboundop, &blocking, &timeout_obj)) {
+ return NULL;
+ }
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
return NULL;
}
@@ -3013,10 +3149,11 @@ channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = channel_send_wait(&_globals.channels, cid, tempobj, timeout);
+ err = channel_send_wait(
+ &_globals.channels, cid, tempobj, unboundop, timeout);
}
else {
- err = channel_send(&_globals.channels, cid, tempobj, NULL);
+ err = channel_send(&_globals.channels, cid, tempobj, NULL, unboundop);
}
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
@@ -3048,25 +3185,28 @@ channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds)
cid = cid_data.cid;
PyObject *obj = NULL;
- int err = channel_recv(&_globals.channels, cid, &obj);
- if (handle_channel_error(err, self, cid)) {
- return NULL;
- }
- Py_XINCREF(dflt);
- if (obj == NULL) {
+ int unboundop = 0;
+ int err = channel_recv(&_globals.channels, cid, &obj, &unboundop);
+ if (err == ERR_CHANNEL_EMPTY && dflt != NULL) {
// Use the default.
- if (dflt == NULL) {
- (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
- return NULL;
- }
obj = Py_NewRef(dflt);
+ err = 0;
}
- Py_XDECREF(dflt);
- return obj;
+ else if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ else if (obj == NULL) {
+ // The item was unbound.
+ return Py_BuildValue("Oi", Py_None, unboundop);
+ }
+
+ PyObject *res = Py_BuildValue("OO", obj, Py_None);
+ Py_DECREF(obj);
+ return res;
}
PyDoc_STRVAR(channelsmod_recv_doc,
-"channel_recv(cid, [default]) -> obj\n\
+"channel_recv(cid, [default]) -> (obj, unboundop)\n\
\n\
Return a new object from the data at the front of the channel's queue.\n\
\n\
@@ -3168,6 +3308,34 @@ Close the channel for the current interpreter. 'send' and 'recv'\n\
ends are closed. Closing an already closed end is a noop.");
static PyObject *
+channelsmod_get_count(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"cid", NULL};
+ struct channel_id_converter_data cid_data = {
+ .module = self,
+ };
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O&:get_count", kwlist,
+ channel_id_converter, &cid_data)) {
+ return NULL;
+ }
+ int64_t cid = cid_data.cid;
+
+ Py_ssize_t count = -1;
+ int err = _channel_get_count(&_globals.channels, cid, &count);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ assert(count >= 0);
+ return PyLong_FromSsize_t(count);
+}
+
+PyDoc_STRVAR(channelsmod_get_count_doc,
+"get_count(cid)\n\
+\n\
+Return the number of items in the channel.");
+
+static PyObject *
channelsmod_get_info(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", NULL};
@@ -3195,6 +3363,38 @@ PyDoc_STRVAR(channelsmod_get_info_doc,
Return details about the channel.");
static PyObject *
+channelsmod_get_channel_defaults(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"cid", NULL};
+ struct channel_id_converter_data cid_data = {
+ .module = self,
+ };
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O&:get_channel_defaults", kwlist,
+ channel_id_converter, &cid_data)) {
+ return NULL;
+ }
+ int64_t cid = cid_data.cid;
+
+ PyThread_type_lock mutex = NULL;
+ _channel_state *channel = NULL;
+ int err = _channels_lookup(&_globals.channels, cid, &mutex, &channel);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ int unboundop = channel->defaults.unboundop;
+ PyThread_release_lock(mutex);
+
+ PyObject *defaults = Py_BuildValue("i", unboundop);
+ return defaults;
+}
+
+PyDoc_STRVAR(channelsmod_get_channel_defaults_doc,
+"get_channel_defaults(cid)\n\
+\n\
+Return the channel's default values, set when it was created.");
+
+static PyObject *
channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
{
module_state *state = get_module_state(self);
@@ -3240,8 +3440,8 @@ channelsmod__register_end_types(PyObject *self, PyObject *args, PyObject *kwds)
}
static PyMethodDef module_functions[] = {
- {"create", channelsmod_create,
- METH_NOARGS, channelsmod_create_doc},
+ {"create", _PyCFunction_CAST(channelsmod_create),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_create_doc},
{"destroy", _PyCFunction_CAST(channelsmod_destroy),
METH_VARARGS | METH_KEYWORDS, channelsmod_destroy_doc},
{"list_all", channelsmod_list_all,
@@ -3258,8 +3458,12 @@ static PyMethodDef module_functions[] = {
METH_VARARGS | METH_KEYWORDS, channelsmod_close_doc},
{"release", _PyCFunction_CAST(channelsmod_release),
METH_VARARGS | METH_KEYWORDS, channelsmod_release_doc},
+ {"get_count", _PyCFunction_CAST(channelsmod_get_count),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_get_count_doc},
{"get_info", _PyCFunction_CAST(channelsmod_get_info),
METH_VARARGS | METH_KEYWORDS, channelsmod_get_info_doc},
+ {"get_channel_defaults", _PyCFunction_CAST(channelsmod_get_channel_defaults),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_get_channel_defaults_doc},
{"_channel_id", _PyCFunction_CAST(channelsmod__channel_id),
METH_VARARGS | METH_KEYWORDS, NULL},
{"_register_end_types", _PyCFunction_CAST(channelsmod__register_end_types),
diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c
index 8e827891987..5dec240f02c 100644
--- a/Modules/_interpqueuesmodule.c
+++ b/Modules/_interpqueuesmodule.c
@@ -9,7 +9,9 @@
#include "pycore_crossinterp.h" // struct _xid
#define REGISTERS_HEAP_TYPES
+#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
+#undef HAS_UNBOUND_ITEMS
#undef REGISTERS_HEAP_TYPES
@@ -58,20 +60,6 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags)
return res;
}
-static inline int64_t
-_get_interpid(_PyCrossInterpreterData *data)
-{
- int64_t interpid;
- if (data != NULL) {
- interpid = _PyCrossInterpreterData_INTERPID(data);
- assert(!PyErr_Occurred());
- }
- else {
- interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
- }
- return interpid;
-}
-
static PyInterpreterState *
_get_current_interp(void)
{
@@ -402,32 +390,6 @@ handle_queue_error(int err, PyObject *mod, int64_t qid)
}
-/* unbound items ************************************************************/
-
-#define UNBOUND_REMOVE 1
-#define UNBOUND_ERROR 2
-#define UNBOUND_REPLACE 3
-
-// It would also be possible to add UNBOUND_REPLACE where the replacement
-// value is user-provided. There would be some limitations there, though.
-// Another possibility would be something like UNBOUND_COPY, where the
-// object is released but the underlying data is copied (with the "raw"
-// allocator) and used when the item is popped off the queue.
-
-static int
-check_unbound(int unboundop)
-{
- switch (unboundop) {
- case UNBOUND_REMOVE:
- case UNBOUND_ERROR:
- case UNBOUND_REPLACE:
- return 1;
- default:
- return 0;
- }
-}
-
-
/* the basic queue **********************************************************/
struct _queueitem;
diff --git a/Modules/_interpreters_common.h b/Modules/_interpreters_common.h
index 07120f6ccc7..0d2e0c9efd3 100644
--- a/Modules/_interpreters_common.h
+++ b/Modules/_interpreters_common.h
@@ -19,3 +19,48 @@ clear_xid_class(PyTypeObject *cls)
return _PyCrossInterpreterData_UnregisterClass(cls);
}
#endif
+
+
+static inline int64_t
+_get_interpid(_PyCrossInterpreterData *data)
+{
+ int64_t interpid;
+ if (data != NULL) {
+ interpid = _PyCrossInterpreterData_INTERPID(data);
+ assert(!PyErr_Occurred());
+ }
+ else {
+ interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
+ }
+ return interpid;
+}
+
+
+/* unbound items ************************************************************/
+
+#ifdef HAS_UNBOUND_ITEMS
+
+#define UNBOUND_REMOVE 1
+#define UNBOUND_ERROR 2
+#define UNBOUND_REPLACE 3
+
+// It would also be possible to add UNBOUND_REPLACE where the replacement
+// value is user-provided. There would be some limitations there, though.
+// Another possibility would be something like UNBOUND_COPY, where the
+// object is released but the underlying data is copied (with the "raw"
+// allocator) and used when the item is popped off the queue.
+
+static int
+check_unbound(int unboundop)
+{
+ switch (unboundop) {
+ case UNBOUND_REMOVE:
+ case UNBOUND_ERROR:
+ case UNBOUND_REPLACE:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+#endif