diff options
author | Jim Mussared <jim.mussared@gmail.com> | 2023-06-08 15:51:50 +1000 |
---|---|---|
committer | Damien George <damien@micropython.org> | 2023-06-19 17:33:03 +1000 |
commit | 2fbc08c462e247e7f78460783c9a07c76c5b762e (patch) | |
tree | cfda4eb3b04a6cc281fb0ea668a15b6216353c16 /extmod/uasyncio | |
parent | ed962f1f233eb74edf2cee83dc488d3cac5e02ee (diff) | |
download | micropython-2fbc08c462e247e7f78460783c9a07c76c5b762e.tar.gz micropython-2fbc08c462e247e7f78460783c9a07c76c5b762e.zip |
extmod/asyncio: Rename uasyncio to asyncio.
The asyncio module now has much better CPython compatibility and
deserves to be just called "asyncio".
This will avoid people having to write `from uasyncio import asyncio`.
Renames all files, and updates port manifests to use the new path. Also
renames the built-in _uasyncio to _asyncio.
This work was funded through GitHub Sponsors.
Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
Diffstat (limited to 'extmod/uasyncio')
-rw-r--r-- | extmod/uasyncio/__init__.py | 31 | ||||
-rw-r--r-- | extmod/uasyncio/core.py | 302 | ||||
-rw-r--r-- | extmod/uasyncio/event.py | 66 | ||||
-rw-r--r-- | extmod/uasyncio/funcs.py | 130 | ||||
-rw-r--r-- | extmod/uasyncio/lock.py | 55 | ||||
-rw-r--r-- | extmod/uasyncio/manifest.py | 15 | ||||
-rw-r--r-- | extmod/uasyncio/stream.py | 189 | ||||
-rw-r--r-- | extmod/uasyncio/task.py | 177 |
8 files changed, 0 insertions, 965 deletions
diff --git a/extmod/uasyncio/__init__.py b/extmod/uasyncio/__init__.py deleted file mode 100644 index 373de52af4..0000000000 --- a/extmod/uasyncio/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# MicroPython uasyncio module -# MIT license; Copyright (c) 2019 Damien P. George - -from .core import * - -__version__ = (3, 0, 0) - -_attrs = { - "wait_for": "funcs", - "wait_for_ms": "funcs", - "gather": "funcs", - "Event": "event", - "ThreadSafeFlag": "event", - "Lock": "lock", - "open_connection": "stream", - "start_server": "stream", - "StreamReader": "stream", - "StreamWriter": "stream", -} - - -# Lazy loader, effectively does: -# global attr -# from .mod import attr -def __getattr__(attr): - mod = _attrs.get(attr, None) - if mod is None: - raise AttributeError(attr) - value = getattr(__import__(mod, None, None, True, 1), attr) - globals()[attr] = value - return value diff --git a/extmod/uasyncio/core.py b/extmod/uasyncio/core.py deleted file mode 100644 index f191d202e2..0000000000 --- a/extmod/uasyncio/core.py +++ /dev/null @@ -1,302 +0,0 @@ -# MicroPython uasyncio module -# MIT license; Copyright (c) 2019 Damien P. George - -from time import ticks_ms as ticks, ticks_diff, ticks_add -import sys, select - -# Import TaskQueue and Task, preferring built-in C code over Python code -try: - from _uasyncio import TaskQueue, Task -except: - from .task import TaskQueue, Task - - -################################################################################ -# Exceptions - - -class CancelledError(BaseException): - pass - - -class TimeoutError(Exception): - pass - - -# Used when calling Loop.call_exception_handler -_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None} - - -################################################################################ -# Sleep functions - - -# "Yield" once, then raise StopIteration -class SingletonGenerator: - def __init__(self): - self.state = None - self.exc = StopIteration() - - def __iter__(self): - return self - - def __next__(self): - if self.state is not None: - _task_queue.push(cur_task, self.state) - self.state = None - return None - else: - self.exc.__traceback__ = None - raise self.exc - - -# Pause task execution for the given time (integer in milliseconds, uPy extension) -# Use a SingletonGenerator to do it without allocating on the heap -def sleep_ms(t, sgen=SingletonGenerator()): - assert sgen.state is None - sgen.state = ticks_add(ticks(), max(0, t)) - return sgen - - -# Pause task execution for the given time (in seconds) -def sleep(t): - return sleep_ms(int(t * 1000)) - - -################################################################################ -# Queue and poller for stream IO - - -class IOQueue: - def __init__(self): - self.poller = select.poll() - self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream] - - def _enqueue(self, s, idx): - if id(s) not in self.map: - entry = [None, None, s] - entry[idx] = cur_task - self.map[id(s)] = entry - self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) - else: - sm = self.map[id(s)] - assert sm[idx] is None - assert sm[1 - idx] is not None - sm[idx] = cur_task - self.poller.modify(s, select.POLLIN | select.POLLOUT) - # Link task to this IOQueue so it can be removed if needed - cur_task.data = self - - def _dequeue(self, s): - del self.map[id(s)] - self.poller.unregister(s) - - def queue_read(self, s): - self._enqueue(s, 0) - - def queue_write(self, s): - self._enqueue(s, 1) - - def remove(self, task): - while True: - del_s = None - for k in self.map: # Iterate without allocating on the heap - q0, q1, s = self.map[k] - if q0 is task or q1 is task: - del_s = s - break - if del_s is not None: - self._dequeue(s) - else: - break - - def wait_io_event(self, dt): - for s, ev in self.poller.ipoll(dt): - sm = self.map[id(s)] - # print('poll', s, sm, ev) - if ev & ~select.POLLOUT and sm[0] is not None: - # POLLIN or error - _task_queue.push(sm[0]) - sm[0] = None - if ev & ~select.POLLIN and sm[1] is not None: - # POLLOUT or error - _task_queue.push(sm[1]) - sm[1] = None - if sm[0] is None and sm[1] is None: - self._dequeue(s) - elif sm[0] is None: - self.poller.modify(s, select.POLLOUT) - else: - self.poller.modify(s, select.POLLIN) - - -################################################################################ -# Main run loop - - -# Ensure the awaitable is a task -def _promote_to_task(aw): - return aw if isinstance(aw, Task) else create_task(aw) - - -# Create and schedule a new task from a coroutine -def create_task(coro): - if not hasattr(coro, "send"): - raise TypeError("coroutine expected") - t = Task(coro, globals()) - _task_queue.push(t) - return t - - -# Keep scheduling tasks until there are none left to schedule -def run_until_complete(main_task=None): - global cur_task - excs_all = (CancelledError, Exception) # To prevent heap allocation in loop - excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop - while True: - # Wait until the head of _task_queue is ready to run - dt = 1 - while dt > 0: - dt = -1 - t = _task_queue.peek() - if t: - # A task waiting on _task_queue; "ph_key" is time to schedule task at - dt = max(0, ticks_diff(t.ph_key, ticks())) - elif not _io_queue.map: - # No tasks can be woken so finished running - return - # print('(poll {})'.format(dt), len(_io_queue.map)) - _io_queue.wait_io_event(dt) - - # Get next task to run and continue it - t = _task_queue.pop() - cur_task = t - try: - # Continue running the coroutine, it's responsible for rescheduling itself - exc = t.data - if not exc: - t.coro.send(None) - else: - # If the task is finished and on the run queue and gets here, then it - # had an exception and was not await'ed on. Throwing into it now will - # raise StopIteration and the code below will catch this and run the - # call_exception_handler function. - t.data = None - t.coro.throw(exc) - except excs_all as er: - # Check the task is not on any event queue - assert t.data is None - # This task is done, check if it's the main task and then loop should stop - if t is main_task: - if isinstance(er, StopIteration): - return er.value - raise er - if t.state: - # Task was running but is now finished. - waiting = False - if t.state is True: - # "None" indicates that the task is complete and not await'ed on (yet). - t.state = None - elif callable(t.state): - # The task has a callback registered to be called on completion. - t.state(t, er) - t.state = False - waiting = True - else: - # Schedule any other tasks waiting on the completion of this task. - while t.state.peek(): - _task_queue.push(t.state.pop()) - waiting = True - # "False" indicates that the task is complete and has been await'ed on. - t.state = False - if not waiting and not isinstance(er, excs_stop): - # An exception ended this detached task, so queue it for later - # execution to handle the uncaught exception if no other task retrieves - # the exception in the meantime (this is handled by Task.throw). - _task_queue.push(t) - # Save return value of coro to pass up to caller. - t.data = er - elif t.state is None: - # Task is already finished and nothing await'ed on the task, - # so call the exception handler. - _exc_context["exception"] = exc - _exc_context["future"] = t - Loop.call_exception_handler(_exc_context) - - -# Create a new task from a coroutine and run it until it finishes -def run(coro): - return run_until_complete(create_task(coro)) - - -################################################################################ -# Event loop wrapper - - -async def _stopper(): - pass - - -_stop_task = None - - -class Loop: - _exc_handler = None - - def create_task(coro): - return create_task(coro) - - def run_forever(): - global _stop_task - _stop_task = Task(_stopper(), globals()) - run_until_complete(_stop_task) - # TODO should keep running until .stop() is called, even if there're no tasks left - - def run_until_complete(aw): - return run_until_complete(_promote_to_task(aw)) - - def stop(): - global _stop_task - if _stop_task is not None: - _task_queue.push(_stop_task) - # If stop() is called again, do nothing - _stop_task = None - - def close(): - pass - - def set_exception_handler(handler): - Loop._exc_handler = handler - - def get_exception_handler(): - return Loop._exc_handler - - def default_exception_handler(loop, context): - print(context["message"]) - print("future:", context["future"], "coro=", context["future"].coro) - sys.print_exception(context["exception"]) - - def call_exception_handler(context): - (Loop._exc_handler or Loop.default_exception_handler)(Loop, context) - - -# The runq_len and waitq_len arguments are for legacy uasyncio compatibility -def get_event_loop(runq_len=0, waitq_len=0): - return Loop - - -def current_task(): - return cur_task - - -def new_event_loop(): - global _task_queue, _io_queue - # TaskQueue of Task instances - _task_queue = TaskQueue() - # Task queue and poller for stream IO - _io_queue = IOQueue() - return Loop - - -# Initialise default event loop -new_event_loop() diff --git a/extmod/uasyncio/event.py b/extmod/uasyncio/event.py deleted file mode 100644 index 8a90534590..0000000000 --- a/extmod/uasyncio/event.py +++ /dev/null @@ -1,66 +0,0 @@ -# MicroPython uasyncio module -# MIT license; Copyright (c) 2019-2020 Damien P. George - -from . import core - - -# Event class for primitive events that can be waited on, set, and cleared -class Event: - def __init__(self): - self.state = False # False=unset; True=set - self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event - - def is_set(self): - return self.state - - def set(self): - # Event becomes set, schedule any tasks waiting on it - # Note: This must not be called from anything except the thread running - # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread). - while self.waiting.peek(): - core._task_queue.push(self.waiting.pop()) - self.state = True - - def clear(self): - self.state = False - - # async - def wait(self): - if not self.state: - # Event not set, put the calling task on the event's waiting queue - self.waiting.push(core.cur_task) - # Set calling task's data to the event's queue so it can be removed if needed - core.cur_task.data = self.waiting - yield - return True - - -# MicroPython-extension: This can be set from outside the asyncio event loop, -# such as other threads, IRQs or scheduler context. Implementation is a stream -# that asyncio will poll until a flag is set. -# Note: Unlike Event, this is self-clearing after a wait(). -try: - import io - - class ThreadSafeFlag(io.IOBase): - def __init__(self): - self.state = 0 - - def ioctl(self, req, flags): - if req == 3: # MP_STREAM_POLL - return self.state * flags - return None - - def set(self): - self.state = 1 - - def clear(self): - self.state = 0 - - async def wait(self): - if not self.state: - yield core._io_queue.queue_read(self) - self.state = 0 - -except ImportError: - pass diff --git a/extmod/uasyncio/funcs.py b/extmod/uasyncio/funcs.py deleted file mode 100644 index dc52ad3958..0000000000 --- a/extmod/uasyncio/funcs.py +++ /dev/null @@ -1,130 +0,0 @@ -# MicroPython uasyncio module -# MIT license; Copyright (c) 2019-2022 Damien P. George - -from . import core - - -async def _run(waiter, aw): - try: - result = await aw - status = True - except BaseException as er: - result = None - status = er - if waiter.data is None: - # The waiter is still waiting, cancel it. - if waiter.cancel(): - # Waiter was cancelled by us, change its CancelledError to an instance of - # CancelledError that contains the status and result of waiting on aw. - # If the wait_for task subsequently gets cancelled externally then this - # instance will be reset to a CancelledError instance without arguments. - waiter.data = core.CancelledError(status, result) - - -async def wait_for(aw, timeout, sleep=core.sleep): - aw = core._promote_to_task(aw) - if timeout is None: - return await aw - - # Run aw in a separate runner task that manages its exceptions. - runner_task = core.create_task(_run(core.cur_task, aw)) - - try: - # Wait for the timeout to elapse. - await sleep(timeout) - except core.CancelledError as er: - status = er.value - if status is None: - # This wait_for was cancelled externally, so cancel aw and re-raise. - runner_task.cancel() - raise er - elif status is True: - # aw completed successfully and cancelled the sleep, so return aw's result. - return er.args[1] - else: - # aw raised an exception, propagate it out to the caller. - raise status - - # The sleep finished before aw, so cancel aw and raise TimeoutError. - runner_task.cancel() - await runner_task - raise core.TimeoutError - - -def wait_for_ms(aw, timeout): - return wait_for(aw, timeout, core.sleep_ms) - - -class _Remove: - @staticmethod - def remove(t): - pass - - -# async -def gather(*aws, return_exceptions=False): - if not aws: - return [] - - def done(t, er): - # Sub-task "t" has finished, with exception "er". - nonlocal state - if gather_task.data is not _Remove: - # The main gather task has already been scheduled, so do nothing. - # This happens if another sub-task already raised an exception and - # woke the main gather task (via this done function), or if the main - # gather task was cancelled externally. - return - elif not return_exceptions and not isinstance(er, StopIteration): - # A sub-task raised an exception, indicate that to the gather task. - state = er - else: - state -= 1 - if state: - # Still some sub-tasks running. - return - # Gather waiting is done, schedule the main gather task. - core._task_queue.push(gather_task) - - ts = [core._promote_to_task(aw) for aw in aws] - for i in range(len(ts)): - if ts[i].state is not True: - # Task is not running, gather not currently supported for this case. - raise RuntimeError("can't gather") - # Register the callback to call when the task is done. - ts[i].state = done - - # Set the state for execution of the gather. - gather_task = core.cur_task - state = len(ts) - cancel_all = False - - # Wait for the a sub-task to need attention. - gather_task.data = _Remove - try: - yield - except core.CancelledError as er: - cancel_all = True - state = er - - # Clean up tasks. - for i in range(len(ts)): - if ts[i].state is done: - # Sub-task is still running, deregister the callback and cancel if needed. - ts[i].state = True - if cancel_all: - ts[i].cancel() - elif isinstance(ts[i].data, StopIteration): - # Sub-task ran to completion, get its return value. - ts[i] = ts[i].data.value - else: - # Sub-task had an exception with return_exceptions==True, so get its exception. - ts[i] = ts[i].data - - # Either this gather was cancelled, or one of the sub-tasks raised an exception with - # return_exceptions==False, so reraise the exception here. - if state: - raise state - - # Return the list of return values of each sub-task. - return ts diff --git a/extmod/uasyncio/lock.py b/extmod/uasyncio/lock.py deleted file mode 100644 index 85a9437b4f..0000000000 --- a/extmod/uasyncio/lock.py +++ /dev/null @@ -1,55 +0,0 @@ -# MicroPython uasyncio module -# MIT license; Copyright (c) 2019-2020 Damien P. George - -from . import core - - -# Lock class for primitive mutex capability -class Lock: - def __init__(self): - # The state can take the following values: - # - 0: unlocked - # - 1: locked - # - <Task>: unlocked but this task has been scheduled to acquire the lock next - self.state = 0 - # Queue of Tasks waiting to acquire this Lock - self.waiting = core.TaskQueue() - - def locked(self): - return self.state == 1 - - def release(self): - if self.state != 1: - raise RuntimeError("Lock not acquired") - if self.waiting.peek(): - # Task(s) waiting on lock, schedule next Task - self.state = self.waiting.pop() - core._task_queue.push(self.state) - else: - # No Task waiting so unlock - self.state = 0 - - # async - def acquire(self): - if self.state != 0: - # Lock unavailable, put the calling Task on the waiting queue - self.waiting.push(core.cur_task) - # Set calling task's data to the lock's queue so it can be removed if needed - core.cur_task.data = self.waiting - try: - yield - except core.CancelledError as er: - if self.state == core.cur_task: - # Cancelled while pending on resume, schedule next waiting Task - self.state = 1 - self.release() - raise er - # Lock available, set it as locked - self.state = 1 - return True - - async def __aenter__(self): - return await self.acquire() - - async def __aexit__(self, exc_type, exc, tb): - return self.release() diff --git a/extmod/uasyncio/manifest.py b/extmod/uasyncio/manifest.py deleted file mode 100644 index d425a467b3..0000000000 --- a/extmod/uasyncio/manifest.py +++ /dev/null @@ -1,15 +0,0 @@ -# This list of package files doesn't include task.py because that's provided -# by the C module. -package( - "uasyncio", - ( - "__init__.py", - "core.py", - "event.py", - "funcs.py", - "lock.py", - "stream.py", - ), - base_path="..", - opt=3, -) diff --git a/extmod/uasyncio/stream.py b/extmod/uasyncio/stream.py deleted file mode 100644 index ac297cce04..0000000000 --- a/extmod/uasyncio/stream.py +++ /dev/null @@ -1,189 +0,0 @@ -# MicroPython uasyncio module -# MIT license; Copyright (c) 2019-2020 Damien P. George - -from . import core - - -class Stream: - def __init__(self, s, e={}): - self.s = s - self.e = e - self.out_buf = b"" - - def get_extra_info(self, v): - return self.e[v] - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc, tb): - await self.close() - - def close(self): - pass - - async def wait_closed(self): - # TODO yield? - self.s.close() - - # async - def read(self, n=-1): - r = b"" - while True: - yield core._io_queue.queue_read(self.s) - r2 = self.s.read(n) - if r2 is not None: - if n >= 0: - return r2 - if not len(r2): - return r - r += r2 - - # async - def readinto(self, buf): - yield core._io_queue.queue_read(self.s) - return self.s.readinto(buf) - - # async - def readexactly(self, n): - r = b"" - while n: - yield core._io_queue.queue_read(self.s) - r2 = self.s.read(n) - if r2 is not None: - if not len(r2): - raise EOFError - r += r2 - n -= len(r2) - return r - - # async - def readline(self): - l = b"" - while True: - yield core._io_queue.queue_read(self.s) - l2 = self.s.readline() # may do multiple reads but won't block - l += l2 - if not l2 or l[-1] == 10: # \n (check l in case l2 is str) - return l - - def write(self, buf): - if not self.out_buf: - # Try to write immediately to the underlying stream. - ret = self.s.write(buf) - if ret == len(buf): - return - if ret is not None: - buf = buf[ret:] - self.out_buf += buf - - # async - def drain(self): - if not self.out_buf: - # Drain must always yield, so a tight loop of write+drain can't block the scheduler. - return (yield from core.sleep_ms(0)) - mv = memoryview(self.out_buf) - off = 0 - while off < len(mv): - yield core._io_queue.queue_write(self.s) - ret = self.s.write(mv[off:]) - if ret is not None: - off += ret - self.out_buf = b"" - - -# Stream can be used for both reading and writing to save code size -StreamReader = Stream -StreamWriter = Stream - - -# Create a TCP stream connection to a remote host -# -# async -def open_connection(host, port): - from errno import EINPROGRESS - import socket - - ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking! - s = socket.socket(ai[0], ai[1], ai[2]) - s.setblocking(False) - ss = Stream(s) - try: - s.connect(ai[-1]) - except OSError as er: - if er.errno != EINPROGRESS: - raise er - yield core._io_queue.queue_write(s) - return ss, ss - - -# Class representing a TCP stream server, can be closed and used in "async with" -class Server: - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc, tb): - self.close() - await self.wait_closed() - - def close(self): - self.task.cancel() - - async def wait_closed(self): - await self.task - - async def _serve(self, s, cb): - # Accept incoming connections - while True: - try: - yield core._io_queue.queue_read(s) - except core.CancelledError: - # Shutdown server - s.close() - return - try: - s2, addr = s.accept() - except: - # Ignore a failed accept - continue - s2.setblocking(False) - s2s = Stream(s2, {"peername": addr}) - core.create_task(cb(s2s, s2s)) - - -# Helper function to start a TCP stream server, running as a new task -# TODO could use an accept-callback on socket read activity instead of creating a task -async def start_server(cb, host, port, backlog=5): - import socket - - # Create and bind server socket. - host = socket.getaddrinfo(host, port)[0] # TODO this is blocking! - s = socket.socket() - s.setblocking(False) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(host[-1]) - s.listen(backlog) - - # Create and return server object and task. - srv = Server() - srv.task = core.create_task(srv._serve(s, cb)) - return srv - - -################################################################################ -# Legacy uasyncio compatibility - - -async def stream_awrite(self, buf, off=0, sz=-1): - if off != 0 or sz != -1: - buf = memoryview(buf) - if sz == -1: - sz = len(buf) - buf = buf[off : off + sz] - self.write(buf) - await self.drain() - - -Stream.aclose = Stream.wait_closed -Stream.awrite = stream_awrite -Stream.awritestr = stream_awrite # TODO explicitly convert to bytes? diff --git a/extmod/uasyncio/task.py b/extmod/uasyncio/task.py deleted file mode 100644 index 4ead2a1308..0000000000 --- a/extmod/uasyncio/task.py +++ /dev/null @@ -1,177 +0,0 @@ -# MicroPython uasyncio module -# MIT license; Copyright (c) 2019-2020 Damien P. George - -# This file contains the core TaskQueue based on a pairing heap, and the core Task class. -# They can optionally be replaced by C implementations. - -from . import core - - -# pairing-heap meld of 2 heaps; O(1) -def ph_meld(h1, h2): - if h1 is None: - return h2 - if h2 is None: - return h1 - lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0 - if lt: - if h1.ph_child is None: - h1.ph_child = h2 - else: - h1.ph_child_last.ph_next = h2 - h1.ph_child_last = h2 - h2.ph_next = None - h2.ph_rightmost_parent = h1 - return h1 - else: - h1.ph_next = h2.ph_child - h2.ph_child = h1 - if h1.ph_next is None: - h2.ph_child_last = h1 - h1.ph_rightmost_parent = h2 - return h2 - - -# pairing-heap pairing operation; amortised O(log N) -def ph_pairing(child): - heap = None - while child is not None: - n1 = child - child = child.ph_next - n1.ph_next = None - if child is not None: - n2 = child - child = child.ph_next - n2.ph_next = None - n1 = ph_meld(n1, n2) - heap = ph_meld(heap, n1) - return heap - - -# pairing-heap delete of a node; stable, amortised O(log N) -def ph_delete(heap, node): - if node is heap: - child = heap.ph_child - node.ph_child = None - return ph_pairing(child) - # Find parent of node - parent = node - while parent.ph_next is not None: - parent = parent.ph_next - parent = parent.ph_rightmost_parent - # Replace node with pairing of its children - if node is parent.ph_child and node.ph_child is None: - parent.ph_child = node.ph_next - node.ph_next = None - return heap - elif node is parent.ph_child: - child = node.ph_child - next = node.ph_next - node.ph_child = None - node.ph_next = None - node = ph_pairing(child) - parent.ph_child = node - else: - n = parent.ph_child - while node is not n.ph_next: - n = n.ph_next - child = node.ph_child - next = node.ph_next - node.ph_child = None - node.ph_next = None - node = ph_pairing(child) - if node is None: - node = n - else: - n.ph_next = node - node.ph_next = next - if next is None: - node.ph_rightmost_parent = parent - parent.ph_child_last = node - return heap - - -# TaskQueue class based on the above pairing-heap functions. -class TaskQueue: - def __init__(self): - self.heap = None - - def peek(self): - return self.heap - - def push(self, v, key=None): - assert v.ph_child is None - assert v.ph_next is None - v.data = None - v.ph_key = key if key is not None else core.ticks() - self.heap = ph_meld(v, self.heap) - - def pop(self): - v = self.heap - assert v.ph_next is None - self.heap = ph_pairing(v.ph_child) - v.ph_child = None - return v - - def remove(self, v): - self.heap = ph_delete(self.heap, v) - - -# Task class representing a coroutine, can be waited on and cancelled. -class Task: - def __init__(self, coro, globals=None): - self.coro = coro # Coroutine of this Task - self.data = None # General data for queue it is waiting on - self.state = True # None, False, True, a callable, or a TaskQueue instance - self.ph_key = 0 # Pairing heap - self.ph_child = None # Paring heap - self.ph_child_last = None # Paring heap - self.ph_next = None # Paring heap - self.ph_rightmost_parent = None # Paring heap - - def __iter__(self): - if not self.state: - # Task finished, signal that is has been await'ed on. - self.state = False - elif self.state is True: - # Allocated head of linked list of Tasks waiting on completion of this task. - self.state = TaskQueue() - elif type(self.state) is not TaskQueue: - # Task has state used for another purpose, so can't also wait on it. - raise RuntimeError("can't wait") - return self - - def __next__(self): - if not self.state: - # Task finished, raise return value to caller so it can continue. - raise self.data - else: - # Put calling task on waiting queue. - self.state.push(core.cur_task) - # Set calling task's data to this task that it waits on, to double-link it. - core.cur_task.data = self - - def done(self): - return not self.state - - def cancel(self): - # Check if task is already finished. - if not self.state: - return False - # Can't cancel self (not supported yet). - if self is core.cur_task: - raise RuntimeError("can't cancel self") - # If Task waits on another task then forward the cancel to the one it's waiting on. - while isinstance(self.data, Task): - self = self.data - # Reschedule Task as a cancelled task. - if hasattr(self.data, "remove"): - # Not on the main running queue, remove the task from the queue it's on. - self.data.remove(self) - core._task_queue.push(self) - elif core.ticks_diff(self.ph_key, core.ticks()) > 0: - # On the main running queue but scheduled in the future, so bring it forward to now. - core._task_queue.remove(self) - core._task_queue.push(self) - self.data = core.CancelledError - return True |