summaryrefslogtreecommitdiffstatshomepage
path: root/tests/multi_pyb_can/rx_callback.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/multi_pyb_can/rx_callback.py')
-rw-r--r--tests/multi_pyb_can/rx_callback.py98
1 files changed, 98 insertions, 0 deletions
diff --git a/tests/multi_pyb_can/rx_callback.py b/tests/multi_pyb_can/rx_callback.py
new file mode 100644
index 0000000000..960a225c93
--- /dev/null
+++ b/tests/multi_pyb_can/rx_callback.py
@@ -0,0 +1,98 @@
+from pyb import CAN
+import time
+import errno
+
+# Test the various receive IRQs, including overflow
+
+rx_overflow = False
+
+REASONS = ["received", "full", "overflow"]
+
+# CAN IDs
+ID_SPAM = 0x345 # messages spammed into the receive FIFO
+ID_ACK_OFLOW = 0x055 # message the receiver sends after it's seen an overflow
+ID_AFTER = 0x100 # message the sender sends after the ACK
+
+
+def cb0(bus, reason):
+ global rx_overflow
+ if reason != 0 and not rx_overflow:
+ # exact timing of 'received' callbacks depends on controller type,
+ # so only log the other two
+ print("rx0 reason", REASONS[reason])
+ if reason == 2:
+ rx_overflow = True
+
+
+# Accept all standard IDs on FIFO 0
+def _enable_accept_all():
+ if hasattr(CAN, "MASK"): # FD-CAN controller
+ can.setfilter(0, CAN.RANGE, 0, (0x0, 0x7FF), extframe=False)
+ else:
+ can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0), extframe=False)
+
+
+# Receiver
+def instance0():
+ _enable_accept_all()
+ can.rxcallback(0, cb0)
+
+ multitest.next()
+ multitest.wait("sender ready")
+ multitest.broadcast("receiver ready")
+
+ while not rx_overflow:
+ pass # Resume ASAP after FIFO0 overflows
+
+ can.send(b"overflow", ID_ACK_OFLOW)
+
+ # drain the receive FIFO, making sure we read at least on ID_SPAM message
+ rxed_spam = False
+ while can.any(0):
+ msg = can.recv(0, timeout=0)
+ assert msg[0] == ID_SPAM
+ rxed_spam = True
+ print("rxed_spam", rxed_spam)
+
+ # This should be the one message with ID_AFTER, there may be one or two spam messages as well
+ for _ in range(10):
+ msg = can.recv(0, timeout=500)
+ if msg[0] == ID_AFTER:
+ print(msg)
+ break
+
+ # RX FIFO should be empty now
+ print("any", can.any(0))
+
+
+# Sender
+def instance1():
+ _enable_accept_all()
+ multitest.next()
+ multitest.broadcast("sender ready")
+ multitest.wait("receiver ready")
+
+ # Spam out messages until the receiver tells us its RX FIFO is full.
+ #
+ # The RX FIFO on the receiver can vary from 3 deep (BXCAN) to 25 deep (STM32H7),
+ # so we keep sending to it until we see a CAN message on ID_ACK_OFLOW indicating
+ # the receiver's FIFO has overflowed
+ for i in range(255):
+ can.send(bytes([i] * 8), ID_SPAM, timeout=25)
+ if can.any(0):
+ print(can.recv(0)) # should be ID_ACK_OFLOW
+ break
+ # on boards like STM32H7 the TX FIFO is really deep, so don't fill it too quickly...
+ time.sleep_ms(1)
+
+ # give the receiver some time to make space in the FIFO
+ time.sleep_ms(200)
+
+ # send the final message, the receiver should get this one
+ can.send(b"aaaaa", ID_AFTER)
+
+ # Sender's RX FIFO should also be empty at this point
+ print("any", can.any(0))
+
+
+can = CAN(1, CAN.NORMAL, baudrate=500_000, sample_point=75)