summaryrefslogtreecommitdiffstatshomepage
path: root/tests/extmod/select_ipoll.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/extmod/select_ipoll.py')
-rw-r--r--tests/extmod/select_ipoll.py55
1 files changed, 55 insertions, 0 deletions
diff --git a/tests/extmod/select_ipoll.py b/tests/extmod/select_ipoll.py
new file mode 100644
index 0000000000..0b661c11c8
--- /dev/null
+++ b/tests/extmod/select_ipoll.py
@@ -0,0 +1,55 @@
+# Test select.ipoll().
+
+try:
+ import socket, select
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+
+def print_poll_output(lst):
+ print([(type(obj), flags) for obj, flags in lst])
+
+
+poller = select.poll()
+
+# Use a new UDP socket for tests, which should be writable but not readable.
+try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1])
+except OSError:
+ print("SKIP")
+ raise SystemExit
+
+poller.register(s)
+
+# Basic polling.
+print_poll_output(poller.ipoll(0))
+
+# Pass in flags=1 for one-shot behaviour.
+print_poll_output(poller.ipoll(0, 1))
+
+# Socket should be deregistered and poll should return nothing.
+print_poll_output(poller.ipoll(0))
+
+# Create a second socket.
+s2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+s2.bind(socket.getaddrinfo("127.0.0.1", 8001)[0][-1])
+
+# Register both sockets (to reset the first one).
+poller.register(s)
+poller.register(s2)
+
+# Basic polling with two sockets.
+print_poll_output(poller.ipoll(0))
+
+# Unregister the first socket, to test polling the remaining one.
+poller.unregister(s)
+print_poll_output(poller.ipoll(0))
+
+# Unregister the second socket, to test polling none.
+poller.unregister(s2)
+print_poll_output(poller.ipoll(0))
+
+s2.close()
+s.close()