aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/test/test_asyncio/test_tasks.py
diff options
context:
space:
mode:
authorJustin Turner Arthur <justinarthur@gmail.com>2024-04-01 12:07:29 -0500
committerGitHub <noreply@github.com>2024-04-01 20:07:29 +0300
commitc741ad3537193c63fe697a8f0316aecd45eeb9ba (patch)
tree4dc0eaac7cde2150ee10811779f1f62177524dc3 /Lib/test/test_asyncio/test_tasks.py
parentddf814db744006e0f42328aa15ace97c9d8ad681 (diff)
downloadcpython-c741ad3537193c63fe697a8f0316aecd45eeb9ba.tar.gz
cpython-c741ad3537193c63fe697a8f0316aecd45eeb9ba.zip
gh-77714: Provide an async iterator version of as_completed (GH-22491)
* as_completed returns object that is both iterator and async iterator * Existing tests adjusted to test both the old and new style * New test to ensure iterator can be resumed * New test to ensure async iterator yields any passed-in Futures as-is Co-authored-by: Serhiy Storchaka <storchaka@gmail.com> Co-authored-by: Guido van Rossum <gvanrossum@gmail.com>
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r--Lib/test/test_asyncio/test_tasks.py282
1 files changed, 219 insertions, 63 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 4dfaff847ed..bc6d88e65a4 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -1,6 +1,7 @@
"""Tests for tasks.py."""
import collections
+import contextlib
import contextvars
import gc
import io
@@ -1409,12 +1410,6 @@ class BaseTaskTests:
yield 0.01
yield 0
- loop = self.new_test_loop(gen)
- # disable "slow callback" warning
- loop.slow_callback_duration = 1.0
- completed = set()
- time_shifted = False
-
async def sleeper(dt, x):
nonlocal time_shifted
await asyncio.sleep(dt)
@@ -1424,21 +1419,78 @@ class BaseTaskTests:
loop.advance_time(0.14)
return x
- a = sleeper(0.01, 'a')
- b = sleeper(0.01, 'b')
- c = sleeper(0.15, 'c')
+ async def try_iterator(awaitables):
+ values = []
+ for f in asyncio.as_completed(awaitables):
+ values.append(await f)
+ return values
- async def foo():
+ async def try_async_iterator(awaitables):
values = []
- for f in asyncio.as_completed([b, c, a]):
+ async for f in asyncio.as_completed(awaitables):
values.append(await f)
return values
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.15, loop.time())
- self.assertTrue('a' in res[:2])
- self.assertTrue('b' in res[:2])
- self.assertEqual(res[2], 'c')
+ for foo in try_iterator, try_async_iterator:
+ with self.subTest(method=foo.__name__):
+ loop = self.new_test_loop(gen)
+ # disable "slow callback" warning
+ loop.slow_callback_duration = 1.0
+
+ completed = set()
+ time_shifted = False
+
+ a = sleeper(0.01, 'a')
+ b = sleeper(0.01, 'b')
+ c = sleeper(0.15, 'c')
+
+ res = loop.run_until_complete(self.new_task(loop, foo([b, c, a])))
+ self.assertAlmostEqual(0.15, loop.time())
+ self.assertTrue('a' in res[:2])
+ self.assertTrue('b' in res[:2])
+ self.assertEqual(res[2], 'c')
+
+ def test_as_completed_same_tasks_in_as_out(self):
+ # Ensures that asynchronously iterating as_completed's iterator
+ # yields awaitables are the same awaitables that were passed in when
+ # those awaitables are futures.
+ async def try_async_iterator(awaitables):
+ awaitables_out = set()
+ async for out_aw in asyncio.as_completed(awaitables):
+ awaitables_out.add(out_aw)
+ return awaitables_out
+
+ async def coro(i):
+ return i
+
+ with contextlib.closing(asyncio.new_event_loop()) as loop:
+ # Coroutines shouldn't be yielded back as finished coroutines
+ # can't be re-used.
+ awaitables_in = frozenset(
+ (coro(0), coro(1), coro(2), coro(3))
+ )
+ awaitables_out = loop.run_until_complete(
+ try_async_iterator(awaitables_in)
+ )
+ if awaitables_in - awaitables_out != awaitables_in:
+ raise self.failureException('Got original coroutines '
+ 'out of as_completed iterator.')
+
+ # Tasks should be yielded back.
+ coro_obj_a = coro('a')
+ task_b = loop.create_task(coro('b'))
+ coro_obj_c = coro('c')
+ task_d = loop.create_task(coro('d'))
+ awaitables_in = frozenset(
+ (coro_obj_a, task_b, coro_obj_c, task_d)
+ )
+ awaitables_out = loop.run_until_complete(
+ try_async_iterator(awaitables_in)
+ )
+ if awaitables_in & awaitables_out != {task_b, task_d}:
+ raise self.failureException('Only tasks should be yielded '
+ 'from as_completed iterator '
+ 'as-is.')
def test_as_completed_with_timeout(self):
@@ -1448,12 +1500,7 @@ class BaseTaskTests:
yield 0
yield 0.1
- loop = self.new_test_loop(gen)
-
- a = loop.create_task(asyncio.sleep(0.1, 'a'))
- b = loop.create_task(asyncio.sleep(0.15, 'b'))
-
- async def foo():
+ async def try_iterator():
values = []
for f in asyncio.as_completed([a, b], timeout=0.12):
if values:
@@ -1465,16 +1512,33 @@ class BaseTaskTests:
values.append((2, exc))
return values
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertEqual(len(res), 2, res)
- self.assertEqual(res[0], (1, 'a'))
- self.assertEqual(res[1][0], 2)
- self.assertIsInstance(res[1][1], asyncio.TimeoutError)
- self.assertAlmostEqual(0.12, loop.time())
+ async def try_async_iterator():
+ values = []
+ try:
+ async for f in asyncio.as_completed([a, b], timeout=0.12):
+ v = await f
+ values.append((1, v))
+ loop.advance_time(0.02)
+ except asyncio.TimeoutError as exc:
+ values.append((2, exc))
+ return values
- # move forward to close generator
- loop.advance_time(10)
- loop.run_until_complete(asyncio.wait([a, b]))
+ for foo in try_iterator, try_async_iterator:
+ with self.subTest(method=foo.__name__):
+ loop = self.new_test_loop(gen)
+ a = loop.create_task(asyncio.sleep(0.1, 'a'))
+ b = loop.create_task(asyncio.sleep(0.15, 'b'))
+
+ res = loop.run_until_complete(self.new_task(loop, foo()))
+ self.assertEqual(len(res), 2, res)
+ self.assertEqual(res[0], (1, 'a'))
+ self.assertEqual(res[1][0], 2)
+ self.assertIsInstance(res[1][1], asyncio.TimeoutError)
+ self.assertAlmostEqual(0.12, loop.time())
+
+ # move forward to close generator
+ loop.advance_time(10)
+ loop.run_until_complete(asyncio.wait([a, b]))
def test_as_completed_with_unused_timeout(self):
@@ -1483,19 +1547,75 @@ class BaseTaskTests:
yield 0
yield 0.01
- loop = self.new_test_loop(gen)
-
- a = asyncio.sleep(0.01, 'a')
-
- async def foo():
+ async def try_iterator():
for f in asyncio.as_completed([a], timeout=1):
v = await f
self.assertEqual(v, 'a')
- loop.run_until_complete(self.new_task(loop, foo()))
+ async def try_async_iterator():
+ async for f in asyncio.as_completed([a], timeout=1):
+ v = await f
+ self.assertEqual(v, 'a')
- def test_as_completed_reverse_wait(self):
+ for foo in try_iterator, try_async_iterator:
+ with self.subTest(method=foo.__name__):
+ a = asyncio.sleep(0.01, 'a')
+ loop = self.new_test_loop(gen)
+ loop.run_until_complete(self.new_task(loop, foo()))
+ loop.close()
+
+ def test_as_completed_resume_iterator(self):
+ # Test that as_completed returns an iterator that can be resumed
+ # the next time iteration is performed (i.e. if __iter__ is called
+ # again)
+ async def try_iterator(awaitables):
+ iterations = 0
+ iterator = asyncio.as_completed(awaitables)
+ collected = []
+ for f in iterator:
+ collected.append(await f)
+ iterations += 1
+ if iterations == 2:
+ break
+ self.assertEqual(len(collected), 2)
+
+ # Resume same iterator:
+ for f in iterator:
+ collected.append(await f)
+ return collected
+
+ async def try_async_iterator(awaitables):
+ iterations = 0
+ iterator = asyncio.as_completed(awaitables)
+ collected = []
+ async for f in iterator:
+ collected.append(await f)
+ iterations += 1
+ if iterations == 2:
+ break
+ self.assertEqual(len(collected), 2)
+
+ # Resume same iterator:
+ async for f in iterator:
+ collected.append(await f)
+ return collected
+
+ async def coro(i):
+ return i
+
+ with contextlib.closing(asyncio.new_event_loop()) as loop:
+ for foo in try_iterator, try_async_iterator:
+ with self.subTest(method=foo.__name__):
+ results = loop.run_until_complete(
+ foo((coro(0), coro(1), coro(2), coro(3)))
+ )
+ self.assertCountEqual(results, (0, 1, 2, 3))
+ def test_as_completed_reverse_wait(self):
+ # Tests the plain iterator style of as_completed iteration to
+ # ensure that the first future awaited resolves to the first
+ # completed awaitable from the set we passed in, even if it wasn't
+ # the first future generated by as_completed.
def gen():
yield 0
yield 0.05
@@ -1522,7 +1642,8 @@ class BaseTaskTests:
loop.run_until_complete(test())
def test_as_completed_concurrent(self):
-
+ # Ensure that more than one future or coroutine yielded from
+ # as_completed can be awaited concurrently.
def gen():
when = yield
self.assertAlmostEqual(0.05, when)
@@ -1530,38 +1651,55 @@ class BaseTaskTests:
self.assertAlmostEqual(0.05, when)
yield 0.05
- a = asyncio.sleep(0.05, 'a')
- b = asyncio.sleep(0.05, 'b')
- fs = {a, b}
+ async def try_iterator(fs):
+ return list(asyncio.as_completed(fs))
- async def test():
- futs = list(asyncio.as_completed(fs))
- self.assertEqual(len(futs), 2)
- done, pending = await asyncio.wait(
- [asyncio.ensure_future(fut) for fut in futs]
- )
- self.assertEqual(set(f.result() for f in done), {'a', 'b'})
+ async def try_async_iterator(fs):
+ return [f async for f in asyncio.as_completed(fs)]
- loop = self.new_test_loop(gen)
- loop.run_until_complete(test())
+ for runner in try_iterator, try_async_iterator:
+ with self.subTest(method=runner.__name__):
+ a = asyncio.sleep(0.05, 'a')
+ b = asyncio.sleep(0.05, 'b')
+ fs = {a, b}
+
+ async def test():
+ futs = await runner(fs)
+ self.assertEqual(len(futs), 2)
+ done, pending = await asyncio.wait(
+ [asyncio.ensure_future(fut) for fut in futs]
+ )
+ self.assertEqual(set(f.result() for f in done), {'a', 'b'})
+
+ loop = self.new_test_loop(gen)
+ loop.run_until_complete(test())
def test_as_completed_duplicate_coroutines(self):
async def coro(s):
return s
- async def runner():
+ async def try_iterator():
result = []
c = coro('ham')
for f in asyncio.as_completed([c, c, coro('spam')]):
result.append(await f)
return result
- fut = self.new_task(self.loop, runner())
- self.loop.run_until_complete(fut)
- result = fut.result()
- self.assertEqual(set(result), {'ham', 'spam'})
- self.assertEqual(len(result), 2)
+ async def try_async_iterator():
+ result = []
+ c = coro('ham')
+ async for f in asyncio.as_completed([c, c, coro('spam')]):
+ result.append(await f)
+ return result
+
+ for runner in try_iterator, try_async_iterator:
+ with self.subTest(method=runner.__name__):
+ fut = self.new_task(self.loop, runner())
+ self.loop.run_until_complete(fut)
+ result = fut.result()
+ self.assertEqual(set(result), {'ham', 'spam'})
+ self.assertEqual(len(result), 2)
def test_as_completed_coroutine_without_loop(self):
async def coro():
@@ -1570,8 +1708,8 @@ class BaseTaskTests:
a = coro()
self.addCleanup(a.close)
- futs = asyncio.as_completed([a])
with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ futs = asyncio.as_completed([a])
list(futs)
def test_as_completed_coroutine_use_running_loop(self):
@@ -2044,14 +2182,32 @@ class BaseTaskTests:
self.assertEqual(res, 42)
def test_as_completed_invalid_args(self):
+ # as_completed() expects a list of futures, not a future instance
+ # TypeError should be raised either on iterator construction or first
+ # iteration
+
+ # Plain iterator
fut = self.new_future(self.loop)
+ with self.assertRaises(TypeError):
+ iterator = asyncio.as_completed(fut)
+ next(iterator)
+ coro = coroutine_function()
+ with self.assertRaises(TypeError):
+ iterator = asyncio.as_completed(coro)
+ next(iterator)
+ coro.close()
- # as_completed() expects a list of futures, not a future instance
- self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.as_completed(fut))
+ # Async iterator
+ async def try_async_iterator(aw):
+ async for f in asyncio.as_completed(aw):
+ break
+
+ fut = self.new_future(self.loop)
+ with self.assertRaises(TypeError):
+ self.loop.run_until_complete(try_async_iterator(fut))
coro = coroutine_function()
- self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.as_completed(coro))
+ with self.assertRaises(TypeError):
+ self.loop.run_until_complete(try_async_iterator(coro))
coro.close()
def test_wait_invalid_args(self):