blob: 4dda62d5476c7d9519c4246186e445e14bed2fc4 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# Test asyncio.wait_for, with dependent tasks
# https://github.com/micropython/micropython/issues/16759
try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit
# CPython 3.12 deprecated calling get_event_loop() when there is no current event
# loop, so to make this test run on CPython requires setting the event loop.
if hasattr(asyncio, "set_event_loop"):
asyncio.set_event_loop(asyncio.new_event_loop())
class Worker:
def __init__(self):
self._eventLoop = None
self._tasks = []
def launchTask(self, asyncJob):
if self._eventLoop is None:
self._eventLoop = asyncio.get_event_loop()
return self._eventLoop.create_task(asyncJob)
async def job(self, prerequisite, taskName):
if prerequisite:
await prerequisite
await asyncio.sleep(0.1)
print(taskName, "work completed")
def planTasks(self):
self._tasks.append(self.launchTask(self.job(None, "task0")))
self._tasks.append(self.launchTask(self.job(self._tasks[0], "task1")))
self._tasks.append(self.launchTask(self.job(self._tasks[1], "task2")))
async def waitForTask(self, taskIdx):
return await self._tasks[taskIdx]
def syncWaitForTask(self, taskIdx):
return self._eventLoop.run_until_complete(self._tasks[taskIdx])
async def async_test():
print("--- async test")
worker = Worker()
worker.planTasks()
await worker.waitForTask(0)
print("-> task0 done")
await worker.waitForTask(2)
print("-> task2 done")
def sync_test():
print("--- sync test")
worker = Worker()
worker.planTasks()
worker.syncWaitForTask(0)
print("-> task0 done")
worker.syncWaitForTask(2)
print("-> task2 done")
asyncio.get_event_loop().run_until_complete(async_test())
sync_test()
|