aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/asyncio/tasks.py
diff options
context:
space:
mode:
authorJustin Turner Arthur <justinarthur@gmail.com>2024-04-01 12:07:29 -0500
committerGitHub <noreply@github.com>2024-04-01 20:07:29 +0300
commitc741ad3537193c63fe697a8f0316aecd45eeb9ba (patch)
tree4dc0eaac7cde2150ee10811779f1f62177524dc3 /Lib/asyncio/tasks.py
parentddf814db744006e0f42328aa15ace97c9d8ad681 (diff)
downloadcpython-c741ad3537193c63fe697a8f0316aecd45eeb9ba.tar.gz
cpython-c741ad3537193c63fe697a8f0316aecd45eeb9ba.zip
gh-77714: Provide an async iterator version of as_completed (GH-22491)
* as_completed returns object that is both iterator and async iterator * Existing tests adjusted to test both the old and new style * New test to ensure iterator can be resumed * New test to ensure async iterator yields any passed-in Futures as-is Co-authored-by: Serhiy Storchaka <storchaka@gmail.com> Co-authored-by: Guido van Rossum <gvanrossum@gmail.com>
Diffstat (limited to 'Lib/asyncio/tasks.py')
-rw-r--r--Lib/asyncio/tasks.py152
1 files changed, 108 insertions, 44 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 48e31af9a43..7fb697b9441 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -25,6 +25,7 @@ from . import coroutines
from . import events
from . import exceptions
from . import futures
+from . import queues
from . import timeouts
# Helper to generate new task names
@@ -564,62 +565,125 @@ async def _cancel_and_wait(fut):
fut.remove_done_callback(cb)
-# This is *not* a @coroutine! It is just an iterator (yielding Futures).
+class _AsCompletedIterator:
+ """Iterator of awaitables representing tasks of asyncio.as_completed.
+
+ As an asynchronous iterator, iteration yields futures as they finish. As a
+ plain iterator, new coroutines are yielded that will return or raise the
+ result of the next underlying future to complete.
+ """
+ def __init__(self, aws, timeout):
+ self._done = queues.Queue()
+ self._timeout_handle = None
+
+ loop = events.get_event_loop()
+ todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
+ for f in todo:
+ f.add_done_callback(self._handle_completion)
+ if todo and timeout is not None:
+ self._timeout_handle = (
+ loop.call_later(timeout, self._handle_timeout)
+ )
+ self._todo = todo
+ self._todo_left = len(todo)
+
+ def __aiter__(self):
+ return self
+
+ def __iter__(self):
+ return self
+
+ async def __anext__(self):
+ if not self._todo_left:
+ raise StopAsyncIteration
+ assert self._todo_left > 0
+ self._todo_left -= 1
+ return await self._wait_for_one()
+
+ def __next__(self):
+ if not self._todo_left:
+ raise StopIteration
+ assert self._todo_left > 0
+ self._todo_left -= 1
+ return self._wait_for_one(resolve=True)
+
+ def _handle_timeout(self):
+ for f in self._todo:
+ f.remove_done_callback(self._handle_completion)
+ self._done.put_nowait(None) # Sentinel for _wait_for_one().
+ self._todo.clear() # Can't do todo.remove(f) in the loop.
+
+ def _handle_completion(self, f):
+ if not self._todo:
+ return # _handle_timeout() was here first.
+ self._todo.remove(f)
+ self._done.put_nowait(f)
+ if not self._todo and self._timeout_handle is not None:
+ self._timeout_handle.cancel()
+
+ async def _wait_for_one(self, resolve=False):
+ # Wait for the next future to be done and return it unless resolve is
+ # set, in which case return either the result of the future or raise
+ # an exception.
+ f = await self._done.get()
+ if f is None:
+ # Dummy value from _handle_timeout().
+ raise exceptions.TimeoutError
+ return f.result() if resolve else f
+
+
def as_completed(fs, *, timeout=None):
- """Return an iterator whose values are coroutines.
+ """Create an iterator of awaitables or their results in completion order.
- When waiting for the yielded coroutines you'll get the results (or
- exceptions!) of the original Futures (or coroutines), in the order
- in which and as soon as they complete.
+ Run the supplied awaitables concurrently. The returned object can be
+ iterated to obtain the results of the awaitables as they finish.
- This differs from PEP 3148; the proper way to use this is:
+ The object returned can be iterated as an asynchronous iterator or a plain
+ iterator. When asynchronous iteration is used, the originally-supplied
+ awaitables are yielded if they are tasks or futures. This makes it easy to
+ correlate previously-scheduled tasks with their results:
- for f in as_completed(fs):
- result = await f # The 'await' may raise.
- # Use result.
+ ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+ ipv6_connect = create_task(open_connection("::1", 80))
+ tasks = [ipv4_connect, ipv6_connect]
- If a timeout is specified, the 'await' will raise
- TimeoutError when the timeout occurs before all Futures are done.
+ async for earliest_connect in as_completed(tasks):
+ # earliest_connect is done. The result can be obtained by
+ # awaiting it or calling earliest_connect.result()
+ reader, writer = await earliest_connect
- Note: The futures 'f' are not necessarily members of fs.
- """
- if futures.isfuture(fs) or coroutines.iscoroutine(fs):
- raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
+ if earliest_connect is ipv6_connect:
+ print("IPv6 connection established.")
+ else:
+ print("IPv4 connection established.")
- from .queues import Queue # Import here to avoid circular import problem.
- done = Queue()
+ During asynchronous iteration, implicitly-created tasks will be yielded for
+ supplied awaitables that aren't tasks or futures.
- loop = events.get_event_loop()
- todo = {ensure_future(f, loop=loop) for f in set(fs)}
- timeout_handle = None
+ When used as a plain iterator, each iteration yields a new coroutine that
+ returns the result or raises the exception of the next completed awaitable.
+ This pattern is compatible with Python versions older than 3.13:
- def _on_timeout():
- for f in todo:
- f.remove_done_callback(_on_completion)
- done.put_nowait(None) # Queue a dummy value for _wait_for_one().
- todo.clear() # Can't do todo.remove(f) in the loop.
+ ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+ ipv6_connect = create_task(open_connection("::1", 80))
+ tasks = [ipv4_connect, ipv6_connect]
- def _on_completion(f):
- if not todo:
- return # _on_timeout() was here first.
- todo.remove(f)
- done.put_nowait(f)
- if not todo and timeout_handle is not None:
- timeout_handle.cancel()
+ for next_connect in as_completed(tasks):
+ # next_connect is not one of the original task objects. It must be
+ # awaited to obtain the result value or raise the exception of the
+ # awaitable that finishes next.
+ reader, writer = await next_connect
- async def _wait_for_one():
- f = await done.get()
- if f is None:
- # Dummy value from _on_timeout().
- raise exceptions.TimeoutError
- return f.result() # May raise f.exception().
+ A TimeoutError is raised if the timeout occurs before all awaitables are
+ done. This is raised by the async for loop during asynchronous iteration or
+ by the coroutines yielded during plain iteration.
+ """
+ if inspect.isawaitable(fs):
+ raise TypeError(
+ f"expects an iterable of awaitables, not {type(fs).__name__}"
+ )
- for f in todo:
- f.add_done_callback(_on_completion)
- if todo and timeout is not None:
- timeout_handle = loop.call_later(timeout, _on_timeout)
- for _ in range(len(todo)):
- yield _wait_for_one()
+ return _AsCompletedIterator(fs, timeout)
@types.coroutine