summaryrefslogtreecommitdiffstatshomepage
path: root/tests/thread/stress_schedule.py
blob: 97876f0f77ca8987fe694b3e0680d642751cec7f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# This test ensures that the scheduler doesn't trigger any assertions
# while dealing with concurrent access from multiple threads.

import _thread
import time
import micropython
import gc

try:
    micropython.schedule
except AttributeError:
    print("SKIP")
    raise SystemExit

gc.disable()

_NUM_TASKS = 10000
_TIMEOUT_MS = 10000

n = 0  # How many times the task successfully ran.
t = None  # Start time of test, assigned here to preallocate entry in globals dict.
thread_run = True  # If the thread should continue running.


def task(x):
    global n
    n += 1


def thread():
    while thread_run:
        try:
            micropython.schedule(task, None)
        except RuntimeError:
            # Queue full, back off.
            time.sleep_ms(10)


for i in range(8):
    try:
        _thread.start_new_thread(thread, ())
    except OSError:
        # System cannot create a new thead, so stop trying to create them.
        break

# Wait up to 10 seconds for 10000 tasks to be scheduled.
t = time.ticks_ms()
while n < _NUM_TASKS and time.ticks_diff(time.ticks_ms(), t) < _TIMEOUT_MS:
    pass

# Stop all threads.
thread_run = False
time.sleep_ms(20)

gc.enable()

if n < _NUM_TASKS:
    # Not all the tasks were scheduled, likely the scheduler stopped working.
    print(n)
else:
    print("PASS")