diff options
author | Alex March <alex.march.dev@gmail.com> | 2016-11-01 16:43:18 +0000 |
---|---|---|
committer | Damien George <damien.p.george@gmail.com> | 2017-03-09 16:47:41 +1100 |
commit | ce0b5e078b376dadcc33226647c91c73d6c73600 (patch) | |
tree | 7dba2da085affa8c3ae9f61c45e7bfecd65944c2 /tests/extmod/websocket.py | |
parent | 38f063ea72632e395ea59b644552bb98c962393f (diff) | |
download | micropython-ce0b5e078b376dadcc33226647c91c73d6c73600.tar.gz micropython-ce0b5e078b376dadcc33226647c91c73d6c73600.zip |
tests/extmod: Add websocket tests.
These short unit tests test the base uPy methods as well as parts of the
websocket protocol, as implemented by uPy.
@dpgeorge converted the original socket based tests by @hosaka to ones
that only require io.BytesIO.
Diffstat (limited to 'tests/extmod/websocket.py')
-rw-r--r-- | tests/extmod/websocket.py | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/tests/extmod/websocket.py b/tests/extmod/websocket.py new file mode 100644 index 0000000000..770836c8eb --- /dev/null +++ b/tests/extmod/websocket.py @@ -0,0 +1,61 @@ +try: + import uio + import uerrno + import websocket +except ImportError: + import sys + print("SKIP") + sys.exit() + +# put raw data in the stream and do a websocket read +def ws_read(msg, sz): + ws = websocket.websocket(uio.BytesIO(msg)) + return ws.read(sz) + +# do a websocket write and then return the raw data from the stream +def ws_write(msg, sz): + s = uio.BytesIO() + ws = websocket.websocket(s) + ws.write(msg) + s.seek(0) + return s.read(sz) + +# basic frame +print(ws_read(b"\x81\x04ping", 4)) +print(ws_read(b"\x80\x04ping", 4)) # FRAME_CONT +print(ws_write(b"pong", 6)) + +# split frames are not supported +# print(ws_read(b"\x01\x04ping", 4)) + +# extended payloads +print(ws_read(b'\x81~\x00\x80' + b'ping' * 32, 128)) +print(ws_write(b"pong" * 32, 132)) + +# mask (returned data will be 'mask' ^ 'mask') +print(ws_read(b"\x81\x84maskmask", 4)) + +# close control frame +s = uio.BytesIO(b'\x88\x00') # FRAME_CLOSE +ws = websocket.websocket(s) +print(ws.read(1)) +s.seek(2) +print(s.read(4)) + +# misc control frames +print(ws_read(b"\x89\x00\x81\x04ping", 4)) # FRAME_PING +print(ws_read(b"\x8a\x00\x81\x04pong", 4)) # FRAME_PONG + +# close method +ws = websocket.websocket(uio.BytesIO()) +ws.close() + +# ioctl +ws = websocket.websocket(uio.BytesIO()) +print(ws.ioctl(8)) # GET_DATA_OPTS +print(ws.ioctl(9, 2)) # SET_DATA_OPTS +print(ws.ioctl(9)) +try: + ws.ioctl(-1) +except OSError as e: + print("ioctl: EINVAL:", e.args[0] == uerrno.EINVAL) |