summaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--extmod/uasyncio/__init__.py26
-rw-r--r--extmod/uasyncio/core.py231
-rw-r--r--extmod/uasyncio/event.py31
-rw-r--r--extmod/uasyncio/funcs.py50
-rw-r--r--extmod/uasyncio/lock.py53
-rw-r--r--extmod/uasyncio/stream.py141
-rw-r--r--extmod/uasyncio/task.py168
7 files changed, 700 insertions, 0 deletions
diff --git a/extmod/uasyncio/__init__.py b/extmod/uasyncio/__init__.py
new file mode 100644
index 0000000000..6bff13883a
--- /dev/null
+++ b/extmod/uasyncio/__init__.py
@@ -0,0 +1,26 @@
+# MicroPython uasyncio module
+# MIT license; Copyright (c) 2019 Damien P. George
+
+from .core import *
+
+__version__ = (3, 0, 0)
+
+_attrs = {
+ "wait_for": "funcs",
+ "gather": "funcs",
+ "Event": "event",
+ "Lock": "lock",
+ "open_connection": "stream",
+ "start_server": "stream",
+}
+
+# Lazy loader, effectively does:
+# global attr
+# from .mod import attr
+def __getattr__(attr):
+ mod = _attrs.get(attr, None)
+ if mod is None:
+ raise AttributeError(attr)
+ value = getattr(__import__(mod, None, None, True, 1), attr)
+ globals()[attr] = value
+ return value
diff --git a/extmod/uasyncio/core.py b/extmod/uasyncio/core.py
new file mode 100644
index 0000000000..049e2537f8
--- /dev/null
+++ b/extmod/uasyncio/core.py
@@ -0,0 +1,231 @@
+# MicroPython uasyncio module
+# MIT license; Copyright (c) 2019 Damien P. George
+
+from time import ticks_ms as ticks, ticks_diff, ticks_add
+import sys, select
+
+# Import TaskQueue and Task
+from .task import TaskQueue, Task
+
+
+################################################################################
+# Exceptions
+
+
+class CancelledError(BaseException):
+ pass
+
+
+class TimeoutError(Exception):
+ pass
+
+
+################################################################################
+# Sleep functions
+
+# "Yield" once, then raise StopIteration
+class SingletonGenerator:
+ def __init__(self):
+ self.state = None
+ self.exc = StopIteration()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ if self.state is not None:
+ _task_queue.push_sorted(cur_task, self.state)
+ self.state = None
+ return None
+ else:
+ self.exc.__traceback__ = None
+ raise self.exc
+
+
+# Pause task execution for the given time (integer in milliseconds, uPy extension)
+# Use a SingletonGenerator to do it without allocating on the heap
+def sleep_ms(t, sgen=SingletonGenerator()):
+ assert sgen.state is None
+ sgen.state = ticks_add(ticks(), t)
+ return sgen
+
+
+# Pause task execution for the given time (in seconds)
+def sleep(t):
+ return sleep_ms(int(t * 1000))
+
+
+################################################################################
+# Queue and poller for stream IO
+
+
+class IOQueue:
+ def __init__(self):
+ self.poller = select.poll()
+ self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
+
+ def _enqueue(self, s, idx):
+ if id(s) not in self.map:
+ entry = [None, None, s]
+ entry[idx] = cur_task
+ self.map[id(s)] = entry
+ self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
+ else:
+ sm = self.map[id(s)]
+ assert sm[idx] is None
+ assert sm[1 - idx] is not None
+ sm[idx] = cur_task
+ self.poller.modify(s, select.POLLIN | select.POLLOUT)
+ # Link task to this IOQueue so it can be removed if needed
+ cur_task.data = self
+
+ def _dequeue(self, s):
+ del self.map[id(s)]
+ self.poller.unregister(s)
+
+ def queue_read(self, s):
+ self._enqueue(s, 0)
+
+ def queue_write(self, s):
+ self._enqueue(s, 1)
+
+ def remove(self, task):
+ while True:
+ del_s = None
+ for k in self.map: # Iterate without allocating on the heap
+ q0, q1, s = self.map[k]
+ if q0 is task or q1 is task:
+ del_s = s
+ break
+ if del_s is not None:
+ self._dequeue(s)
+ else:
+ break
+
+ def wait_io_event(self, dt):
+ for s, ev in self.poller.ipoll(dt):
+ sm = self.map[id(s)]
+ # print('poll', s, sm, ev)
+ if ev & ~select.POLLOUT and sm[0] is not None:
+ # POLLIN or error
+ _task_queue.push_head(sm[0])
+ sm[0] = None
+ if ev & ~select.POLLIN and sm[1] is not None:
+ # POLLOUT or error
+ _task_queue.push_head(sm[1])
+ sm[1] = None
+ if sm[0] is None and sm[1] is None:
+ self._dequeue(s)
+ elif sm[0] is None:
+ self.poller.modify(s, select.POLLOUT)
+ else:
+ self.poller.modify(s, select.POLLIN)
+
+
+################################################################################
+# Main run loop
+
+# TaskQueue of Task instances
+_task_queue = TaskQueue()
+
+# Task queue and poller for stream IO
+_io_queue = IOQueue()
+
+
+# Ensure the awaitable is a task
+def _promote_to_task(aw):
+ return aw if isinstance(aw, Task) else create_task(aw)
+
+
+# Create and schedule a new task from a coroutine
+def create_task(coro):
+ if not hasattr(coro, "send"):
+ raise TypeError("coroutine expected")
+ t = Task(coro, globals())
+ _task_queue.push_head(t)
+ return t
+
+
+# Keep scheduling tasks until there are none left to schedule
+def run_until_complete(main_task=None):
+ global cur_task
+ excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
+ excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
+ while True:
+ # Wait until the head of _task_queue is ready to run
+ dt = 1
+ while dt > 0:
+ dt = -1
+ t = _task_queue.peek()
+ if t:
+ # A task waiting on _task_queue; "ph_key" is time to schedule task at
+ dt = max(0, ticks_diff(t.ph_key, ticks()))
+ elif not _io_queue.map:
+ # No tasks can be woken so finished running
+ return
+ # print('(poll {})'.format(dt), len(_io_queue.map))
+ _io_queue.wait_io_event(dt)
+
+ # Get next task to run and continue it
+ t = _task_queue.pop_head()
+ cur_task = t
+ try:
+ # Continue running the coroutine, it's responsible for rescheduling itself
+ exc = t.data
+ if not exc:
+ t.coro.send(None)
+ else:
+ t.data = None
+ t.coro.throw(exc)
+ except excs_all as er:
+ # Check the task is not on any event queue
+ assert t.data is None
+ # This task is done, check if it's the main task and then loop should stop
+ if t is main_task:
+ if isinstance(er, StopIteration):
+ return er.value
+ raise er
+ # Save return value of coro to pass up to caller
+ t.data = er
+ # Schedule any other tasks waiting on the completion of this task
+ waiting = False
+ if hasattr(t, "waiting"):
+ while t.waiting.peek():
+ _task_queue.push_head(t.waiting.pop_head())
+ waiting = True
+ t.waiting = None # Free waiting queue head
+ # Print out exception for detached tasks
+ if not waiting and not isinstance(er, excs_stop):
+ print("task raised exception:", t.coro)
+ sys.print_exception(er)
+ # Indicate task is done
+ t.coro = None
+
+
+# Create a new task from a coroutine and run it until it finishes
+def run(coro):
+ return run_until_complete(create_task(coro))
+
+
+################################################################################
+# Event loop wrapper
+
+
+class Loop:
+ def create_task(self, coro):
+ return create_task(coro)
+
+ def run_forever(self):
+ run_until_complete()
+ # TODO should keep running until .stop() is called, even if there're no tasks left
+
+ def run_until_complete(self, aw):
+ return run_until_complete(_promote_to_task(aw))
+
+ def close(self):
+ pass
+
+
+# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
+def get_event_loop(runq_len=0, waitq_len=0):
+ return Loop()
diff --git a/extmod/uasyncio/event.py b/extmod/uasyncio/event.py
new file mode 100644
index 0000000000..31cb00e055
--- /dev/null
+++ b/extmod/uasyncio/event.py
@@ -0,0 +1,31 @@
+# MicroPython uasyncio module
+# MIT license; Copyright (c) 2019-2020 Damien P. George
+
+from . import core
+
+# Event class for primitive events that can be waited on, set, and cleared
+class Event:
+ def __init__(self):
+ self.state = False # False=unset; True=set
+ self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event
+
+ def is_set(self):
+ return self.state
+
+ def set(self):
+ # Event becomes set, schedule any tasks waiting on it
+ while self.waiting.peek():
+ core._task_queue.push_head(self.waiting.pop_head())
+ self.state = True
+
+ def clear(self):
+ self.state = False
+
+ async def wait(self):
+ if not self.state:
+ # Event not set, put the calling task on the event's waiting queue
+ self.waiting.push_head(core.cur_task)
+ # Set calling task's data to the event's queue so it can be removed if needed
+ core.cur_task.data = self.waiting
+ yield
+ return True
diff --git a/extmod/uasyncio/funcs.py b/extmod/uasyncio/funcs.py
new file mode 100644
index 0000000000..7a4bddf256
--- /dev/null
+++ b/extmod/uasyncio/funcs.py
@@ -0,0 +1,50 @@
+# MicroPython uasyncio module
+# MIT license; Copyright (c) 2019-2020 Damien P. George
+
+from . import core
+
+
+async def wait_for(aw, timeout):
+ aw = core._promote_to_task(aw)
+ if timeout is None:
+ return await aw
+
+ def cancel(aw, timeout):
+ await core.sleep(timeout)
+ aw.cancel()
+
+ cancel_task = core.create_task(cancel(aw, timeout))
+ try:
+ ret = await aw
+ except core.CancelledError:
+ # Ignore CancelledError from aw, it's probably due to timeout
+ pass
+ finally:
+ # Cancel the "cancel" task if it's still active (optimisation instead of cancel_task.cancel())
+ if cancel_task.coro is not None:
+ core._task_queue.remove(cancel_task)
+ if cancel_task.coro is None:
+ # Cancel task ran to completion, ie there was a timeout
+ raise core.TimeoutError
+ return ret
+
+
+async def gather(*aws, return_exceptions=False):
+ ts = [core._promote_to_task(aw) for aw in aws]
+ for i in range(len(ts)):
+ try:
+ # TODO handle cancel of gather itself
+ # if ts[i].coro:
+ # iter(ts[i]).waiting.push_head(cur_task)
+ # try:
+ # yield
+ # except CancelledError as er:
+ # # cancel all waiting tasks
+ # raise er
+ ts[i] = await ts[i]
+ except Exception as er:
+ if return_exceptions:
+ ts[i] = er
+ else:
+ raise er
+ return ts
diff --git a/extmod/uasyncio/lock.py b/extmod/uasyncio/lock.py
new file mode 100644
index 0000000000..18a55cb483
--- /dev/null
+++ b/extmod/uasyncio/lock.py
@@ -0,0 +1,53 @@
+# MicroPython uasyncio module
+# MIT license; Copyright (c) 2019-2020 Damien P. George
+
+from . import core
+
+# Lock class for primitive mutex capability
+class Lock:
+ def __init__(self):
+ # The state can take the following values:
+ # - 0: unlocked
+ # - 1: locked
+ # - <Task>: unlocked but this task has been scheduled to acquire the lock next
+ self.state = 0
+ # Queue of Tasks waiting to acquire this Lock
+ self.waiting = core.TaskQueue()
+
+ def locked(self):
+ return self.state == 1
+
+ def release(self):
+ if self.state != 1:
+ raise RuntimeError
+ if self.waiting.peek():
+ # Task(s) waiting on lock, schedule next Task
+ self.state = self.waiting.pop_head()
+ core._task_queue.push_head(self.state)
+ else:
+ # No Task waiting so unlock
+ self.state = 0
+
+ async def acquire(self):
+ if self.state != 0:
+ # Lock unavailable, put the calling Task on the waiting queue
+ self.waiting.push_head(core.cur_task)
+ # Set calling task's data to the lock's queue so it can be removed if needed
+ core.cur_task.data = self.waiting
+ try:
+ yield
+ except core.CancelledError as er:
+ if self.state == core.cur_task:
+ # Cancelled while pending on resume, schedule next waiting Task
+ self.state = 1
+ self.release()
+ raise er
+ # Lock available, set it as locked
+ self.state = 1
+ return True
+
+ async def __aenter__(self):
+ return await self.acquire()
+
+ async def __aexit__(self, exc_type, exc, tb):
+ return self.release()
diff --git a/extmod/uasyncio/stream.py b/extmod/uasyncio/stream.py
new file mode 100644
index 0000000000..7803ac4bfa
--- /dev/null
+++ b/extmod/uasyncio/stream.py
@@ -0,0 +1,141 @@
+# MicroPython uasyncio module
+# MIT license; Copyright (c) 2019-2020 Damien P. George
+
+from . import core
+
+
+class Stream:
+ def __init__(self, s, e={}):
+ self.s = s
+ self.e = e
+ self.out_buf = b""
+
+ def get_extra_info(self, v):
+ return self.e[v]
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc, tb):
+ await self.close()
+
+ def close(self):
+ pass
+
+ async def wait_closed(self):
+ # TODO yield?
+ self.s.close()
+
+ async def read(self, n):
+ yield core._io_queue.queue_read(self.s)
+ return self.s.read(n)
+
+ async def readline(self):
+ l = b""
+ while True:
+ yield core._io_queue.queue_read(self.s)
+ l2 = self.s.readline() # may do multiple reads but won't block
+ l += l2
+ if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
+ return l
+
+ def write(self, buf):
+ self.out_buf += buf
+
+ async def drain(self):
+ mv = memoryview(self.out_buf)
+ off = 0
+ while off < len(mv):
+ yield core._io_queue.queue_write(self.s)
+ ret = self.s.write(mv[off:])
+ if ret is not None:
+ off += ret
+ self.out_buf = b""
+
+
+# Create a TCP stream connection to a remote host
+async def open_connection(host, port):
+ from uerrno import EINPROGRESS
+ import usocket as socket
+
+ ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
+ s = socket.socket()
+ s.setblocking(False)
+ ss = Stream(s)
+ try:
+ s.connect(ai[-1])
+ except OSError as er:
+ if er.args[0] != EINPROGRESS:
+ raise er
+ yield core._io_queue.queue_write(s)
+ return ss, ss
+
+
+# Class representing a TCP stream server, can be closed and used in "async with"
+class Server:
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc, tb):
+ self.close()
+ await self.wait_closed()
+
+ def close(self):
+ self.task.cancel()
+
+ async def wait_closed(self):
+ await self.task
+
+ async def _serve(self, cb, host, port, backlog):
+ import usocket as socket
+
+ ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
+ s = socket.socket()
+ s.setblocking(False)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind(ai[-1])
+ s.listen(backlog)
+ self.task = core.cur_task
+ # Accept incoming connections
+ while True:
+ try:
+ yield core._io_queue.queue_read(s)
+ except core.CancelledError:
+ # Shutdown server
+ s.close()
+ return
+ try:
+ s2, addr = s.accept()
+ except:
+ # Ignore a failed accept
+ continue
+ s2.setblocking(False)
+ s2s = Stream(s2, {"peername": addr})
+ core.create_task(cb(s2s, s2s))
+
+
+# Helper function to start a TCP stream server, running as a new task
+# TODO could use an accept-callback on socket read activity instead of creating a task
+async def start_server(cb, host, port, backlog=5):
+ s = Server()
+ core.create_task(s._serve(cb, host, port, backlog))
+ return s
+
+
+################################################################################
+# Legacy uasyncio compatibility
+
+
+async def stream_awrite(self, buf, off=0, sz=-1):
+ if off != 0 or sz != -1:
+ buf = memoryview(buf)
+ if sz == -1:
+ sz = len(buf)
+ buf = buf[off : off + sz]
+ self.write(buf)
+ await self.drain()
+
+
+Stream.aclose = Stream.wait_closed
+Stream.awrite = stream_awrite
+Stream.awritestr = stream_awrite # TODO explicitly convert to bytes?
diff --git a/extmod/uasyncio/task.py b/extmod/uasyncio/task.py
new file mode 100644
index 0000000000..1d5bcc5cf7
--- /dev/null
+++ b/extmod/uasyncio/task.py
@@ -0,0 +1,168 @@
+# MicroPython uasyncio module
+# MIT license; Copyright (c) 2019-2020 Damien P. George
+
+# This file contains the core TaskQueue based on a pairing heap, and the core Task class.
+# They can optionally be replaced by C implementations.
+
+from . import core
+
+
+# pairing-heap meld of 2 heaps; O(1)
+def ph_meld(h1, h2):
+ if h1 is None:
+ return h2
+ if h2 is None:
+ return h1
+ lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0
+ if lt:
+ if h1.ph_child is None:
+ h1.ph_child = h2
+ else:
+ h1.ph_child_last.ph_next = h2
+ h1.ph_child_last = h2
+ h2.ph_next = None
+ h2.ph_rightmost_parent = h1
+ return h1
+ else:
+ h1.ph_next = h2.ph_child
+ h2.ph_child = h1
+ if h1.ph_next is None:
+ h2.ph_child_last = h1
+ h1.ph_rightmost_parent = h2
+ return h2
+
+
+# pairing-heap pairing operation; amortised O(log N)
+def ph_pairing(child):
+ heap = None
+ while child is not None:
+ n1 = child
+ child = child.ph_next
+ n1.ph_next = None
+ if child is not None:
+ n2 = child
+ child = child.ph_next
+ n2.ph_next = None
+ n1 = ph_meld(n1, n2)
+ heap = ph_meld(heap, n1)
+ return heap
+
+
+# pairing-heap delete of a node; stable, amortised O(log N)
+def ph_delete(heap, node):
+ if node is heap:
+ child = heap.ph_child
+ node.ph_child = None
+ return ph_pairing(child)
+ # Find parent of node
+ parent = node
+ while parent.ph_next is not None:
+ parent = parent.ph_next
+ parent = parent.ph_rightmost_parent
+ # Replace node with pairing of its children
+ if node is parent.ph_child and node.ph_child is None:
+ parent.ph_child = node.ph_next
+ node.ph_next = None
+ return heap
+ elif node is parent.ph_child:
+ child = node.ph_child
+ next = node.ph_next
+ node.ph_child = None
+ node.ph_next = None
+ node = ph_pairing(child)
+ parent.ph_child = node
+ else:
+ n = parent.ph_child
+ while node is not n.ph_next:
+ n = n.ph_next
+ child = node.ph_child
+ next = node.ph_next
+ node.ph_child = None
+ node.ph_next = None
+ node = ph_pairing(child)
+ if node is None:
+ node = n
+ else:
+ n.ph_next = node
+ node.ph_next = next
+ if next is None:
+ node.ph_rightmost_parent = parent
+ parent.ph_child_last = node
+ return heap
+
+
+# TaskQueue class based on the above pairing-heap functions.
+class TaskQueue:
+ def __init__(self):
+ self.heap = None
+
+ def peek(self):
+ return self.heap
+
+ def push_sorted(self, v, key):
+ v.data = None
+ v.ph_key = key
+ v.ph_child = None
+ v.ph_next = None
+ self.heap = ph_meld(v, self.heap)
+
+ def push_head(self, v):
+ self.push_sorted(v, core.ticks())
+
+ def pop_head(self):
+ v = self.heap
+ self.heap = ph_pairing(self.heap.ph_child)
+ return v
+
+ def remove(self, v):
+ self.heap = ph_delete(self.heap, v)
+
+
+# Task class representing a coroutine, can be waited on and cancelled.
+class Task:
+ def __init__(self, coro, globals=None):
+ self.coro = coro # Coroutine of this Task
+ self.data = None # General data for queue it is waiting on
+ self.ph_key = 0 # Pairing heap
+ self.ph_child = None # Paring heap
+ self.ph_child_last = None # Paring heap
+ self.ph_next = None # Paring heap
+ self.ph_rightmost_parent = None # Paring heap
+
+ def __iter__(self):
+ if not hasattr(self, "waiting"):
+ # Lazily allocated head of linked list of Tasks waiting on completion of this task.
+ self.waiting = TaskQueue()
+ return self
+
+ def __next__(self):
+ if not self.coro:
+ # Task finished, raise return value to caller so it can continue.
+ raise self.data
+ else:
+ # Put calling task on waiting queue.
+ self.waiting.push_head(core.cur_task)
+ # Set calling task's data to this task that it waits on, to double-link it.
+ core.cur_task.data = self
+
+ def cancel(self):
+ # Check if task is already finished.
+ if self.coro is None:
+ return False
+ # Can't cancel self (not supported yet).
+ if self is core.cur_task:
+ raise RuntimeError("cannot cancel self")
+ # If Task waits on another task then forward the cancel to the one it's waiting on.
+ while isinstance(self.data, Task):
+ self = self.data
+ # Reschedule Task as a cancelled task.
+ if hasattr(self.data, "remove"):
+ # Not on the main running queue, remove the task from the queue it's on.
+ self.data.remove(self)
+ core._task_queue.push_head(self)
+ elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
+ # On the main running queue but scheduled in the future, so bring it forward to now.
+ core._task_queue.remove(self)
+ core._task_queue.push_head(self)
+ self.data = core.CancelledError
+ return True