summaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--tests/multi_pyb_can/rx_callback.py98
-rw-r--r--tests/multi_pyb_can/rx_callback.py.exp9
-rw-r--r--tests/multi_pyb_can/rx_filters.py50
-rw-r--r--tests/multi_pyb_can/rx_filters.py.exp15
4 files changed, 172 insertions, 0 deletions
diff --git a/tests/multi_pyb_can/rx_callback.py b/tests/multi_pyb_can/rx_callback.py
new file mode 100644
index 0000000000..960a225c93
--- /dev/null
+++ b/tests/multi_pyb_can/rx_callback.py
@@ -0,0 +1,98 @@
+from pyb import CAN
+import time
+import errno
+
+# Test the various receive IRQs, including overflow
+
+rx_overflow = False
+
+REASONS = ["received", "full", "overflow"]
+
+# CAN IDs
+ID_SPAM = 0x345 # messages spammed into the receive FIFO
+ID_ACK_OFLOW = 0x055 # message the receiver sends after it's seen an overflow
+ID_AFTER = 0x100 # message the sender sends after the ACK
+
+
+def cb0(bus, reason):
+ global rx_overflow
+ if reason != 0 and not rx_overflow:
+ # exact timing of 'received' callbacks depends on controller type,
+ # so only log the other two
+ print("rx0 reason", REASONS[reason])
+ if reason == 2:
+ rx_overflow = True
+
+
+# Accept all standard IDs on FIFO 0
+def _enable_accept_all():
+ if hasattr(CAN, "MASK"): # FD-CAN controller
+ can.setfilter(0, CAN.RANGE, 0, (0x0, 0x7FF), extframe=False)
+ else:
+ can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0), extframe=False)
+
+
+# Receiver
+def instance0():
+ _enable_accept_all()
+ can.rxcallback(0, cb0)
+
+ multitest.next()
+ multitest.wait("sender ready")
+ multitest.broadcast("receiver ready")
+
+ while not rx_overflow:
+ pass # Resume ASAP after FIFO0 overflows
+
+ can.send(b"overflow", ID_ACK_OFLOW)
+
+ # drain the receive FIFO, making sure we read at least on ID_SPAM message
+ rxed_spam = False
+ while can.any(0):
+ msg = can.recv(0, timeout=0)
+ assert msg[0] == ID_SPAM
+ rxed_spam = True
+ print("rxed_spam", rxed_spam)
+
+ # This should be the one message with ID_AFTER, there may be one or two spam messages as well
+ for _ in range(10):
+ msg = can.recv(0, timeout=500)
+ if msg[0] == ID_AFTER:
+ print(msg)
+ break
+
+ # RX FIFO should be empty now
+ print("any", can.any(0))
+
+
+# Sender
+def instance1():
+ _enable_accept_all()
+ multitest.next()
+ multitest.broadcast("sender ready")
+ multitest.wait("receiver ready")
+
+ # Spam out messages until the receiver tells us its RX FIFO is full.
+ #
+ # The RX FIFO on the receiver can vary from 3 deep (BXCAN) to 25 deep (STM32H7),
+ # so we keep sending to it until we see a CAN message on ID_ACK_OFLOW indicating
+ # the receiver's FIFO has overflowed
+ for i in range(255):
+ can.send(bytes([i] * 8), ID_SPAM, timeout=25)
+ if can.any(0):
+ print(can.recv(0)) # should be ID_ACK_OFLOW
+ break
+ # on boards like STM32H7 the TX FIFO is really deep, so don't fill it too quickly...
+ time.sleep_ms(1)
+
+ # give the receiver some time to make space in the FIFO
+ time.sleep_ms(200)
+
+ # send the final message, the receiver should get this one
+ can.send(b"aaaaa", ID_AFTER)
+
+ # Sender's RX FIFO should also be empty at this point
+ print("any", can.any(0))
+
+
+can = CAN(1, CAN.NORMAL, baudrate=500_000, sample_point=75)
diff --git a/tests/multi_pyb_can/rx_callback.py.exp b/tests/multi_pyb_can/rx_callback.py.exp
new file mode 100644
index 0000000000..3ab197cc10
--- /dev/null
+++ b/tests/multi_pyb_can/rx_callback.py.exp
@@ -0,0 +1,9 @@
+--- instance0 ---
+rx0 reason full
+rx0 reason overflow
+rxed_spam True
+(256, False, False, 0, b'aaaaa')
+any False
+--- instance1 ---
+(85, False, False, 0, b'overflow')
+any False
diff --git a/tests/multi_pyb_can/rx_filters.py b/tests/multi_pyb_can/rx_filters.py
new file mode 100644
index 0000000000..22e5eb0364
--- /dev/null
+++ b/tests/multi_pyb_can/rx_filters.py
@@ -0,0 +1,50 @@
+from pyb import CAN
+import time
+import errno
+
+# Test for the filtering capabilities for RX FIFO 0 and 1.
+
+
+# Receiver
+def instance0():
+ # Configure to receive standard frames (in a range) on FIFO 0
+ # and extended frames (in a range) on FIFO 1.
+ if hasattr(CAN, "MASK"):
+ # FD-CAN has independent filter banks for standard and extended IDs
+ can.setfilter(0, CAN.MASK, 0, (0x300, 0x700), extframe=False)
+ can.setfilter(0, CAN.MASK, 1, (0x3000, 0x7000), extframe=True)
+ else:
+ # pyb.CAN only supports MASK32 for extended ids
+ can.setfilter(0, CAN.MASK16, 0, (0x300, 0x700, 0x300, 0x700), extframe=False)
+ can.setfilter(1, CAN.MASK32, 1, (0x3000, 0x7000), extframe=True)
+
+ multitest.next()
+ multitest.wait("sender ready")
+ multitest.broadcast("receiver ready")
+ for i in range(3):
+ print(i)
+ print("fifo0", can.recv(0, timeout=200))
+ print("fifo1", can.recv(1, timeout=200))
+
+ try:
+ can.recv(0, timeout=100) # should time out
+ except OSError as e:
+ assert e.errno == errno.ETIMEDOUT
+ print("Timed out as expected")
+
+
+# Sender
+def instance1():
+ multitest.next()
+ multitest.broadcast("sender ready")
+ multitest.wait("receiver ready")
+
+ for i in range(3):
+ print(i)
+ can.send(bytes([i, 3] * i), 0x345)
+ can.send(bytes([0xEE] * i), 0x3700 + i, extframe=True)
+ can.send(b"abcdef", 0x123) # matches no filter, expect ACKed but not received
+ time.sleep_ms(5) # avoid flooding either our or the receiver's FIFO
+
+
+can = CAN(1, CAN.NORMAL, baudrate=500_000, sample_point=75)
diff --git a/tests/multi_pyb_can/rx_filters.py.exp b/tests/multi_pyb_can/rx_filters.py.exp
new file mode 100644
index 0000000000..65fbdf4b1b
--- /dev/null
+++ b/tests/multi_pyb_can/rx_filters.py.exp
@@ -0,0 +1,15 @@
+--- instance0 ---
+0
+fifo0 (837, False, False, 0, b'')
+fifo1 (14080, True, False, 0, b'')
+1
+fifo0 (837, False, False, 0, b'\x01\x03')
+fifo1 (14081, True, False, 0, b'\xee')
+2
+fifo0 (837, False, False, 0, b'\x02\x03\x02\x03')
+fifo1 (14082, True, False, 0, b'\xee\xee')
+Timed out as expected
+--- instance1 ---
+0
+1
+2