diff options
-rw-r--r-- | extmod/asyncio/core.py | 36 | ||||
-rw-r--r-- | tests/extmod/asyncio_event_queue.py | 64 | ||||
-rw-r--r-- | tests/extmod/asyncio_event_queue.py.exp | 5 | ||||
-rw-r--r-- | tests/extmod/asyncio_iterator_event.py | 86 | ||||
-rw-r--r-- | tests/extmod/asyncio_iterator_event.py.exp | 5 | ||||
-rw-r--r-- | tests/extmod/asyncio_wait_for_linked_task.py | 66 | ||||
-rwxr-xr-x | tests/run-tests.py | 5 |
7 files changed, 255 insertions, 12 deletions
diff --git a/extmod/asyncio/core.py b/extmod/asyncio/core.py index 8aad234514..5d46b4b80e 100644 --- a/extmod/asyncio/core.py +++ b/extmod/asyncio/core.py @@ -163,9 +163,16 @@ def run_until_complete(main_task=None): # A task waiting on _task_queue; "ph_key" is time to schedule task at dt = max(0, ticks_diff(t.ph_key, ticks())) elif not _io_queue.map: - # No tasks can be woken so finished running + # No tasks can be woken cur_task = None - return + if not main_task or not main_task.state: + # no main_task, or main_task is done so finished running + return + # At this point, there is theoretically nothing that could wake the + # scheduler, but it is not allowed to exit either. We keep the code + # running so that a hypothetical debugger (or other such meta-process) + # can get a view of what is happening and possibly abort. + dt = 3 # print('(poll {})'.format(dt), len(_io_queue.map)) _io_queue.wait_io_event(dt) @@ -187,31 +194,33 @@ def run_until_complete(main_task=None): except excs_all as er: # Check the task is not on any event queue assert t.data is None - # This task is done, check if it's the main task and then loop should stop - if t is main_task: + # If it's the main task, it is considered as awaited by the caller + awaited = t is main_task + if awaited: cur_task = None - if isinstance(er, StopIteration): - return er.value - raise er + if not isinstance(er, StopIteration): + t.state = False + raise er + if t.state is None: + t.state = False if t.state: # Task was running but is now finished. - waiting = False if t.state is True: # "None" indicates that the task is complete and not await'ed on (yet). - t.state = None + t.state = False if awaited else None elif callable(t.state): # The task has a callback registered to be called on completion. t.state(t, er) t.state = False - waiting = True + awaited = True else: # Schedule any other tasks waiting on the completion of this task. while t.state.peek(): _task_queue.push(t.state.pop()) - waiting = True + awaited = True # "False" indicates that the task is complete and has been await'ed on. t.state = False - if not waiting and not isinstance(er, excs_stop): + if not awaited and not isinstance(er, excs_stop): # An exception ended this detached task, so queue it for later # execution to handle the uncaught exception if no other task retrieves # the exception in the meantime (this is handled by Task.throw). @@ -229,6 +238,9 @@ def run_until_complete(main_task=None): _exc_context["exception"] = exc _exc_context["future"] = t Loop.call_exception_handler(_exc_context) + # If it's the main task then the loop should stop + if t is main_task: + return er.value # Create a new task from a coroutine and run it until it finishes diff --git a/tests/extmod/asyncio_event_queue.py b/tests/extmod/asyncio_event_queue.py new file mode 100644 index 0000000000..e0125b1aef --- /dev/null +++ b/tests/extmod/asyncio_event_queue.py @@ -0,0 +1,64 @@ +# Ensure that an asyncio task can wait on an Event when the +# _task_queue is empty +# https://github.com/micropython/micropython/issues/16569 + +try: + import asyncio +except ImportError: + print("SKIP") + raise SystemExit + +# This test requires checking that the asyncio scheduler +# remains active "indefinitely" when the task queue is empty. +# +# To check this, we need another independent scheduler that +# can wait for a certain amount of time. So we have to +# create one using micropython.schedule() and time.ticks_ms() +# +# Technically, this code breaks the rules, as it is clearly +# documented that Event.set() should _NOT_ be called from a +# schedule (soft IRQ) because in some cases, a race condition +# can occur, resulting in a crash. However: +# - since the risk of a race condition in that specific +# case has been analysed and excluded +# - given that there is no other simple alternative to +# write this test case, +# an exception to the rule was deemed acceptable. See +# https://github.com/micropython/micropython/pull/16772 + +import micropython, time + +try: + micropython.schedule +except AttributeError: + print("SKIP") + raise SystemExit + + +evt = asyncio.Event() + + +def schedule_watchdog(end_ticks): + if time.ticks_diff(end_ticks, time.ticks_ms()) <= 0: + print("asyncio still pending, unlocking event") + # Caution: about to call Event.set() from a schedule + # (see the note in the comment above) + evt.set() + return + micropython.schedule(schedule_watchdog, end_ticks) + + +async def foo(): + print("foo waiting") + schedule_watchdog(time.ticks_add(time.ticks_ms(), 100)) + await evt.wait() + print("foo done") + + +async def main(): + print("main started") + await foo() + print("main done") + + +asyncio.run(main()) diff --git a/tests/extmod/asyncio_event_queue.py.exp b/tests/extmod/asyncio_event_queue.py.exp new file mode 100644 index 0000000000..ee42c96d83 --- /dev/null +++ b/tests/extmod/asyncio_event_queue.py.exp @@ -0,0 +1,5 @@ +main started +foo waiting +asyncio still pending, unlocking event +foo done +main done diff --git a/tests/extmod/asyncio_iterator_event.py b/tests/extmod/asyncio_iterator_event.py new file mode 100644 index 0000000000..6efa6b8645 --- /dev/null +++ b/tests/extmod/asyncio_iterator_event.py @@ -0,0 +1,86 @@ +# Ensure that an asyncio task can wait on an Event when the +# _task_queue is empty, in the context of an async iterator +# https://github.com/micropython/micropython/issues/16318 + +try: + import asyncio +except ImportError: + print("SKIP") + raise SystemExit + +# This test requires checking that the asyncio scheduler +# remains active "indefinitely" when the task queue is empty. +# +# To check this, we need another independent scheduler that +# can wait for a certain amount of time. So we have to +# create one using micropython.schedule() and time.ticks_ms() +# +# Technically, this code breaks the rules, as it is clearly +# documented that Event.set() should _NOT_ be called from a +# schedule (soft IRQ) because in some cases, a race condition +# can occur, resulting in a crash. However: +# - since the risk of a race condition in that specific +# case has been analysed and excluded +# - given that there is no other simple alternative to +# write this test case, +# an exception to the rule was deemed acceptable. See +# https://github.com/micropython/micropython/pull/16772 + +import micropython, time + +try: + micropython.schedule +except AttributeError: + print("SKIP") + raise SystemExit + +ai = None + + +def schedule_watchdog(end_ticks): + if time.ticks_diff(end_ticks, time.ticks_ms()) <= 0: + print("good: asyncio iterator is still pending, exiting") + # Caution: ai.fetch_data() will invoke Event.set() + # (see the note in the comment above) + ai.fetch_data(None) + return + micropython.schedule(schedule_watchdog, end_ticks) + + +async def test(ai): + for x in range(3): + await asyncio.sleep(0.1) + ai.fetch_data(f"bar {x}") + + +class AsyncIterable: + def __init__(self): + self.message = None + self.evt = asyncio.Event() + + def __aiter__(self): + return self + + async def __anext__(self): + await self.evt.wait() + self.evt.clear() + if self.message is None: + raise StopAsyncIteration + return self.message + + def fetch_data(self, message): + self.message = message + self.evt.set() + + +async def main(): + global ai + ai = AsyncIterable() + asyncio.create_task(test(ai)) + schedule_watchdog(time.ticks_add(time.ticks_ms(), 500)) + async for message in ai: + print(message) + print("end main") + + +asyncio.run(main()) diff --git a/tests/extmod/asyncio_iterator_event.py.exp b/tests/extmod/asyncio_iterator_event.py.exp new file mode 100644 index 0000000000..a1893197d0 --- /dev/null +++ b/tests/extmod/asyncio_iterator_event.py.exp @@ -0,0 +1,5 @@ +bar 0 +bar 1 +bar 2 +good: asyncio iterator is still pending, exiting +end main diff --git a/tests/extmod/asyncio_wait_for_linked_task.py b/tests/extmod/asyncio_wait_for_linked_task.py new file mode 100644 index 0000000000..4dda62d547 --- /dev/null +++ b/tests/extmod/asyncio_wait_for_linked_task.py @@ -0,0 +1,66 @@ +# Test asyncio.wait_for, with dependent tasks +# https://github.com/micropython/micropython/issues/16759 + +try: + import asyncio +except ImportError: + print("SKIP") + raise SystemExit + + +# CPython 3.12 deprecated calling get_event_loop() when there is no current event +# loop, so to make this test run on CPython requires setting the event loop. +if hasattr(asyncio, "set_event_loop"): + asyncio.set_event_loop(asyncio.new_event_loop()) + + +class Worker: + def __init__(self): + self._eventLoop = None + self._tasks = [] + + def launchTask(self, asyncJob): + if self._eventLoop is None: + self._eventLoop = asyncio.get_event_loop() + return self._eventLoop.create_task(asyncJob) + + async def job(self, prerequisite, taskName): + if prerequisite: + await prerequisite + await asyncio.sleep(0.1) + print(taskName, "work completed") + + def planTasks(self): + self._tasks.append(self.launchTask(self.job(None, "task0"))) + self._tasks.append(self.launchTask(self.job(self._tasks[0], "task1"))) + self._tasks.append(self.launchTask(self.job(self._tasks[1], "task2"))) + + async def waitForTask(self, taskIdx): + return await self._tasks[taskIdx] + + def syncWaitForTask(self, taskIdx): + return self._eventLoop.run_until_complete(self._tasks[taskIdx]) + + +async def async_test(): + print("--- async test") + worker = Worker() + worker.planTasks() + await worker.waitForTask(0) + print("-> task0 done") + await worker.waitForTask(2) + print("-> task2 done") + + +def sync_test(): + print("--- sync test") + worker = Worker() + worker.planTasks() + worker.syncWaitForTask(0) + print("-> task0 done") + worker.syncWaitForTask(2) + print("-> task2 done") + + +asyncio.get_event_loop().run_until_complete(async_test()) +sync_test() diff --git a/tests/run-tests.py b/tests/run-tests.py index 9e7cab4689..ac411a0be6 100755 --- a/tests/run-tests.py +++ b/tests/run-tests.py @@ -162,6 +162,9 @@ platform_tests_to_skip = { "extmod/asyncio_new_event_loop.py", "extmod/asyncio_threadsafeflag.py", "extmod/asyncio_wait_for_fwd.py", + "extmod/asyncio_event_queue.py", + "extmod/asyncio_iterator_event.py", + "extmod/asyncio_wait_for_linked_task.py", "extmod/binascii_a2b_base64.py", "extmod/deflate_compress_memory_error.py", # tries to allocate unlimited memory "extmod/re_stack_overflow.py", @@ -843,6 +846,8 @@ def run_tests(pyb, tests, args, result_dir, num_threads=1): ) # native doesn't have proper traceback info skip_tests.add("micropython/schedule.py") # native code doesn't check pending events skip_tests.add("stress/bytecode_limit.py") # bytecode specific test + skip_tests.add("extmod/asyncio_event_queue.py") # native can't run schedule + skip_tests.add("extmod/asyncio_iterator_event.py") # native can't run schedule def run_one_test(test_file): test_file = test_file.replace("\\", "/") |