summaryrefslogtreecommitdiffstatshomepage
path: root/extmod/uasyncio/core.py
diff options
context:
space:
mode:
Diffstat (limited to 'extmod/uasyncio/core.py')
-rw-r--r--extmod/uasyncio/core.py302
1 files changed, 0 insertions, 302 deletions
diff --git a/extmod/uasyncio/core.py b/extmod/uasyncio/core.py
deleted file mode 100644
index f191d202e2..0000000000
--- a/extmod/uasyncio/core.py
+++ /dev/null
@@ -1,302 +0,0 @@
-# MicroPython uasyncio module
-# MIT license; Copyright (c) 2019 Damien P. George
-
-from time import ticks_ms as ticks, ticks_diff, ticks_add
-import sys, select
-
-# Import TaskQueue and Task, preferring built-in C code over Python code
-try:
- from _uasyncio import TaskQueue, Task
-except:
- from .task import TaskQueue, Task
-
-
-################################################################################
-# Exceptions
-
-
-class CancelledError(BaseException):
- pass
-
-
-class TimeoutError(Exception):
- pass
-
-
-# Used when calling Loop.call_exception_handler
-_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
-
-
-################################################################################
-# Sleep functions
-
-
-# "Yield" once, then raise StopIteration
-class SingletonGenerator:
- def __init__(self):
- self.state = None
- self.exc = StopIteration()
-
- def __iter__(self):
- return self
-
- def __next__(self):
- if self.state is not None:
- _task_queue.push(cur_task, self.state)
- self.state = None
- return None
- else:
- self.exc.__traceback__ = None
- raise self.exc
-
-
-# Pause task execution for the given time (integer in milliseconds, uPy extension)
-# Use a SingletonGenerator to do it without allocating on the heap
-def sleep_ms(t, sgen=SingletonGenerator()):
- assert sgen.state is None
- sgen.state = ticks_add(ticks(), max(0, t))
- return sgen
-
-
-# Pause task execution for the given time (in seconds)
-def sleep(t):
- return sleep_ms(int(t * 1000))
-
-
-################################################################################
-# Queue and poller for stream IO
-
-
-class IOQueue:
- def __init__(self):
- self.poller = select.poll()
- self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
-
- def _enqueue(self, s, idx):
- if id(s) not in self.map:
- entry = [None, None, s]
- entry[idx] = cur_task
- self.map[id(s)] = entry
- self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
- else:
- sm = self.map[id(s)]
- assert sm[idx] is None
- assert sm[1 - idx] is not None
- sm[idx] = cur_task
- self.poller.modify(s, select.POLLIN | select.POLLOUT)
- # Link task to this IOQueue so it can be removed if needed
- cur_task.data = self
-
- def _dequeue(self, s):
- del self.map[id(s)]
- self.poller.unregister(s)
-
- def queue_read(self, s):
- self._enqueue(s, 0)
-
- def queue_write(self, s):
- self._enqueue(s, 1)
-
- def remove(self, task):
- while True:
- del_s = None
- for k in self.map: # Iterate without allocating on the heap
- q0, q1, s = self.map[k]
- if q0 is task or q1 is task:
- del_s = s
- break
- if del_s is not None:
- self._dequeue(s)
- else:
- break
-
- def wait_io_event(self, dt):
- for s, ev in self.poller.ipoll(dt):
- sm = self.map[id(s)]
- # print('poll', s, sm, ev)
- if ev & ~select.POLLOUT and sm[0] is not None:
- # POLLIN or error
- _task_queue.push(sm[0])
- sm[0] = None
- if ev & ~select.POLLIN and sm[1] is not None:
- # POLLOUT or error
- _task_queue.push(sm[1])
- sm[1] = None
- if sm[0] is None and sm[1] is None:
- self._dequeue(s)
- elif sm[0] is None:
- self.poller.modify(s, select.POLLOUT)
- else:
- self.poller.modify(s, select.POLLIN)
-
-
-################################################################################
-# Main run loop
-
-
-# Ensure the awaitable is a task
-def _promote_to_task(aw):
- return aw if isinstance(aw, Task) else create_task(aw)
-
-
-# Create and schedule a new task from a coroutine
-def create_task(coro):
- if not hasattr(coro, "send"):
- raise TypeError("coroutine expected")
- t = Task(coro, globals())
- _task_queue.push(t)
- return t
-
-
-# Keep scheduling tasks until there are none left to schedule
-def run_until_complete(main_task=None):
- global cur_task
- excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
- excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
- while True:
- # Wait until the head of _task_queue is ready to run
- dt = 1
- while dt > 0:
- dt = -1
- t = _task_queue.peek()
- if t:
- # A task waiting on _task_queue; "ph_key" is time to schedule task at
- dt = max(0, ticks_diff(t.ph_key, ticks()))
- elif not _io_queue.map:
- # No tasks can be woken so finished running
- return
- # print('(poll {})'.format(dt), len(_io_queue.map))
- _io_queue.wait_io_event(dt)
-
- # Get next task to run and continue it
- t = _task_queue.pop()
- cur_task = t
- try:
- # Continue running the coroutine, it's responsible for rescheduling itself
- exc = t.data
- if not exc:
- t.coro.send(None)
- else:
- # If the task is finished and on the run queue and gets here, then it
- # had an exception and was not await'ed on. Throwing into it now will
- # raise StopIteration and the code below will catch this and run the
- # call_exception_handler function.
- t.data = None
- t.coro.throw(exc)
- except excs_all as er:
- # Check the task is not on any event queue
- assert t.data is None
- # This task is done, check if it's the main task and then loop should stop
- if t is main_task:
- if isinstance(er, StopIteration):
- return er.value
- raise er
- if t.state:
- # Task was running but is now finished.
- waiting = False
- if t.state is True:
- # "None" indicates that the task is complete and not await'ed on (yet).
- t.state = None
- elif callable(t.state):
- # The task has a callback registered to be called on completion.
- t.state(t, er)
- t.state = False
- waiting = True
- else:
- # Schedule any other tasks waiting on the completion of this task.
- while t.state.peek():
- _task_queue.push(t.state.pop())
- waiting = True
- # "False" indicates that the task is complete and has been await'ed on.
- t.state = False
- if not waiting and not isinstance(er, excs_stop):
- # An exception ended this detached task, so queue it for later
- # execution to handle the uncaught exception if no other task retrieves
- # the exception in the meantime (this is handled by Task.throw).
- _task_queue.push(t)
- # Save return value of coro to pass up to caller.
- t.data = er
- elif t.state is None:
- # Task is already finished and nothing await'ed on the task,
- # so call the exception handler.
- _exc_context["exception"] = exc
- _exc_context["future"] = t
- Loop.call_exception_handler(_exc_context)
-
-
-# Create a new task from a coroutine and run it until it finishes
-def run(coro):
- return run_until_complete(create_task(coro))
-
-
-################################################################################
-# Event loop wrapper
-
-
-async def _stopper():
- pass
-
-
-_stop_task = None
-
-
-class Loop:
- _exc_handler = None
-
- def create_task(coro):
- return create_task(coro)
-
- def run_forever():
- global _stop_task
- _stop_task = Task(_stopper(), globals())
- run_until_complete(_stop_task)
- # TODO should keep running until .stop() is called, even if there're no tasks left
-
- def run_until_complete(aw):
- return run_until_complete(_promote_to_task(aw))
-
- def stop():
- global _stop_task
- if _stop_task is not None:
- _task_queue.push(_stop_task)
- # If stop() is called again, do nothing
- _stop_task = None
-
- def close():
- pass
-
- def set_exception_handler(handler):
- Loop._exc_handler = handler
-
- def get_exception_handler():
- return Loop._exc_handler
-
- def default_exception_handler(loop, context):
- print(context["message"])
- print("future:", context["future"], "coro=", context["future"].coro)
- sys.print_exception(context["exception"])
-
- def call_exception_handler(context):
- (Loop._exc_handler or Loop.default_exception_handler)(Loop, context)
-
-
-# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
-def get_event_loop(runq_len=0, waitq_len=0):
- return Loop
-
-
-def current_task():
- return cur_task
-
-
-def new_event_loop():
- global _task_queue, _io_queue
- # TaskQueue of Task instances
- _task_queue = TaskQueue()
- # Task queue and poller for stream IO
- _io_queue = IOQueue()
- return Loop
-
-
-# Initialise default event loop
-new_event_loop()