summaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--extmod/asyncio/core.py36
-rw-r--r--tests/extmod/asyncio_event_queue.py64
-rw-r--r--tests/extmod/asyncio_event_queue.py.exp5
-rw-r--r--tests/extmod/asyncio_iterator_event.py86
-rw-r--r--tests/extmod/asyncio_iterator_event.py.exp5
-rw-r--r--tests/extmod/asyncio_wait_for_linked_task.py66
-rwxr-xr-xtests/run-tests.py5
7 files changed, 255 insertions, 12 deletions
diff --git a/extmod/asyncio/core.py b/extmod/asyncio/core.py
index 8aad234514..5d46b4b80e 100644
--- a/extmod/asyncio/core.py
+++ b/extmod/asyncio/core.py
@@ -163,9 +163,16 @@ def run_until_complete(main_task=None):
# A task waiting on _task_queue; "ph_key" is time to schedule task at
dt = max(0, ticks_diff(t.ph_key, ticks()))
elif not _io_queue.map:
- # No tasks can be woken so finished running
+ # No tasks can be woken
cur_task = None
- return
+ if not main_task or not main_task.state:
+ # no main_task, or main_task is done so finished running
+ return
+ # At this point, there is theoretically nothing that could wake the
+ # scheduler, but it is not allowed to exit either. We keep the code
+ # running so that a hypothetical debugger (or other such meta-process)
+ # can get a view of what is happening and possibly abort.
+ dt = 3
# print('(poll {})'.format(dt), len(_io_queue.map))
_io_queue.wait_io_event(dt)
@@ -187,31 +194,33 @@ def run_until_complete(main_task=None):
except excs_all as er:
# Check the task is not on any event queue
assert t.data is None
- # This task is done, check if it's the main task and then loop should stop
- if t is main_task:
+ # If it's the main task, it is considered as awaited by the caller
+ awaited = t is main_task
+ if awaited:
cur_task = None
- if isinstance(er, StopIteration):
- return er.value
- raise er
+ if not isinstance(er, StopIteration):
+ t.state = False
+ raise er
+ if t.state is None:
+ t.state = False
if t.state:
# Task was running but is now finished.
- waiting = False
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
- t.state = None
+ t.state = False if awaited else None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
- waiting = True
+ awaited = True
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
_task_queue.push(t.state.pop())
- waiting = True
+ awaited = True
# "False" indicates that the task is complete and has been await'ed on.
t.state = False
- if not waiting and not isinstance(er, excs_stop):
+ if not awaited and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
@@ -229,6 +238,9 @@ def run_until_complete(main_task=None):
_exc_context["exception"] = exc
_exc_context["future"] = t
Loop.call_exception_handler(_exc_context)
+ # If it's the main task then the loop should stop
+ if t is main_task:
+ return er.value
# Create a new task from a coroutine and run it until it finishes
diff --git a/tests/extmod/asyncio_event_queue.py b/tests/extmod/asyncio_event_queue.py
new file mode 100644
index 0000000000..e0125b1aef
--- /dev/null
+++ b/tests/extmod/asyncio_event_queue.py
@@ -0,0 +1,64 @@
+# Ensure that an asyncio task can wait on an Event when the
+# _task_queue is empty
+# https://github.com/micropython/micropython/issues/16569
+
+try:
+ import asyncio
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# This test requires checking that the asyncio scheduler
+# remains active "indefinitely" when the task queue is empty.
+#
+# To check this, we need another independent scheduler that
+# can wait for a certain amount of time. So we have to
+# create one using micropython.schedule() and time.ticks_ms()
+#
+# Technically, this code breaks the rules, as it is clearly
+# documented that Event.set() should _NOT_ be called from a
+# schedule (soft IRQ) because in some cases, a race condition
+# can occur, resulting in a crash. However:
+# - since the risk of a race condition in that specific
+# case has been analysed and excluded
+# - given that there is no other simple alternative to
+# write this test case,
+# an exception to the rule was deemed acceptable. See
+# https://github.com/micropython/micropython/pull/16772
+
+import micropython, time
+
+try:
+ micropython.schedule
+except AttributeError:
+ print("SKIP")
+ raise SystemExit
+
+
+evt = asyncio.Event()
+
+
+def schedule_watchdog(end_ticks):
+ if time.ticks_diff(end_ticks, time.ticks_ms()) <= 0:
+ print("asyncio still pending, unlocking event")
+ # Caution: about to call Event.set() from a schedule
+ # (see the note in the comment above)
+ evt.set()
+ return
+ micropython.schedule(schedule_watchdog, end_ticks)
+
+
+async def foo():
+ print("foo waiting")
+ schedule_watchdog(time.ticks_add(time.ticks_ms(), 100))
+ await evt.wait()
+ print("foo done")
+
+
+async def main():
+ print("main started")
+ await foo()
+ print("main done")
+
+
+asyncio.run(main())
diff --git a/tests/extmod/asyncio_event_queue.py.exp b/tests/extmod/asyncio_event_queue.py.exp
new file mode 100644
index 0000000000..ee42c96d83
--- /dev/null
+++ b/tests/extmod/asyncio_event_queue.py.exp
@@ -0,0 +1,5 @@
+main started
+foo waiting
+asyncio still pending, unlocking event
+foo done
+main done
diff --git a/tests/extmod/asyncio_iterator_event.py b/tests/extmod/asyncio_iterator_event.py
new file mode 100644
index 0000000000..6efa6b8645
--- /dev/null
+++ b/tests/extmod/asyncio_iterator_event.py
@@ -0,0 +1,86 @@
+# Ensure that an asyncio task can wait on an Event when the
+# _task_queue is empty, in the context of an async iterator
+# https://github.com/micropython/micropython/issues/16318
+
+try:
+ import asyncio
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+# This test requires checking that the asyncio scheduler
+# remains active "indefinitely" when the task queue is empty.
+#
+# To check this, we need another independent scheduler that
+# can wait for a certain amount of time. So we have to
+# create one using micropython.schedule() and time.ticks_ms()
+#
+# Technically, this code breaks the rules, as it is clearly
+# documented that Event.set() should _NOT_ be called from a
+# schedule (soft IRQ) because in some cases, a race condition
+# can occur, resulting in a crash. However:
+# - since the risk of a race condition in that specific
+# case has been analysed and excluded
+# - given that there is no other simple alternative to
+# write this test case,
+# an exception to the rule was deemed acceptable. See
+# https://github.com/micropython/micropython/pull/16772
+
+import micropython, time
+
+try:
+ micropython.schedule
+except AttributeError:
+ print("SKIP")
+ raise SystemExit
+
+ai = None
+
+
+def schedule_watchdog(end_ticks):
+ if time.ticks_diff(end_ticks, time.ticks_ms()) <= 0:
+ print("good: asyncio iterator is still pending, exiting")
+ # Caution: ai.fetch_data() will invoke Event.set()
+ # (see the note in the comment above)
+ ai.fetch_data(None)
+ return
+ micropython.schedule(schedule_watchdog, end_ticks)
+
+
+async def test(ai):
+ for x in range(3):
+ await asyncio.sleep(0.1)
+ ai.fetch_data(f"bar {x}")
+
+
+class AsyncIterable:
+ def __init__(self):
+ self.message = None
+ self.evt = asyncio.Event()
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ await self.evt.wait()
+ self.evt.clear()
+ if self.message is None:
+ raise StopAsyncIteration
+ return self.message
+
+ def fetch_data(self, message):
+ self.message = message
+ self.evt.set()
+
+
+async def main():
+ global ai
+ ai = AsyncIterable()
+ asyncio.create_task(test(ai))
+ schedule_watchdog(time.ticks_add(time.ticks_ms(), 500))
+ async for message in ai:
+ print(message)
+ print("end main")
+
+
+asyncio.run(main())
diff --git a/tests/extmod/asyncio_iterator_event.py.exp b/tests/extmod/asyncio_iterator_event.py.exp
new file mode 100644
index 0000000000..a1893197d0
--- /dev/null
+++ b/tests/extmod/asyncio_iterator_event.py.exp
@@ -0,0 +1,5 @@
+bar 0
+bar 1
+bar 2
+good: asyncio iterator is still pending, exiting
+end main
diff --git a/tests/extmod/asyncio_wait_for_linked_task.py b/tests/extmod/asyncio_wait_for_linked_task.py
new file mode 100644
index 0000000000..4dda62d547
--- /dev/null
+++ b/tests/extmod/asyncio_wait_for_linked_task.py
@@ -0,0 +1,66 @@
+# Test asyncio.wait_for, with dependent tasks
+# https://github.com/micropython/micropython/issues/16759
+
+try:
+ import asyncio
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+
+# CPython 3.12 deprecated calling get_event_loop() when there is no current event
+# loop, so to make this test run on CPython requires setting the event loop.
+if hasattr(asyncio, "set_event_loop"):
+ asyncio.set_event_loop(asyncio.new_event_loop())
+
+
+class Worker:
+ def __init__(self):
+ self._eventLoop = None
+ self._tasks = []
+
+ def launchTask(self, asyncJob):
+ if self._eventLoop is None:
+ self._eventLoop = asyncio.get_event_loop()
+ return self._eventLoop.create_task(asyncJob)
+
+ async def job(self, prerequisite, taskName):
+ if prerequisite:
+ await prerequisite
+ await asyncio.sleep(0.1)
+ print(taskName, "work completed")
+
+ def planTasks(self):
+ self._tasks.append(self.launchTask(self.job(None, "task0")))
+ self._tasks.append(self.launchTask(self.job(self._tasks[0], "task1")))
+ self._tasks.append(self.launchTask(self.job(self._tasks[1], "task2")))
+
+ async def waitForTask(self, taskIdx):
+ return await self._tasks[taskIdx]
+
+ def syncWaitForTask(self, taskIdx):
+ return self._eventLoop.run_until_complete(self._tasks[taskIdx])
+
+
+async def async_test():
+ print("--- async test")
+ worker = Worker()
+ worker.planTasks()
+ await worker.waitForTask(0)
+ print("-> task0 done")
+ await worker.waitForTask(2)
+ print("-> task2 done")
+
+
+def sync_test():
+ print("--- sync test")
+ worker = Worker()
+ worker.planTasks()
+ worker.syncWaitForTask(0)
+ print("-> task0 done")
+ worker.syncWaitForTask(2)
+ print("-> task2 done")
+
+
+asyncio.get_event_loop().run_until_complete(async_test())
+sync_test()
diff --git a/tests/run-tests.py b/tests/run-tests.py
index 9e7cab4689..ac411a0be6 100755
--- a/tests/run-tests.py
+++ b/tests/run-tests.py
@@ -162,6 +162,9 @@ platform_tests_to_skip = {
"extmod/asyncio_new_event_loop.py",
"extmod/asyncio_threadsafeflag.py",
"extmod/asyncio_wait_for_fwd.py",
+ "extmod/asyncio_event_queue.py",
+ "extmod/asyncio_iterator_event.py",
+ "extmod/asyncio_wait_for_linked_task.py",
"extmod/binascii_a2b_base64.py",
"extmod/deflate_compress_memory_error.py", # tries to allocate unlimited memory
"extmod/re_stack_overflow.py",
@@ -843,6 +846,8 @@ def run_tests(pyb, tests, args, result_dir, num_threads=1):
) # native doesn't have proper traceback info
skip_tests.add("micropython/schedule.py") # native code doesn't check pending events
skip_tests.add("stress/bytecode_limit.py") # bytecode specific test
+ skip_tests.add("extmod/asyncio_event_queue.py") # native can't run schedule
+ skip_tests.add("extmod/asyncio_iterator_event.py") # native can't run schedule
def run_one_test(test_file):
test_file = test_file.replace("\\", "/")