summaryrefslogtreecommitdiffstatshomepage
path: root/tests/extmod/select_poll_eintr.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/extmod/select_poll_eintr.py')
-rw-r--r--tests/extmod/select_poll_eintr.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/tests/extmod/select_poll_eintr.py b/tests/extmod/select_poll_eintr.py
new file mode 100644
index 0000000000..f53efcc83e
--- /dev/null
+++ b/tests/extmod/select_poll_eintr.py
@@ -0,0 +1,47 @@
+# Test interruption of select.poll by EINTR signal, when
+# MICROPY_PY_SELECT_POSIX_OPTIMISATIONS is enabled.
+
+try:
+ import time, gc, select, socket, _thread
+
+ time.time_ns # Check for time_ns on MicroPython
+ select.poll # Raises AttributeError for CPython implementations without poll()
+except (ImportError, AttributeError):
+ print("SKIP")
+ raise SystemExit
+
+
+def thread_main():
+ lock.acquire()
+ time.sleep(0.2)
+ print("thread gc start")
+ # The unix gc.collect() implementation will raise EINTR on other threads.
+ # Could possibly use _thread._interrupt_main() instead if MicroPython had it.
+ gc.collect()
+ print("thread gc end")
+
+
+# Start a thread to interrupt the main thread during its call to poll.
+lock = _thread.allocate_lock()
+lock.acquire()
+_thread.start_new_thread(thread_main, ())
+
+# Use a new UDP socket for tests, which should be writable but not readable.
+s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1])
+
+# Create the poller object.
+poller = select.poll()
+poller.register(s, select.POLLIN)
+
+# Poll on the UDP socket for a set timeout, which should be reached.
+print("poll")
+lock.release()
+t0 = time.time_ns()
+result = poller.poll(400)
+dt_ms = (time.time_ns() - t0) / 1e6
+print("result:", result)
+print("dt in range:", 380 <= dt_ms <= 500)
+
+# Clean up.
+s.close()