aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_interpreters/test_queues.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_interpreters/test_queues.py')
-rw-r--r--Lib/test/test_interpreters/test_queues.py271
1 files changed, 132 insertions, 139 deletions
diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py
index 18f83d097eb..cb17340f581 100644
--- a/Lib/test/test_interpreters/test_queues.py
+++ b/Lib/test/test_interpreters/test_queues.py
@@ -7,8 +7,8 @@ import unittest
from test.support import import_helper, Py_DEBUG
# Raise SkipTest if subinterpreters not supported.
_queues = import_helper.import_module('_interpqueues')
-from test.support import interpreters
-from test.support.interpreters import queues, _crossinterp
+from concurrent import interpreters
+from concurrent.interpreters import _queues as queues, _crossinterp
from .utils import _run_output, TestBase as _TestBase
@@ -42,7 +42,7 @@ class LowLevelTests(TestBase):
importlib.reload(queues)
def test_create_destroy(self):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
_queues.destroy(qid)
self.assertEqual(get_num_queues(), 0)
with self.assertRaises(queues.QueueNotFoundError):
@@ -56,7 +56,7 @@ class LowLevelTests(TestBase):
'-c',
dedent(f"""
import {_queues.__name__} as _queues
- _queues.create(2, 0, {REPLACE})
+ _queues.create(2, {REPLACE}, -1)
"""),
)
self.assertEqual(stdout, '')
@@ -67,13 +67,13 @@ class LowLevelTests(TestBase):
def test_bind_release(self):
with self.subTest('typical'):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
_queues.bind(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('bind too much'):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@@ -81,7 +81,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('nested'):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@@ -89,7 +89,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('release without binding'):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
with self.assertRaises(queues.QueueError):
_queues.release(qid)
@@ -126,19 +126,19 @@ class QueueTests(TestBase):
interp = interpreters.create()
interp.exec(dedent(f"""
- from test.support.interpreters import queues
+ from concurrent.interpreters import _queues as queues
queue1 = queues.Queue({queue1.id})
"""));
with self.subTest('same interpreter'):
queue2 = queues.create()
- queue1.put(queue2, syncobj=True)
+ queue1.put(queue2)
queue3 = queue1.get()
self.assertIs(queue3, queue2)
with self.subTest('from current interpreter'):
queue4 = queues.create()
- queue1.put(queue4, syncobj=True)
+ queue1.put(queue4)
out = _run_output(interp, dedent("""
queue4 = queue1.get()
print(queue4.id)
@@ -149,7 +149,7 @@ class QueueTests(TestBase):
with self.subTest('from subinterpreter'):
out = _run_output(interp, dedent("""
queue5 = queues.create()
- queue1.put(queue5, syncobj=True)
+ queue1.put(queue5)
print(queue5.id)
"""))
qid = int(out)
@@ -198,7 +198,7 @@ class TestQueueOps(TestBase):
def test_empty(self):
queue = queues.create()
before = queue.empty()
- queue.put(None, syncobj=True)
+ queue.put(None)
during = queue.empty()
queue.get()
after = queue.empty()
@@ -208,18 +208,64 @@ class TestQueueOps(TestBase):
self.assertIs(after, True)
def test_full(self):
- expected = [False, False, False, True, False, False, False]
- actual = []
- queue = queues.create(3)
- for _ in range(3):
- actual.append(queue.full())
- queue.put(None, syncobj=True)
- actual.append(queue.full())
- for _ in range(3):
- queue.get()
- actual.append(queue.full())
+ for maxsize in [1, 3, 11]:
+ with self.subTest(f'maxsize={maxsize}'):
+ num_to_add = maxsize
+ expected = [False] * (num_to_add * 2 + 3)
+ expected[maxsize] = True
+ expected[maxsize + 1] = True
+
+ queue = queues.create(maxsize)
+ actual = []
+ empty = [queue.empty()]
+
+ for _ in range(num_to_add):
+ actual.append(queue.full())
+ queue.put_nowait(None)
+ actual.append(queue.full())
+ with self.assertRaises(queues.QueueFull):
+ queue.put_nowait(None)
+ empty.append(queue.empty())
+
+ for _ in range(num_to_add):
+ actual.append(queue.full())
+ queue.get_nowait()
+ actual.append(queue.full())
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get_nowait()
+ actual.append(queue.full())
+ empty.append(queue.empty())
- self.assertEqual(actual, expected)
+ self.assertEqual(actual, expected)
+ self.assertEqual(empty, [True, False, True])
+
+ # no max size
+ for args in [(), (0,), (-1,), (-10,)]:
+ with self.subTest(f'maxsize={args[0]}' if args else '<default>'):
+ num_to_add = 13
+ expected = [False] * (num_to_add * 2 + 3)
+
+ queue = queues.create(*args)
+ actual = []
+ empty = [queue.empty()]
+
+ for _ in range(num_to_add):
+ actual.append(queue.full())
+ queue.put_nowait(None)
+ actual.append(queue.full())
+ empty.append(queue.empty())
+
+ for _ in range(num_to_add):
+ actual.append(queue.full())
+ queue.get_nowait()
+ actual.append(queue.full())
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get_nowait()
+ actual.append(queue.full())
+ empty.append(queue.empty())
+
+ self.assertEqual(actual, expected)
+ self.assertEqual(empty, [True, False, True])
def test_qsize(self):
expected = [0, 1, 2, 3, 2, 3, 2, 1, 0, 1, 0]
@@ -227,16 +273,16 @@ class TestQueueOps(TestBase):
queue = queues.create()
for _ in range(3):
actual.append(queue.qsize())
- queue.put(None, syncobj=True)
+ queue.put(None)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
- queue.put(None, syncobj=True)
+ queue.put(None)
actual.append(queue.qsize())
for _ in range(3):
queue.get()
actual.append(queue.qsize())
- queue.put(None, syncobj=True)
+ queue.put(None)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
@@ -245,70 +291,32 @@ class TestQueueOps(TestBase):
def test_put_get_main(self):
expected = list(range(20))
- for syncobj in (True, False):
- kwds = dict(syncobj=syncobj)
- with self.subTest(f'syncobj={syncobj}'):
- queue = queues.create()
- for i in range(20):
- queue.put(i, **kwds)
- actual = [queue.get() for _ in range(20)]
+ queue = queues.create()
+ for i in range(20):
+ queue.put(i)
+ actual = [queue.get() for _ in range(20)]
- self.assertEqual(actual, expected)
+ self.assertEqual(actual, expected)
def test_put_timeout(self):
- for syncobj in (True, False):
- kwds = dict(syncobj=syncobj)
- with self.subTest(f'syncobj={syncobj}'):
- queue = queues.create(2)
- queue.put(None, **kwds)
- queue.put(None, **kwds)
- with self.assertRaises(queues.QueueFull):
- queue.put(None, timeout=0.1, **kwds)
- queue.get()
- queue.put(None, **kwds)
+ queue = queues.create(2)
+ queue.put(None)
+ queue.put(None)
+ with self.assertRaises(queues.QueueFull):
+ queue.put(None, timeout=0.1)
+ queue.get()
+ queue.put(None)
def test_put_nowait(self):
- for syncobj in (True, False):
- kwds = dict(syncobj=syncobj)
- with self.subTest(f'syncobj={syncobj}'):
- queue = queues.create(2)
- queue.put_nowait(None, **kwds)
- queue.put_nowait(None, **kwds)
- with self.assertRaises(queues.QueueFull):
- queue.put_nowait(None, **kwds)
- queue.get()
- queue.put_nowait(None, **kwds)
-
- def test_put_syncobj(self):
- for obj in [
- None,
- True,
- 10,
- 'spam',
- b'spam',
- (0, 'a'),
- ]:
- with self.subTest(repr(obj)):
- queue = queues.create()
-
- queue.put(obj, syncobj=True)
- obj2 = queue.get()
- self.assertEqual(obj2, obj)
-
- queue.put(obj, syncobj=True)
- obj2 = queue.get_nowait()
- self.assertEqual(obj2, obj)
-
- for obj in [
- [1, 2, 3],
- {'a': 13, 'b': 17},
- ]:
- with self.subTest(repr(obj)):
- queue = queues.create()
- with self.assertRaises(interpreters.NotShareableError):
- queue.put(obj, syncobj=True)
+ queue = queues.create(2)
+ queue.put_nowait(None)
+ queue.put_nowait(None)
+ with self.assertRaises(queues.QueueFull):
+ queue.put_nowait(None)
+ queue.get()
+ queue.put_nowait(None)
- def test_put_not_syncobj(self):
+ def test_put_full_fallback(self):
for obj in [
None,
True,
@@ -323,11 +331,11 @@ class TestQueueOps(TestBase):
with self.subTest(repr(obj)):
queue = queues.create()
- queue.put(obj, syncobj=False)
+ queue.put(obj)
obj2 = queue.get()
self.assertEqual(obj2, obj)
- queue.put(obj, syncobj=False)
+ queue.put(obj)
obj2 = queue.get_nowait()
self.assertEqual(obj2, obj)
@@ -341,24 +349,9 @@ class TestQueueOps(TestBase):
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
- def test_put_get_default_syncobj(self):
- expected = list(range(20))
- queue = queues.create(syncobj=True)
- for methname in ('get', 'get_nowait'):
- with self.subTest(f'{methname}()'):
- get = getattr(queue, methname)
- for i in range(20):
- queue.put(i)
- actual = [get() for _ in range(20)]
- self.assertEqual(actual, expected)
-
- obj = [1, 2, 3] # lists are not shareable
- with self.assertRaises(interpreters.NotShareableError):
- queue.put(obj)
-
- def test_put_get_default_not_syncobj(self):
+ def test_put_get_full_fallback(self):
expected = list(range(20))
- queue = queues.create(syncobj=False)
+ queue = queues.create()
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
get = getattr(queue, methname)
@@ -377,14 +370,14 @@ class TestQueueOps(TestBase):
def test_put_get_same_interpreter(self):
interp = interpreters.create()
interp.exec(dedent("""
- from test.support.interpreters import queues
+ from concurrent.interpreters import _queues as queues
queue = queues.create()
"""))
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
interp.exec(dedent(f"""
orig = b'spam'
- queue.put(orig, syncobj=True)
+ queue.put(orig)
obj = queue.{methname}()
assert obj == orig, 'expected: obj == orig'
assert obj is not orig, 'expected: obj is not orig'
@@ -399,12 +392,12 @@ class TestQueueOps(TestBase):
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
obj1 = b'spam'
- queue1.put(obj1, syncobj=True)
+ queue1.put(obj1)
out = _run_output(
interp,
dedent(f"""
- from test.support.interpreters import queues
+ from concurrent.interpreters import _queues as queues
queue1 = queues.Queue({queue1.id})
queue2 = queues.Queue({queue2.id})
assert queue1.qsize() == 1, 'expected: queue1.qsize() == 1'
@@ -416,7 +409,7 @@ class TestQueueOps(TestBase):
obj2 = b'eggs'
print(id(obj2))
assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
- queue2.put(obj2, syncobj=True)
+ queue2.put(obj2)
assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
"""))
self.assertEqual(len(queues.list_all()), 2)
@@ -433,22 +426,22 @@ class TestQueueOps(TestBase):
if not unbound:
extraargs = ''
elif unbound is queues.UNBOUND:
- extraargs = ', unbound=queues.UNBOUND'
+ extraargs = ', unbounditems=queues.UNBOUND'
elif unbound is queues.UNBOUND_ERROR:
- extraargs = ', unbound=queues.UNBOUND_ERROR'
+ extraargs = ', unbounditems=queues.UNBOUND_ERROR'
elif unbound is queues.UNBOUND_REMOVE:
- extraargs = ', unbound=queues.UNBOUND_REMOVE'
+ extraargs = ', unbounditems=queues.UNBOUND_REMOVE'
else:
raise NotImplementedError(repr(unbound))
interp = interpreters.create()
_run_output(interp, dedent(f"""
- from test.support.interpreters import queues
+ from concurrent.interpreters import _queues as queues
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
- queue.put(obj1, syncobj=True{extraargs})
- queue.put(obj2, syncobj=True{extraargs})
+ queue.put(obj1{extraargs})
+ queue.put(obj2{extraargs})
"""))
self.assertEqual(queue.qsize(), presize + 2)
@@ -501,11 +494,11 @@ class TestQueueOps(TestBase):
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
- queue.put(b'ham', unbound=queues.UNBOUND_REMOVE)
+ queue.put(b'ham', unbounditems=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 1)
interp = common(queue, queues.UNBOUND_REMOVE, 1)
self.assertEqual(queue.qsize(), 3)
- queue.put(42, unbound=queues.UNBOUND_REMOVE)
+ queue.put(42, unbounditems=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 4)
del interp
self.assertEqual(queue.qsize(), 2)
@@ -521,13 +514,13 @@ class TestQueueOps(TestBase):
queue = queues.create()
interp = interpreters.create()
_run_output(interp, dedent(f"""
- from test.support.interpreters import queues
+ from concurrent.interpreters import _queues as queues
queue = queues.Queue({queue.id})
- queue.put(1, syncobj=True, unbound=queues.UNBOUND)
- queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR)
- queue.put(3, syncobj=True)
- queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE)
- queue.put(5, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(1, unbounditems=queues.UNBOUND)
+ queue.put(2, unbounditems=queues.UNBOUND_ERROR)
+ queue.put(3)
+ queue.put(4, unbounditems=queues.UNBOUND_REMOVE)
+ queue.put(5, unbounditems=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 5)
@@ -555,16 +548,16 @@ class TestQueueOps(TestBase):
interp1 = interpreters.create()
interp2 = interpreters.create()
- queue.put(1, syncobj=True)
+ queue.put(1)
_run_output(interp1, dedent(f"""
- from test.support.interpreters import queues
+ from concurrent.interpreters import _queues as queues
queue = queues.Queue({queue.id})
obj1 = queue.get()
- queue.put(2, syncobj=True, unbound=queues.UNBOUND)
- queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE)
+ queue.put(2, unbounditems=queues.UNBOUND)
+ queue.put(obj1, unbounditems=queues.UNBOUND_REMOVE)
"""))
_run_output(interp2, dedent(f"""
- from test.support.interpreters import queues
+ from concurrent.interpreters import _queues as queues
queue = queues.Queue({queue.id})
obj2 = queue.get()
obj1 = queue.get()
@@ -572,21 +565,21 @@ class TestQueueOps(TestBase):
self.assertEqual(queue.qsize(), 0)
queue.put(3)
_run_output(interp1, dedent("""
- queue.put(4, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(4, unbounditems=queues.UNBOUND)
# interp closed here
- queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE)
- queue.put(6, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(5, unbounditems=queues.UNBOUND_REMOVE)
+ queue.put(6, unbounditems=queues.UNBOUND)
"""))
_run_output(interp2, dedent("""
- queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR)
+ queue.put(7, unbounditems=queues.UNBOUND_ERROR)
# interp closed here
- queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR)
- queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE)
- queue.put(8, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(obj1, unbounditems=queues.UNBOUND_ERROR)
+ queue.put(obj2, unbounditems=queues.UNBOUND_REMOVE)
+ queue.put(8, unbounditems=queues.UNBOUND)
"""))
_run_output(interp1, dedent("""
- queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE)
- queue.put(10, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(9, unbounditems=queues.UNBOUND_REMOVE)
+ queue.put(10, unbounditems=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 10)
@@ -642,12 +635,12 @@ class TestQueueOps(TestBase):
break
except queues.QueueEmpty:
continue
- queue2.put(obj, syncobj=True)
+ queue2.put(obj)
t = threading.Thread(target=f)
t.start()
orig = b'spam'
- queue1.put(orig, syncobj=True)
+ queue1.put(orig)
obj = queue2.get()
t.join()