author     Jim Mussared <jim.mussared@gmail.com>   2023-06-08 15:51:50 +1000
committer  Damien George <damien@micropython.org>  2023-06-19 17:33:03 +1000
commit     2fbc08c462e247e7f78460783c9a07c76c5b762e (patch)
tree       cfda4eb3b04a6cc281fb0ea668a15b6216353c16 /extmod/uasyncio
parent     ed962f1f233eb74edf2cee83dc488d3cac5e02ee (diff)
extmod/asyncio: Rename uasyncio to asyncio.
The asyncio module now has much better CPython compatibility and deserves to be just called "asyncio". This will avoid people having to write `from uasyncio import asyncio`.

Renames all files, and updates port manifests to use the new path. Also renames the built-in _uasyncio to _asyncio.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
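For users of the module, the practical effect of the rename can be sketched as follows (a minimal illustration, not part of the commit itself; sleep_ms remains a MicroPython extension in the renamed module):

# Before this commit, scripts used the MicroPython-specific name:
#     import uasyncio as asyncio
# After it, the CPython-compatible name imports directly:
import asyncio

async def main():
    await asyncio.sleep_ms(100)  # MicroPython extensions are still available

asyncio.run(main())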
Diffstat (limited to 'extmod/uasyncio')
-rw-r--r--  extmod/uasyncio/__init__.py |  31
-rw-r--r--  extmod/uasyncio/core.py     | 302
-rw-r--r--  extmod/uasyncio/event.py    |  66
-rw-r--r--  extmod/uasyncio/funcs.py    | 130
-rw-r--r--  extmod/uasyncio/lock.py     |  55
-rw-r--r--  extmod/uasyncio/manifest.py |  15
-rw-r--r--  extmod/uasyncio/stream.py   | 189
-rw-r--r--  extmod/uasyncio/task.py     | 177
8 files changed, 0 insertions(+), 965 deletions(-)
diff --git a/extmod/uasyncio/__init__.py b/extmod/uasyncio/__init__.py
deleted file mode 100644
index 373de52af4..0000000000
--- a/extmod/uasyncio/__init__.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# MicroPython uasyncio module
-# MIT license; Copyright (c) 2019 Damien P. George
-
-from .core import *
-
-__version__ = (3, 0, 0)
-
-_attrs = {
- "wait_for": "funcs",
- "wait_for_ms": "funcs",
- "gather": "funcs",
- "Event": "event",
- "ThreadSafeFlag": "event",
- "Lock": "lock",
- "open_connection": "stream",
- "start_server": "stream",
- "StreamReader": "stream",
- "StreamWriter": "stream",
-}
-
-
-# Lazy loader, effectively does:
-# global attr
-# from .mod import attr
-def __getattr__(attr):
- mod = _attrs.get(attr, None)
- if mod is None:
- raise AttributeError(attr)
- value = getattr(__import__(mod, None, None, True, 1), attr)
- globals()[attr] = value
- return value
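The __init__.py above is mostly a lazy loader: a sub-module is imported only when one of its names is first accessed, and the result is cached in the package globals. A small sketch of the observable behaviour (illustrative only):

import uasyncio

ev = uasyncio.Event()  # first access runs __getattr__, which imports .event
                       # and caches Event in globals(); later lookups are direct
lk = uasyncio.Lock()   # likewise for .lock
try:
    uasyncio.no_such_name
except AttributeError:
    print("unknown attributes still raise AttributeError")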
diff --git a/extmod/uasyncio/core.py b/extmod/uasyncio/core.py
deleted file mode 100644
index f191d202e2..0000000000
--- a/extmod/uasyncio/core.py
+++ /dev/null
@@ -1,302 +0,0 @@
-# MicroPython uasyncio module
-# MIT license; Copyright (c) 2019 Damien P. George
-
-from time import ticks_ms as ticks, ticks_diff, ticks_add
-import sys, select
-
-# Import TaskQueue and Task, preferring built-in C code over Python code
-try:
- from _uasyncio import TaskQueue, Task
-except:
- from .task import TaskQueue, Task
-
-
-################################################################################
-# Exceptions
-
-
-class CancelledError(BaseException):
- pass
-
-
-class TimeoutError(Exception):
- pass
-
-
-# Used when calling Loop.call_exception_handler
-_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
-
-
-################################################################################
-# Sleep functions
-
-
-# "Yield" once, then raise StopIteration
-class SingletonGenerator:
- def __init__(self):
- self.state = None
- self.exc = StopIteration()
-
- def __iter__(self):
- return self
-
- def __next__(self):
- if self.state is not None:
- _task_queue.push(cur_task, self.state)
- self.state = None
- return None
- else:
- self.exc.__traceback__ = None
- raise self.exc
-
-
-# Pause task execution for the given time (integer in milliseconds, uPy extension)
-# Use a SingletonGenerator to do it without allocating on the heap
-def sleep_ms(t, sgen=SingletonGenerator()):
- assert sgen.state is None
- sgen.state = ticks_add(ticks(), max(0, t))
- return sgen
-
-
-# Pause task execution for the given time (in seconds)
-def sleep(t):
- return sleep_ms(int(t * 1000))
-
-
-################################################################################
-# Queue and poller for stream IO
-
-
-class IOQueue:
- def __init__(self):
- self.poller = select.poll()
- self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
-
- def _enqueue(self, s, idx):
- if id(s) not in self.map:
- entry = [None, None, s]
- entry[idx] = cur_task
- self.map[id(s)] = entry
- self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
- else:
- sm = self.map[id(s)]
- assert sm[idx] is None
- assert sm[1 - idx] is not None
- sm[idx] = cur_task
- self.poller.modify(s, select.POLLIN | select.POLLOUT)
- # Link task to this IOQueue so it can be removed if needed
- cur_task.data = self
-
- def _dequeue(self, s):
- del self.map[id(s)]
- self.poller.unregister(s)
-
- def queue_read(self, s):
- self._enqueue(s, 0)
-
- def queue_write(self, s):
- self._enqueue(s, 1)
-
- def remove(self, task):
- while True:
- del_s = None
- for k in self.map: # Iterate without allocating on the heap
- q0, q1, s = self.map[k]
- if q0 is task or q1 is task:
- del_s = s
- break
- if del_s is not None:
- self._dequeue(s)
- else:
- break
-
- def wait_io_event(self, dt):
- for s, ev in self.poller.ipoll(dt):
- sm = self.map[id(s)]
- # print('poll', s, sm, ev)
- if ev & ~select.POLLOUT and sm[0] is not None:
- # POLLIN or error
- _task_queue.push(sm[0])
- sm[0] = None
- if ev & ~select.POLLIN and sm[1] is not None:
- # POLLOUT or error
- _task_queue.push(sm[1])
- sm[1] = None
- if sm[0] is None and sm[1] is None:
- self._dequeue(s)
- elif sm[0] is None:
- self.poller.modify(s, select.POLLOUT)
- else:
- self.poller.modify(s, select.POLLIN)
-
-
-################################################################################
-# Main run loop
-
-
-# Ensure the awaitable is a task
-def _promote_to_task(aw):
- return aw if isinstance(aw, Task) else create_task(aw)
-
-
-# Create and schedule a new task from a coroutine
-def create_task(coro):
- if not hasattr(coro, "send"):
- raise TypeError("coroutine expected")
- t = Task(coro, globals())
- _task_queue.push(t)
- return t
-
-
-# Keep scheduling tasks until there are none left to schedule
-def run_until_complete(main_task=None):
- global cur_task
- excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
- excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
- while True:
- # Wait until the head of _task_queue is ready to run
- dt = 1
- while dt > 0:
- dt = -1
- t = _task_queue.peek()
- if t:
- # A task waiting on _task_queue; "ph_key" is time to schedule task at
- dt = max(0, ticks_diff(t.ph_key, ticks()))
- elif not _io_queue.map:
- # No tasks can be woken so finished running
- return
- # print('(poll {})'.format(dt), len(_io_queue.map))
- _io_queue.wait_io_event(dt)
-
- # Get next task to run and continue it
- t = _task_queue.pop()
- cur_task = t
- try:
- # Continue running the coroutine, it's responsible for rescheduling itself
- exc = t.data
- if not exc:
- t.coro.send(None)
- else:
- # If the task is finished and on the run queue and gets here, then it
- # had an exception and was not await'ed on. Throwing into it now will
- # raise StopIteration and the code below will catch this and run the
- # call_exception_handler function.
- t.data = None
- t.coro.throw(exc)
- except excs_all as er:
- # Check the task is not on any event queue
- assert t.data is None
- # This task is done, check if it's the main task and then loop should stop
- if t is main_task:
- if isinstance(er, StopIteration):
- return er.value
- raise er
- if t.state:
- # Task was running but is now finished.
- waiting = False
- if t.state is True:
- # "None" indicates that the task is complete and not await'ed on (yet).
- t.state = None
- elif callable(t.state):
- # The task has a callback registered to be called on completion.
- t.state(t, er)
- t.state = False
- waiting = True
- else:
- # Schedule any other tasks waiting on the completion of this task.
- while t.state.peek():
- _task_queue.push(t.state.pop())
- waiting = True
- # "False" indicates that the task is complete and has been await'ed on.
- t.state = False
- if not waiting and not isinstance(er, excs_stop):
- # An exception ended this detached task, so queue it for later
- # execution to handle the uncaught exception if no other task retrieves
- # the exception in the meantime (this is handled by Task.throw).
- _task_queue.push(t)
- # Save return value of coro to pass up to caller.
- t.data = er
- elif t.state is None:
- # Task is already finished and nothing await'ed on the task,
- # so call the exception handler.
- _exc_context["exception"] = exc
- _exc_context["future"] = t
- Loop.call_exception_handler(_exc_context)
-
-
-# Create a new task from a coroutine and run it until it finishes
-def run(coro):
- return run_until_complete(create_task(coro))
-
-
-################################################################################
-# Event loop wrapper
-
-
-async def _stopper():
- pass
-
-
-_stop_task = None
-
-
-class Loop:
- _exc_handler = None
-
- def create_task(coro):
- return create_task(coro)
-
- def run_forever():
- global _stop_task
- _stop_task = Task(_stopper(), globals())
- run_until_complete(_stop_task)
- # TODO should keep running until .stop() is called, even if there're no tasks left
-
- def run_until_complete(aw):
- return run_until_complete(_promote_to_task(aw))
-
- def stop():
- global _stop_task
- if _stop_task is not None:
- _task_queue.push(_stop_task)
- # If stop() is called again, do nothing
- _stop_task = None
-
- def close():
- pass
-
- def set_exception_handler(handler):
- Loop._exc_handler = handler
-
- def get_exception_handler():
- return Loop._exc_handler
-
- def default_exception_handler(loop, context):
- print(context["message"])
- print("future:", context["future"], "coro=", context["future"].coro)
- sys.print_exception(context["exception"])
-
- def call_exception_handler(context):
- (Loop._exc_handler or Loop.default_exception_handler)(Loop, context)
-
-
-# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
-def get_event_loop(runq_len=0, waitq_len=0):
- return Loop
-
-
-def current_task():
- return cur_task
-
-
-def new_event_loop():
- global _task_queue, _io_queue
- # TaskQueue of Task instances
- _task_queue = TaskQueue()
- # Task queue and poller for stream IO
- _io_queue = IOQueue()
- return Loop
-
-
-# Initialise default event loop
-new_event_loop()
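For context, a minimal sketch of the scheduler API implemented in core.py, as it was typically used under the old name (blink and main are illustrative):

import uasyncio as asyncio

async def blink(period_ms):
    while True:
        print("tick")
        await asyncio.sleep_ms(period_ms)  # parks the task on _task_queue

async def main():
    t = asyncio.create_task(blink(100))    # schedule a concurrent task
    await asyncio.sleep(1)                 # run_until_complete drives both tasks
    t.cancel()                             # throws CancelledError into blink()

asyncio.run(main())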
diff --git a/extmod/uasyncio/event.py b/extmod/uasyncio/event.py
deleted file mode 100644
index 8a90534590..0000000000
--- a/extmod/uasyncio/event.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# MicroPython uasyncio module
-# MIT license; Copyright (c) 2019-2020 Damien P. George
-
-from . import core
-
-
-# Event class for primitive events that can be waited on, set, and cleared
-class Event:
- def __init__(self):
- self.state = False # False=unset; True=set
- self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event
-
- def is_set(self):
- return self.state
-
- def set(self):
- # Event becomes set, schedule any tasks waiting on it
- # Note: This must not be called from anything except the thread running
- # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
- while self.waiting.peek():
- core._task_queue.push(self.waiting.pop())
- self.state = True
-
- def clear(self):
- self.state = False
-
- # async
- def wait(self):
- if not self.state:
- # Event not set, put the calling task on the event's waiting queue
- self.waiting.push(core.cur_task)
- # Set calling task's data to the event's queue so it can be removed if needed
- core.cur_task.data = self.waiting
- yield
- return True
-
-
-# MicroPython-extension: This can be set from outside the asyncio event loop,
-# such as other threads, IRQs or scheduler context. Implementation is a stream
-# that asyncio will poll until a flag is set.
-# Note: Unlike Event, this is self-clearing after a wait().
-try:
- import io
-
- class ThreadSafeFlag(io.IOBase):
- def __init__(self):
- self.state = 0
-
- def ioctl(self, req, flags):
- if req == 3: # MP_STREAM_POLL
- return self.state * flags
- return None
-
- def set(self):
- self.state = 1
-
- def clear(self):
- self.state = 0
-
- async def wait(self):
- if not self.state:
- yield core._io_queue.queue_read(self)
- self.state = 0
-
-except ImportError:
- pass
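A short usage sketch of the two primitives above; on_timer stands in for any IRQ or scheduler context that is allowed to call ThreadSafeFlag.set() (illustrative names):

import uasyncio as asyncio

flag = asyncio.ThreadSafeFlag()

def on_timer(_):           # e.g. a timer or pin IRQ handler (illustrative)
    flag.set()             # safe outside the event loop; Event.set() is not

async def consumer():
    while True:
        await flag.wait()  # polled via the ioctl/stream mechanism, auto-clears
        print("flag was set")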
diff --git a/extmod/uasyncio/funcs.py b/extmod/uasyncio/funcs.py
deleted file mode 100644
index dc52ad3958..0000000000
--- a/extmod/uasyncio/funcs.py
+++ /dev/null
@@ -1,130 +0,0 @@
-# MicroPython uasyncio module
-# MIT license; Copyright (c) 2019-2022 Damien P. George
-
-from . import core
-
-
-async def _run(waiter, aw):
- try:
- result = await aw
- status = True
- except BaseException as er:
- result = None
- status = er
- if waiter.data is None:
- # The waiter is still waiting, cancel it.
- if waiter.cancel():
- # Waiter was cancelled by us, change its CancelledError to an instance of
- # CancelledError that contains the status and result of waiting on aw.
- # If the wait_for task subsequently gets cancelled externally then this
- # instance will be reset to a CancelledError instance without arguments.
- waiter.data = core.CancelledError(status, result)
-
-
-async def wait_for(aw, timeout, sleep=core.sleep):
- aw = core._promote_to_task(aw)
- if timeout is None:
- return await aw
-
- # Run aw in a separate runner task that manages its exceptions.
- runner_task = core.create_task(_run(core.cur_task, aw))
-
- try:
- # Wait for the timeout to elapse.
- await sleep(timeout)
- except core.CancelledError as er:
- status = er.value
- if status is None:
- # This wait_for was cancelled externally, so cancel aw and re-raise.
- runner_task.cancel()
- raise er
- elif status is True:
- # aw completed successfully and cancelled the sleep, so return aw's result.
- return er.args[1]
- else:
- # aw raised an exception, propagate it out to the caller.
- raise status
-
- # The sleep finished before aw, so cancel aw and raise TimeoutError.
- runner_task.cancel()
- await runner_task
- raise core.TimeoutError
-
-
-def wait_for_ms(aw, timeout):
- return wait_for(aw, timeout, core.sleep_ms)
-
-
-class _Remove:
- @staticmethod
- def remove(t):
- pass
-
-
-# async
-def gather(*aws, return_exceptions=False):
- if not aws:
- return []
-
- def done(t, er):
- # Sub-task "t" has finished, with exception "er".
- nonlocal state
- if gather_task.data is not _Remove:
- # The main gather task has already been scheduled, so do nothing.
- # This happens if another sub-task already raised an exception and
- # woke the main gather task (via this done function), or if the main
- # gather task was cancelled externally.
- return
- elif not return_exceptions and not isinstance(er, StopIteration):
- # A sub-task raised an exception, indicate that to the gather task.
- state = er
- else:
- state -= 1
- if state:
- # Still some sub-tasks running.
- return
- # Gather waiting is done, schedule the main gather task.
- core._task_queue.push(gather_task)
-
- ts = [core._promote_to_task(aw) for aw in aws]
- for i in range(len(ts)):
- if ts[i].state is not True:
- # Task is not running, gather not currently supported for this case.
- raise RuntimeError("can't gather")
- # Register the callback to call when the task is done.
- ts[i].state = done
-
- # Set the state for execution of the gather.
- gather_task = core.cur_task
- state = len(ts)
- cancel_all = False
-
- # Wait for a sub-task to need attention.
- gather_task.data = _Remove
- try:
- yield
- except core.CancelledError as er:
- cancel_all = True
- state = er
-
- # Clean up tasks.
- for i in range(len(ts)):
- if ts[i].state is done:
- # Sub-task is still running, deregister the callback and cancel if needed.
- ts[i].state = True
- if cancel_all:
- ts[i].cancel()
- elif isinstance(ts[i].data, StopIteration):
- # Sub-task ran to completion, get its return value.
- ts[i] = ts[i].data.value
- else:
- # Sub-task had an exception with return_exceptions==True, so get its exception.
- ts[i] = ts[i].data
-
- # Either this gather was cancelled, or one of the sub-tasks raised an exception with
- # return_exceptions==False, so reraise the exception here.
- if state:
- raise state
-
- # Return the list of return values of each sub-task.
- return ts
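A brief sketch of how wait_for and gather behave (fetch is an illustrative coroutine):

import uasyncio as asyncio

async def fetch(n):
    await asyncio.sleep_ms(10 * n)
    return n

async def main():
    # gather runs the awaitables concurrently and returns results in order.
    print(await asyncio.gather(fetch(1), fetch(2)))  # [1, 2]
    # wait_for bounds an awaitable and raises TimeoutError if it overruns.
    try:
        await asyncio.wait_for(fetch(100), 0.5)
    except asyncio.TimeoutError:
        print("timed out")

asyncio.run(main())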
diff --git a/extmod/uasyncio/lock.py b/extmod/uasyncio/lock.py
deleted file mode 100644
index 85a9437b4f..0000000000
--- a/extmod/uasyncio/lock.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# MicroPython uasyncio module
-# MIT license; Copyright (c) 2019-2020 Damien P. George
-
-from . import core
-
-
-# Lock class for primitive mutex capability
-class Lock:
- def __init__(self):
- # The state can take the following values:
- # - 0: unlocked
- # - 1: locked
- # - <Task>: unlocked but this task has been scheduled to acquire the lock next
- self.state = 0
- # Queue of Tasks waiting to acquire this Lock
- self.waiting = core.TaskQueue()
-
- def locked(self):
- return self.state == 1
-
- def release(self):
- if self.state != 1:
- raise RuntimeError("Lock not acquired")
- if self.waiting.peek():
- # Task(s) waiting on lock, schedule next Task
- self.state = self.waiting.pop()
- core._task_queue.push(self.state)
- else:
- # No Task waiting so unlock
- self.state = 0
-
- # async
- def acquire(self):
- if self.state != 0:
- # Lock unavailable, put the calling Task on the waiting queue
- self.waiting.push(core.cur_task)
- # Set calling task's data to the lock's queue so it can be removed if needed
- core.cur_task.data = self.waiting
- try:
- yield
- except core.CancelledError as er:
- if self.state == core.cur_task:
- # Cancelled while pending on resume, schedule next waiting Task
- self.state = 1
- self.release()
- raise er
- # Lock available, set it as locked
- self.state = 1
- return True
-
- async def __aenter__(self):
- return await self.acquire()
-
- async def __aexit__(self, exc_type, exc, tb):
- return self.release()
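A minimal sketch of the Lock in use (worker is illustrative):

import uasyncio as asyncio

lock = asyncio.Lock()

async def worker(name):
    async with lock:                # acquire() on entry, release() on exit
        print(name, "holds the lock")
        await asyncio.sleep_ms(10)

async def main():
    await asyncio.gather(worker("a"), worker("b"))

asyncio.run(main())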
diff --git a/extmod/uasyncio/manifest.py b/extmod/uasyncio/manifest.py
deleted file mode 100644
index d425a467b3..0000000000
--- a/extmod/uasyncio/manifest.py
+++ /dev/null
@@ -1,15 +0,0 @@
-# This list of package files doesn't include task.py because that's provided
-# by the C module.
-package(
- "uasyncio",
- (
- "__init__.py",
- "core.py",
- "event.py",
- "funcs.py",
- "lock.py",
- "stream.py",
- ),
- base_path="..",
- opt=3,
-)
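For reference, port and board manifests pulled this frozen package in by path, roughly along these lines (the exact include path varies per port; illustrative):

# In a port's boards/manifest.py:
include("$(MPY_DIR)/extmod/uasyncio")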
diff --git a/extmod/uasyncio/stream.py b/extmod/uasyncio/stream.py
deleted file mode 100644
index ac297cce04..0000000000
--- a/extmod/uasyncio/stream.py
+++ /dev/null
@@ -1,189 +0,0 @@
-# MicroPython uasyncio module
-# MIT license; Copyright (c) 2019-2020 Damien P. George
-
-from . import core
-
-
-class Stream:
- def __init__(self, s, e={}):
- self.s = s
- self.e = e
- self.out_buf = b""
-
- def get_extra_info(self, v):
- return self.e[v]
-
- async def __aenter__(self):
- return self
-
- async def __aexit__(self, exc_type, exc, tb):
- await self.close()
-
- def close(self):
- pass
-
- async def wait_closed(self):
- # TODO yield?
- self.s.close()
-
- # async
- def read(self, n=-1):
- r = b""
- while True:
- yield core._io_queue.queue_read(self.s)
- r2 = self.s.read(n)
- if r2 is not None:
- if n >= 0:
- return r2
- if not len(r2):
- return r
- r += r2
-
- # async
- def readinto(self, buf):
- yield core._io_queue.queue_read(self.s)
- return self.s.readinto(buf)
-
- # async
- def readexactly(self, n):
- r = b""
- while n:
- yield core._io_queue.queue_read(self.s)
- r2 = self.s.read(n)
- if r2 is not None:
- if not len(r2):
- raise EOFError
- r += r2
- n -= len(r2)
- return r
-
- # async
- def readline(self):
- l = b""
- while True:
- yield core._io_queue.queue_read(self.s)
- l2 = self.s.readline() # may do multiple reads but won't block
- l += l2
- if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
- return l
-
- def write(self, buf):
- if not self.out_buf:
- # Try to write immediately to the underlying stream.
- ret = self.s.write(buf)
- if ret == len(buf):
- return
- if ret is not None:
- buf = buf[ret:]
- self.out_buf += buf
-
- # async
- def drain(self):
- if not self.out_buf:
- # Drain must always yield, so a tight loop of write+drain can't block the scheduler.
- return (yield from core.sleep_ms(0))
- mv = memoryview(self.out_buf)
- off = 0
- while off < len(mv):
- yield core._io_queue.queue_write(self.s)
- ret = self.s.write(mv[off:])
- if ret is not None:
- off += ret
- self.out_buf = b""
-
-
-# Stream can be used for both reading and writing to save code size
-StreamReader = Stream
-StreamWriter = Stream
-
-
-# Create a TCP stream connection to a remote host
-#
-# async
-def open_connection(host, port):
- from errno import EINPROGRESS
- import socket
-
- ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
- s = socket.socket(ai[0], ai[1], ai[2])
- s.setblocking(False)
- ss = Stream(s)
- try:
- s.connect(ai[-1])
- except OSError as er:
- if er.errno != EINPROGRESS:
- raise er
- yield core._io_queue.queue_write(s)
- return ss, ss
-
-
-# Class representing a TCP stream server, can be closed and used in "async with"
-class Server:
- async def __aenter__(self):
- return self
-
- async def __aexit__(self, exc_type, exc, tb):
- self.close()
- await self.wait_closed()
-
- def close(self):
- self.task.cancel()
-
- async def wait_closed(self):
- await self.task
-
- async def _serve(self, s, cb):
- # Accept incoming connections
- while True:
- try:
- yield core._io_queue.queue_read(s)
- except core.CancelledError:
- # Shutdown server
- s.close()
- return
- try:
- s2, addr = s.accept()
- except:
- # Ignore a failed accept
- continue
- s2.setblocking(False)
- s2s = Stream(s2, {"peername": addr})
- core.create_task(cb(s2s, s2s))
-
-
-# Helper function to start a TCP stream server, running as a new task
-# TODO could use an accept-callback on socket read activity instead of creating a task
-async def start_server(cb, host, port, backlog=5):
- import socket
-
- # Create and bind server socket.
- host = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
- s = socket.socket()
- s.setblocking(False)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- s.bind(host[-1])
- s.listen(backlog)
-
- # Create and return server object and task.
- srv = Server()
- srv.task = core.create_task(srv._serve(s, cb))
- return srv
-
-
-################################################################################
-# Legacy uasyncio compatibility
-
-
-async def stream_awrite(self, buf, off=0, sz=-1):
- if off != 0 or sz != -1:
- buf = memoryview(buf)
- if sz == -1:
- sz = len(buf)
- buf = buf[off : off + sz]
- self.write(buf)
- await self.drain()
-
-
-Stream.aclose = Stream.wait_closed
-Stream.awrite = stream_awrite
-Stream.awritestr = stream_awrite # TODO explicitly convert to bytes?
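A compact sketch of the stream API in server form (handler and the port number are illustrative):

import uasyncio as asyncio

async def handler(reader, writer):
    line = await reader.readline()
    writer.write(b"echo: " + line)   # buffered if the socket is not ready
    await writer.drain()             # flushes out_buf via queue_write()
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handler, "0.0.0.0", 8080)
    async with server:               # close() + wait_closed() on exit
        await asyncio.sleep(60)

asyncio.run(main())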
diff --git a/extmod/uasyncio/task.py b/extmod/uasyncio/task.py
deleted file mode 100644
index 4ead2a1308..0000000000
--- a/extmod/uasyncio/task.py
+++ /dev/null
@@ -1,177 +0,0 @@
-# MicroPython uasyncio module
-# MIT license; Copyright (c) 2019-2020 Damien P. George
-
-# This file contains the core TaskQueue based on a pairing heap, and the core Task class.
-# They can optionally be replaced by C implementations.
-
-from . import core
-
-
-# pairing-heap meld of 2 heaps; O(1)
-def ph_meld(h1, h2):
- if h1 is None:
- return h2
- if h2 is None:
- return h1
- lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0
- if lt:
- if h1.ph_child is None:
- h1.ph_child = h2
- else:
- h1.ph_child_last.ph_next = h2
- h1.ph_child_last = h2
- h2.ph_next = None
- h2.ph_rightmost_parent = h1
- return h1
- else:
- h1.ph_next = h2.ph_child
- h2.ph_child = h1
- if h1.ph_next is None:
- h2.ph_child_last = h1
- h1.ph_rightmost_parent = h2
- return h2
-
-
-# pairing-heap pairing operation; amortised O(log N)
-def ph_pairing(child):
- heap = None
- while child is not None:
- n1 = child
- child = child.ph_next
- n1.ph_next = None
- if child is not None:
- n2 = child
- child = child.ph_next
- n2.ph_next = None
- n1 = ph_meld(n1, n2)
- heap = ph_meld(heap, n1)
- return heap
-
-
-# pairing-heap delete of a node; stable, amortised O(log N)
-def ph_delete(heap, node):
- if node is heap:
- child = heap.ph_child
- node.ph_child = None
- return ph_pairing(child)
- # Find parent of node
- parent = node
- while parent.ph_next is not None:
- parent = parent.ph_next
- parent = parent.ph_rightmost_parent
- # Replace node with pairing of its children
- if node is parent.ph_child and node.ph_child is None:
- parent.ph_child = node.ph_next
- node.ph_next = None
- return heap
- elif node is parent.ph_child:
- child = node.ph_child
- next = node.ph_next
- node.ph_child = None
- node.ph_next = None
- node = ph_pairing(child)
- parent.ph_child = node
- else:
- n = parent.ph_child
- while node is not n.ph_next:
- n = n.ph_next
- child = node.ph_child
- next = node.ph_next
- node.ph_child = None
- node.ph_next = None
- node = ph_pairing(child)
- if node is None:
- node = n
- else:
- n.ph_next = node
- node.ph_next = next
- if next is None:
- node.ph_rightmost_parent = parent
- parent.ph_child_last = node
- return heap
-
-
-# TaskQueue class based on the above pairing-heap functions.
-class TaskQueue:
- def __init__(self):
- self.heap = None
-
- def peek(self):
- return self.heap
-
- def push(self, v, key=None):
- assert v.ph_child is None
- assert v.ph_next is None
- v.data = None
- v.ph_key = key if key is not None else core.ticks()
- self.heap = ph_meld(v, self.heap)
-
- def pop(self):
- v = self.heap
- assert v.ph_next is None
- self.heap = ph_pairing(v.ph_child)
- v.ph_child = None
- return v
-
- def remove(self, v):
- self.heap = ph_delete(self.heap, v)
-
-
-# Task class representing a coroutine, can be waited on and cancelled.
-class Task:
- def __init__(self, coro, globals=None):
- self.coro = coro # Coroutine of this Task
- self.data = None # General data for queue it is waiting on
- self.state = True # None, False, True, a callable, or a TaskQueue instance
- self.ph_key = 0 # Pairing heap
- self.ph_child = None # Pairing heap
- self.ph_child_last = None # Pairing heap
- self.ph_next = None # Pairing heap
- self.ph_rightmost_parent = None # Pairing heap
-
- def __iter__(self):
- if not self.state:
- # Task finished, signal that it has been await'ed on.
- self.state = False
- elif self.state is True:
- # Allocated head of linked list of Tasks waiting on completion of this task.
- self.state = TaskQueue()
- elif type(self.state) is not TaskQueue:
- # Task has state used for another purpose, so can't also wait on it.
- raise RuntimeError("can't wait")
- return self
-
- def __next__(self):
- if not self.state:
- # Task finished, raise return value to caller so it can continue.
- raise self.data
- else:
- # Put calling task on waiting queue.
- self.state.push(core.cur_task)
- # Set calling task's data to this task that it waits on, to double-link it.
- core.cur_task.data = self
-
- def done(self):
- return not self.state
-
- def cancel(self):
- # Check if task is already finished.
- if not self.state:
- return False
- # Can't cancel self (not supported yet).
- if self is core.cur_task:
- raise RuntimeError("can't cancel self")
- # If Task waits on another task then forward the cancel to the one it's waiting on.
- while isinstance(self.data, Task):
- self = self.data
- # Reschedule Task as a cancelled task.
- if hasattr(self.data, "remove"):
- # Not on the main running queue, remove the task from the queue it's on.
- self.data.remove(self)
- core._task_queue.push(self)
- elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
- # On the main running queue but scheduled in the future, so bring it forward to now.
- core._task_queue.remove(self)
- core._task_queue.push(self)
- self.data = core.CancelledError
- return True
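To make the Task state machine concrete, a small sketch of awaiting a task (worker is illustrative):

import uasyncio as asyncio

async def worker():
    await asyncio.sleep_ms(50)
    return 123

async def main():
    t = asyncio.create_task(worker())
    print(await t)   # main() parks on t.state until done, then receives 123
    print(t.done())  # True: state is now False ("complete and await'ed on")

asyncio.run(main())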