aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/InternalDocs/asyncio.md
diff options
context:
space:
mode:
Diffstat (limited to 'InternalDocs/asyncio.md')
-rw-r--r--InternalDocs/asyncio.md328
1 files changed, 328 insertions, 0 deletions
diff --git a/InternalDocs/asyncio.md b/InternalDocs/asyncio.md
new file mode 100644
index 00000000000..22159852ca5
--- /dev/null
+++ b/InternalDocs/asyncio.md
@@ -0,0 +1,328 @@
+asyncio
+=======
+
+
+This document describes the working and implementation details of the
+[`asyncio`](https://docs.python.org/3/library/asyncio.html) module.
+
+**The following section describes the implementation details of the C implementation**.
+
+# Task management
+
+## Pre-Python 3.14 implementation
+
+Before Python 3.14, the C implementation of `asyncio` used a
+[`WeakSet`](https://docs.python.org/3/library/weakref.html#weakref.WeakSet)
+to store all the tasks created by the event loop. `WeakSet` was used
+so that the event loop doesn't hold strong references to the tasks,
+allowing them to be garbage collected when they are no longer needed.
+The current task of the event loop was stored in a dict mapping the
+event loop to the current task.
+
+```c
+ /* Dictionary containing tasks that are currently active in
+ all running event loops. {EventLoop: Task} */
+ PyObject *current_tasks;
+
+ /* WeakSet containing all tasks scheduled to run on event loops. */
+ PyObject *scheduled_tasks;
+```
+
+This implementation had a few drawbacks:
+
+1. **Performance**: Using a `WeakSet` for storing tasks is
+inefficient, as it requires maintaining a full set of weak references
+to tasks along with corresponding weakref callback to cleanup the
+tasks when they are garbage collected. This increases the work done
+by the garbage collector, and in applications with a large number of
+tasks, this becomes a bottleneck, with increased memory usage and
+lower performance. Looking up the current task was slow as it required
+a dictionary lookup on the `current_tasks` dict.
+
+2. **Thread safety**: Before Python 3.14, concurrent iterations over
+`WeakSet` was not thread safe[^1]. This meant calling APIs like
+`asyncio.all_tasks()` could lead to inconsistent results or even
+`RuntimeError` if used in multiple threads[^2].
+
+3. **Poor scaling in free-threading**: Using global `WeakSet` for
+storing all tasks across all threads lead to contention when adding
+and removing tasks from the set which is a frequent operation. As such
+it performed poorly in free-threading and did not scale well with the
+number of threads. Similarly, accessing the current task in multiple
+threads did not scale due to contention on the global `current_tasks`
+dictionary.
+
+## Python 3.14 implementation
+
+To address these issues, Python 3.14 implements several changes to
+improve the performance and thread safety of tasks management.
+
+- **Per-thread double linked list for tasks**: Python 3.14 introduces
+ a per-thread circular double linked list implementation for
+ storing tasks. This allows each thread to maintain its own list of
+ tasks and allows for lock free addition and removal of tasks. This
+ is designed to be efficient, and thread-safe and scales well with
+ the number of threads in free-threading. This also allows external
+ introspection tools such as `python -m asyncio pstree` to inspect
+ tasks running in all threads and was implemented as part of [Audit
+ asyncio thread
+ safety](https://github.com/python/cpython/issues/128002).
+
+- **Per-thread current task**: Python 3.14 stores the current task on
+ the current thread state instead of a global dictionary. This
+ allows for faster access to the current task without the need for
+ a dictionary lookup. Each thread maintains its own current task,
+ which is stored in the `PyThreadState` structure. This was
+ implemented in https://github.com/python/cpython/issues/129898.
+
+Storing the current task and list of all tasks per-thread instead of
+storing it per-loop was chosen primarily to support external
+introspection tools such as `python -m asyncio pstree` as looking up
+arbitrary attributes on the loop object is not possible
+externally. Storing data per-thread also makes it easy to support
+third party event loop implementations such as `uvloop`, and is more
+efficient for the single threaded asyncio use-case as it avoids the
+overhead of attribute lookups on the loop object and several other
+calls on the performance critical path of adding and removing tasks
+from the per-loop task list.
+
+## Per-thread double linked list for tasks
+
+This implementation uses a circular doubly linked list to store tasks
+on the thread states. This is used for all tasks which are instances
+of `asyncio.Task` or subclasses of it, for third-party tasks a
+fallback `WeakSet` implementation is used. The linked list is
+implemented using an embedded `llist_node` structure within each
+`TaskObj`. By embedding the list node directly into the task object,
+the implementation avoids additional memory allocations for linked
+list nodes.
+
+The `PyThreadState` structure gained a new field `asyncio_tasks_head`,
+which serves as the head of the circular linked list of tasks. This
+allows for lock free addition and removal of tasks from the list.
+
+It is possible that when a thread state is deallocated, there are
+lingering tasks in its list; this can happen if another thread has
+references to the tasks of this thread. Therefore, the
+`PyInterpreterState` structure also gains a new `asyncio_tasks_head`
+field to store any lingering tasks. When a thread state is
+deallocated, any remaining lingering tasks are moved to the
+interpreter state tasks list, and the thread state tasks list is
+cleared. The `asyncio_tasks_lock` is used protect the interpreter's
+tasks list from concurrent modifications.
+
+```c
+typedef struct TaskObj {
+ ...
+ struct llist_node asyncio_node;
+} TaskObj;
+
+typedef struct PyThreadState {
+ ...
+ struct llist_node asyncio_tasks_head;
+} PyThreadState;
+
+typedef struct PyInterpreterState {
+ ...
+ struct llist_node asyncio_tasks_head;
+ PyMutex asyncio_tasks_lock;
+} PyInterpreterState;
+```
+
+When a task is created, it is added to the current thread's list of
+tasks by the `register_task` function. When the task is done, it is
+removed from the list by the `unregister_task` function. In
+free-threading, the thread id of the thread which created the task is
+stored in `task_tid` field of the `TaskObj`. This is used to check if
+the task is being removed from the correct thread's task list. If the
+current thread is same as the thread which created it then no locking
+is required, otherwise in free-threading, the `stop-the-world` pause
+is used to pause all other threads and then safely remove the task
+from the tasks list.
+
+```mermaid
+
+flowchart TD
+ subgraph one["Executing Thread"]
+ A["task = asyncio.create_task(coro())"] -->B("register_task(task)")
+ B --> C{"task->task_state?"}
+ C -->|pending| D["task_step(task)"]
+ C -->|done| F["unregister_task(task)"]
+ C -->|cancelled| F["unregister_task(task)"]
+ D --> C
+ F --> G{"free-threading?"}
+ G --> |false| H["unregister_task_safe(task)"]
+ G --> |true| J{"correct thread? <br>task->task_tid == _Py_ThreadId()"}
+ J --> |true| H
+ J --> |false| I["stop the world <br> pause all threads"]
+ I --> H["unregister_task_safe(task)"]
+ end
+ subgraph two["Thread deallocating"]
+ A1{"thread's task list empty? <br> llist_empty(tstate->asyncio_tasks_head)"}
+ A1 --> |true| B1["deallocate thread<br>free_threadstate(tstate)"]
+ A1 --> |false| C1["add tasks to interpreter's task list<br> llist_concat(&tstate->interp->asyncio_tasks_head,
+ &tstate->asyncio_tasks_head)"]
+ C1 --> B1
+ end
+
+ one --> two
+```
+
+`asyncio.all_tasks` now iterates over the per-thread task lists of all
+threads and the interpreter's task list to get all the tasks. In
+free-threading, this is done by pausing all the threads using the
+`stop-the-world` pause to ensure that no tasks are being added or
+removed while iterating over the lists. This allows for a consistent
+view of all task lists across all threads and is thread safe.
+
+This design allows for lock free execution and scales well in
+free-threading with multiple event loops running in different threads.
+
+## Per-thread current task
+
+This implementation stores the current task in the `PyThreadState`
+structure, which allows for faster access to the current task without
+the need for a dictionary lookup.
+
+```c
+typedef struct PyThreadState {
+ ...
+ PyObject *asyncio_current_loop;
+ PyObject *asyncio_current_task;
+} PyThreadState;
+```
+
+When a task is entered or left, the current task is updated in the
+thread state using `enter_task` and `leave_task` functions. When
+`current_task(loop)` is called where `loop` is the current running
+event loop of the current thread, no locking is required as the
+current task is stored in the thread state and is returned directly
+(general case). Otherwise, if the `loop` is not current running event
+loop, the `stop-the-world` pause is used to pause all threads in
+free-threading and then by iterating over all the thread states and
+checking if the `loop` matches with `tstate->asyncio_current_loop`,
+the current task is found and returned. If no matching thread state is
+found, `None` is returned.
+
+In free-threading, it avoids contention on a global dictionary as
+threads can access the current task of thier running loop without any
+locking.
+
+---
+
+**The following section describes the implementation details of the Python implementation**.
+
+# async generators
+
+This section describes the implementation details of async generators in `asyncio`.
+
+Since async generators are meant to be used from coroutines,
+their finalization (execution of finally blocks) needs
+to be done while the loop is running.
+Most async generators are closed automatically
+when they are fully iterated over and exhausted; however,
+if the async generator is not fully iterated over,
+it may not be closed properly, leading to the `finally` blocks not being executed.
+
+Consider the following code:
+```py
+import asyncio
+
+async def agen():
+ try:
+ yield 1
+ finally:
+ await asyncio.sleep(1)
+ print("finally executed")
+
+
+async def main():
+ async for i in agen():
+ break
+
+loop = asyncio.EventLoop()
+loop.run_until_complete(main())
+```
+
+The above code will not print "finally executed", because the
+async generator `agen` is not fully iterated over
+and it is not closed manually by awaiting `agen.aclose()`.
+
+To solve this, `asyncio` uses the `sys.set_asyncgen_hooks` function to
+set hooks for finalizing async generators as described in
+[PEP 525](https://peps.python.org/pep-0525/).
+
+- **firstiter hook**: When the async generator is iterated over for the first time,
+the *firstiter hook* is called. The async generator is added to `loop._asyncgens` WeakSet
+and the event loop tracks all active async generators.
+
+- **finalizer hook**: When the async generator is about to be finalized,
+the *finalizer hook* is called. The event loop removes the async generator
+from `loop._asyncgens` WeakSet, and schedules the finalization of the async
+generator by creating a task calling `agen.aclose()`. This ensures that the
+finally block is executed while the event loop is running. When the loop is
+shutting down, the loop checks if there are active async generators and if so,
+it similarly schedules the finalization of all active async generators by calling
+`agen.aclose()` on each of them and waits for them to complete before shutting
+down the loop.
+
+This ensures that the async generator's `finally` blocks are executed even
+if the generator is not explicitly closed.
+
+Consider the following example:
+
+```python
+import asyncio
+
+async def agen():
+ try:
+ yield 1
+ yield 2
+ finally:
+ print("executing finally block")
+
+async def main():
+ async for item in agen():
+ print(item)
+ break # not fully iterated
+
+asyncio.run(main())
+```
+
+```mermaid
+flowchart TD
+ subgraph one["Loop running"]
+ A["asyncio.run(main())"] --> B
+ B["set async generator hooks <br> sys.set_asyncgen_hooks()"] --> C
+ C["async for item in agen"] --> F
+ F{"first iteration?"} --> |true|D
+ F{"first iteration?"} --> |false|H
+ D["calls firstiter hook<br>loop._asyncgen_firstiter_hook(agen)"] --> E
+ E["add agen to WeakSet<br> loop._asyncgens.add(agen)"] --> H
+ H["item = await agen.\_\_anext\_\_()"] --> J
+ J{"StopAsyncIteration?"} --> |true|M
+ J{"StopAsyncIteration?"} --> |false|I
+ I["print(item)"] --> S
+ S{"continue iterating?"} --> |true|C
+ S{"continue iterating?"} --> |false|M
+ M{"agen is no longer referenced?"} --> |true|N
+ M{"agen is no longer referenced?"} --> |false|two
+ N["finalize agen<br>_PyGen_Finalize(agen)"] --> O
+ O["calls finalizer hook<br>loop._asyncgen_finalizer_hook(agen)"] --> P
+ P["remove agen from WeakSet<br>loop._asyncgens.discard(agen)"] --> Q
+ Q["schedule task to close it<br>self.create_task(agen.aclose())"] --> R
+ R["print('executing finally block')"] --> E1
+
+ end
+
+ subgraph two["Loop shutting down"]
+ A1{"check for alive async generators?"} --> |true|B1
+ B1["close all async generators <br> await asyncio.gather\(*\[ag.aclose\(\) for ag in loop._asyncgens\]"] --> R
+ A1{"check for alive async generators?"} --> |false|E1
+ E1["loop.close()"]
+ end
+
+```
+
+[^1]: https://github.com/python/cpython/issues/123089
+[^2]: https://github.com/python/cpython/issues/80788