diff options
-rw-r--r-- | tests/multi_pyb_can/rx_callback.py | 98 | ||||
-rw-r--r-- | tests/multi_pyb_can/rx_callback.py.exp | 9 | ||||
-rw-r--r-- | tests/multi_pyb_can/rx_filters.py | 50 | ||||
-rw-r--r-- | tests/multi_pyb_can/rx_filters.py.exp | 15 |
4 files changed, 172 insertions, 0 deletions
diff --git a/tests/multi_pyb_can/rx_callback.py b/tests/multi_pyb_can/rx_callback.py new file mode 100644 index 0000000000..960a225c93 --- /dev/null +++ b/tests/multi_pyb_can/rx_callback.py @@ -0,0 +1,98 @@ +from pyb import CAN +import time +import errno + +# Test the various receive IRQs, including overflow + +rx_overflow = False + +REASONS = ["received", "full", "overflow"] + +# CAN IDs +ID_SPAM = 0x345 # messages spammed into the receive FIFO +ID_ACK_OFLOW = 0x055 # message the receiver sends after it's seen an overflow +ID_AFTER = 0x100 # message the sender sends after the ACK + + +def cb0(bus, reason): + global rx_overflow + if reason != 0 and not rx_overflow: + # exact timing of 'received' callbacks depends on controller type, + # so only log the other two + print("rx0 reason", REASONS[reason]) + if reason == 2: + rx_overflow = True + + +# Accept all standard IDs on FIFO 0 +def _enable_accept_all(): + if hasattr(CAN, "MASK"): # FD-CAN controller + can.setfilter(0, CAN.RANGE, 0, (0x0, 0x7FF), extframe=False) + else: + can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0), extframe=False) + + +# Receiver +def instance0(): + _enable_accept_all() + can.rxcallback(0, cb0) + + multitest.next() + multitest.wait("sender ready") + multitest.broadcast("receiver ready") + + while not rx_overflow: + pass # Resume ASAP after FIFO0 overflows + + can.send(b"overflow", ID_ACK_OFLOW) + + # drain the receive FIFO, making sure we read at least on ID_SPAM message + rxed_spam = False + while can.any(0): + msg = can.recv(0, timeout=0) + assert msg[0] == ID_SPAM + rxed_spam = True + print("rxed_spam", rxed_spam) + + # This should be the one message with ID_AFTER, there may be one or two spam messages as well + for _ in range(10): + msg = can.recv(0, timeout=500) + if msg[0] == ID_AFTER: + print(msg) + break + + # RX FIFO should be empty now + print("any", can.any(0)) + + +# Sender +def instance1(): + _enable_accept_all() + multitest.next() + multitest.broadcast("sender ready") + multitest.wait("receiver ready") + + # Spam out messages until the receiver tells us its RX FIFO is full. + # + # The RX FIFO on the receiver can vary from 3 deep (BXCAN) to 25 deep (STM32H7), + # so we keep sending to it until we see a CAN message on ID_ACK_OFLOW indicating + # the receiver's FIFO has overflowed + for i in range(255): + can.send(bytes([i] * 8), ID_SPAM, timeout=25) + if can.any(0): + print(can.recv(0)) # should be ID_ACK_OFLOW + break + # on boards like STM32H7 the TX FIFO is really deep, so don't fill it too quickly... + time.sleep_ms(1) + + # give the receiver some time to make space in the FIFO + time.sleep_ms(200) + + # send the final message, the receiver should get this one + can.send(b"aaaaa", ID_AFTER) + + # Sender's RX FIFO should also be empty at this point + print("any", can.any(0)) + + +can = CAN(1, CAN.NORMAL, baudrate=500_000, sample_point=75) diff --git a/tests/multi_pyb_can/rx_callback.py.exp b/tests/multi_pyb_can/rx_callback.py.exp new file mode 100644 index 0000000000..3ab197cc10 --- /dev/null +++ b/tests/multi_pyb_can/rx_callback.py.exp @@ -0,0 +1,9 @@ +--- instance0 --- +rx0 reason full +rx0 reason overflow +rxed_spam True +(256, False, False, 0, b'aaaaa') +any False +--- instance1 --- +(85, False, False, 0, b'overflow') +any False diff --git a/tests/multi_pyb_can/rx_filters.py b/tests/multi_pyb_can/rx_filters.py new file mode 100644 index 0000000000..22e5eb0364 --- /dev/null +++ b/tests/multi_pyb_can/rx_filters.py @@ -0,0 +1,50 @@ +from pyb import CAN +import time +import errno + +# Test for the filtering capabilities for RX FIFO 0 and 1. + + +# Receiver +def instance0(): + # Configure to receive standard frames (in a range) on FIFO 0 + # and extended frames (in a range) on FIFO 1. + if hasattr(CAN, "MASK"): + # FD-CAN has independent filter banks for standard and extended IDs + can.setfilter(0, CAN.MASK, 0, (0x300, 0x700), extframe=False) + can.setfilter(0, CAN.MASK, 1, (0x3000, 0x7000), extframe=True) + else: + # pyb.CAN only supports MASK32 for extended ids + can.setfilter(0, CAN.MASK16, 0, (0x300, 0x700, 0x300, 0x700), extframe=False) + can.setfilter(1, CAN.MASK32, 1, (0x3000, 0x7000), extframe=True) + + multitest.next() + multitest.wait("sender ready") + multitest.broadcast("receiver ready") + for i in range(3): + print(i) + print("fifo0", can.recv(0, timeout=200)) + print("fifo1", can.recv(1, timeout=200)) + + try: + can.recv(0, timeout=100) # should time out + except OSError as e: + assert e.errno == errno.ETIMEDOUT + print("Timed out as expected") + + +# Sender +def instance1(): + multitest.next() + multitest.broadcast("sender ready") + multitest.wait("receiver ready") + + for i in range(3): + print(i) + can.send(bytes([i, 3] * i), 0x345) + can.send(bytes([0xEE] * i), 0x3700 + i, extframe=True) + can.send(b"abcdef", 0x123) # matches no filter, expect ACKed but not received + time.sleep_ms(5) # avoid flooding either our or the receiver's FIFO + + +can = CAN(1, CAN.NORMAL, baudrate=500_000, sample_point=75) diff --git a/tests/multi_pyb_can/rx_filters.py.exp b/tests/multi_pyb_can/rx_filters.py.exp new file mode 100644 index 0000000000..65fbdf4b1b --- /dev/null +++ b/tests/multi_pyb_can/rx_filters.py.exp @@ -0,0 +1,15 @@ +--- instance0 --- +0 +fifo0 (837, False, False, 0, b'') +fifo1 (14080, True, False, 0, b'') +1 +fifo0 (837, False, False, 0, b'\x01\x03') +fifo1 (14081, True, False, 0, b'\xee') +2 +fifo0 (837, False, False, 0, b'\x02\x03\x02\x03') +fifo1 (14082, True, False, 0, b'\xee\xee') +Timed out as expected +--- instance1 --- +0 +1 +2 |