summaryrefslogtreecommitdiffstatshomepage
path: root/tests/extmod/select_ipoll.py
blob: 0b661c11c83313b29642648d6570086e27ae39ca (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# Test select.ipoll().

try:
    import socket, select
except ImportError:
    print("SKIP")
    raise SystemExit


def print_poll_output(lst):
    print([(type(obj), flags) for obj, flags in lst])


poller = select.poll()

# Use a new UDP socket for tests, which should be writable but not readable.
try:
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1])
except OSError:
    print("SKIP")
    raise SystemExit

poller.register(s)

# Basic polling.
print_poll_output(poller.ipoll(0))

# Pass in flags=1 for one-shot behaviour.
print_poll_output(poller.ipoll(0, 1))

# Socket should be deregistered and poll should return nothing.
print_poll_output(poller.ipoll(0))

# Create a second socket.
s2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s2.bind(socket.getaddrinfo("127.0.0.1", 8001)[0][-1])

# Register both sockets (to reset the first one).
poller.register(s)
poller.register(s2)

# Basic polling with two sockets.
print_poll_output(poller.ipoll(0))

# Unregister the first socket, to test polling the remaining one.
poller.unregister(s)
print_poll_output(poller.ipoll(0))

# Unregister the second socket, to test polling none.
poller.unregister(s2)
print_poll_output(poller.ipoll(0))

s2.close()
s.close()