summaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--drivers/nrf24l01/nrf24l01.py234
-rw-r--r--drivers/nrf24l01/nrf24l01test.py100
2 files changed, 334 insertions, 0 deletions
diff --git a/drivers/nrf24l01/nrf24l01.py b/drivers/nrf24l01/nrf24l01.py
new file mode 100644
index 0000000000..255aade83f
--- /dev/null
+++ b/drivers/nrf24l01/nrf24l01.py
@@ -0,0 +1,234 @@
+"""NRF24L01 driver for Micro Python"""
+
+import pyb
+
+# nRF24L01+ registers
+CONFIG = const(0x00)
+EN_RXADDR = const(0x02)
+SETUP_AW = const(0x03)
+SETUP_RETR = const(0x04)
+RF_CH = const(0x05)
+RF_SETUP = const(0x06)
+STATUS = const(0x07)
+OBSERVE_TX = const(0x08)
+RX_ADDR_P0 = const(0x0a)
+TX_ADDR = const(0x10)
+RX_PW_P0 = const(0x11)
+FIFO_STATUS = const(0x17)
+DYNPD = const(0x1c)
+
+# CONFIG register
+EN_CRC = const(0x08) # enable CRC
+CRCO = const(0x04) # CRC encoding scheme; 0=1 byte, 1=2 bytes
+PWR_UP = const(0x02) # 1=power up, 0=power down
+PRIM_RX = const(0x01) # RX/TX control; 0=PTX, 1=PRX
+
+# RF_SETUP register
+POWER_0 = const(0x00) # -18 dBm
+POWER_1 = const(0x02) # -12 dBm
+POWER_2 = const(0x04) # -6 dBm
+POWER_3 = const(0x06) # 0 dBm
+SPEED_1M = const(0x00)
+SPEED_2M = const(0x08)
+SPEED_250K = const(0x20)
+
+# STATUS register
+RX_DR = const(0x40) # RX data ready; write 1 to clear
+TX_DS = const(0x20) # TX data sent; write 1 to clear
+MAX_RT = const(0x10) # max retransmits reached; write 1 to clear
+
+# FIFO_STATUS register
+RX_EMPTY = const(0x01) # 1 if RX FIFO is empty
+
+# constants for instructions
+R_RX_PL_WID = const(0x60) # read RX payload width
+R_RX_PAYLOAD = const(0x61) # read RX payload
+W_TX_PAYLOAD = const(0xa0) # write TX payload
+FLUSH_TX = const(0xe1) # flush TX FIFO
+FLUSH_RX = const(0xe2) # flush RX FIFO
+NOP = const(0xff) # use to read STATUS register
+
+class NRF24L01:
+ def __init__(self, spi, cs, ce, channel=46, payload_size=16):
+ assert payload_size <= 32
+
+ # init the SPI bus and pins
+ spi.init(spi.MASTER, baudrate=4000000, polarity=0, phase=1, firstbit=spi.MSB)
+ cs.init(cs.OUT_PP, cs.PULL_NONE)
+ ce.init(ce.OUT_PP, ce.PULL_NONE)
+
+ # store the pins
+ self.spi = spi
+ self.cs = cs
+ self.ce = ce
+
+ # reset everything
+ self.ce.low()
+ self.cs.high()
+ self.payload_size = payload_size
+ self.pipe0_read_addr = None
+ pyb.delay(5)
+
+ # set address width to 5 bytes
+ self.reg_write(SETUP_AW, 0b11)
+
+ # disable dynamic payloads
+ self.reg_write(DYNPD, 0)
+
+ # auto retransmit delay: 1750us
+ # auto retransmit count: 8
+ self.reg_write(SETUP_RETR, (6 << 4) | 8)
+
+ # set rf power and speed
+ self.set_power_speed(POWER_3, SPEED_1M)
+
+ # init CRC
+ self.set_crc(2)
+
+ # clear status flags
+ self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
+
+ # set channel
+ self.set_channel(channel)
+
+ # flush buffers
+ self.flush_rx()
+ self.flush_tx()
+
+ def reg_read(self, reg):
+ self.cs.low()
+ self.spi.send_recv(reg)
+ buf = self.spi.recv(1)
+ self.cs.high()
+ return buf[0]
+
+ def reg_read_ret_status(self, reg):
+ self.cs.low()
+ status = self.spi.send_recv(reg)[0]
+ buf = self.spi.recv(1)
+ self.cs.high()
+ return status
+
+ def reg_write(self, reg, buf):
+ self.cs.low()
+ status = self.spi.send_recv(0x20 | reg)[0]
+ self.spi.send(buf)
+ self.cs.high()
+ return status
+
+ def flush_rx(self):
+ self.cs.low()
+ self.spi.send(FLUSH_RX)
+ self.cs.high()
+
+ def flush_tx(self):
+ self.cs.low()
+ self.spi.send(FLUSH_TX)
+ self.cs.high()
+
+ # power is one of POWER_x defines; speed is one of SPEED_x defines
+ def set_power_speed(self, power, speed):
+ setup = self.reg_read(RF_SETUP) & 0b11010001
+ self.reg_write(RF_SETUP, setup | power | speed)
+
+ # length in bytes: 0, 1 or 2
+ def set_crc(self, length):
+ config = self.reg_read(CONFIG) & ~(CRCO | EN_CRC)
+ if length == 0:
+ pass
+ elif length == 1:
+ config |= EN_CRC
+ else:
+ config |= EN_CRC | CRCO
+ self.reg_write(CONFIG, config)
+
+ def set_channel(self, channel):
+ self.reg_write(RF_CH, min(channel, 127))
+
+ # address should be a bytes object 5 bytes long
+ def open_tx_pipe(self, address):
+ assert len(address) == 5
+ self.reg_write(RX_ADDR_P0, address)
+ self.reg_write(TX_ADDR, address)
+ self.reg_write(RX_PW_P0, self.payload_size)
+
+ # address should be a bytes object 5 bytes long
+ # pipe 0 and 1 have 5 byte address
+ # pipes 2-5 use same 4 most-significant bytes as pipe 1, plus 1 extra byte
+ def open_rx_pipe(self, pipe_id, address):
+ assert len(address) == 5
+ assert 0 <= pipe_id <= 5
+ if pipe_id == 0:
+ self.pipe0_read_addr = address
+ if pipe_id < 2:
+ self.reg_write(RX_ADDR_P0 + pipe_id, address)
+ else:
+ self.reg_write(RX_ADDR_P0 + pipe_id, address[0])
+ self.reg_write(RX_PW_P0 + pipe_id, self.payload_size)
+ self.reg_write(EN_RXADDR, self.reg_read(EN_RXADDR) | (1 << pipe_id))
+
+ def start_listening(self):
+ self.reg_write(CONFIG, self.reg_read(CONFIG) | PWR_UP | PRIM_RX)
+ self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
+
+ if self.pipe0_read_addr is not None:
+ self.reg_write(RX_ADDR_P0, self.pipe0_read_addr)
+
+ self.flush_rx()
+ self.flush_tx()
+ self.ce.high()
+ pyb.udelay(130)
+
+ def stop_listening(self):
+ self.ce.low()
+ self.flush_tx()
+ self.flush_rx()
+
+ # returns True if any data available to recv
+ def any(self):
+ return not bool(self.reg_read(FIFO_STATUS) & RX_EMPTY)
+
+ def recv(self):
+ # get the data
+ self.cs.low()
+ self.spi.send(R_RX_PAYLOAD)
+ buf = self.spi.recv(self.payload_size)
+ self.cs.high()
+
+ # clear RX ready flag
+ self.reg_write(STATUS, RX_DR)
+
+ return buf
+
+ def send(self, buf, timeout=500):
+ # power up
+ self.reg_write(CONFIG, (self.reg_read(CONFIG) | PWR_UP) & ~PRIM_RX)
+ pyb.udelay(150)
+
+ # send the data
+ self.cs.low()
+ self.spi.send(W_TX_PAYLOAD)
+ self.spi.send(buf)
+ if len(buf) < self.payload_size:
+ self.spi.send(b'\x00' * (self.payload_size - len(buf))) # pad out data
+ self.cs.high()
+
+ # enable the chip so it can send the data
+ self.ce.high()
+ pyb.udelay(15) # needs to be >10us
+ self.ce.low()
+
+ # blocking wait for tx complete
+ start = pyb.millis()
+ while pyb.millis() - start < timeout:
+ status = self.reg_read_ret_status(OBSERVE_TX)
+ if status & (TX_DS | MAX_RT):
+ break
+
+ # get and clear all status flags
+ status = self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
+ if not (status & TX_DS):
+ raise OSError("send failed")
+
+ # power down
+ self.reg_write(CONFIG, self.reg_read(CONFIG) & ~PWR_UP)
diff --git a/drivers/nrf24l01/nrf24l01test.py b/drivers/nrf24l01/nrf24l01test.py
new file mode 100644
index 0000000000..264bc21c8c
--- /dev/null
+++ b/drivers/nrf24l01/nrf24l01test.py
@@ -0,0 +1,100 @@
+"""Test for nrf24l01 module."""
+
+import struct
+import pyb
+from pyb import Pin, SPI
+from nrf24l01 import NRF24L01
+
+pipes = (b'\xf0\xf0\xf0\xf0\xe1', b'\xf0\xf0\xf0\xf0\xd2')
+
+def master():
+ nrf = NRF24L01(SPI(2), Pin('Y5'), Pin('Y4'), payload_size=8)
+
+ nrf.open_tx_pipe(pipes[0])
+ nrf.open_rx_pipe(1, pipes[1])
+ nrf.start_listening()
+
+ num_needed = 16
+ num_successes = 0
+ num_failures = 0
+ led_state = 0
+
+ print('NRF24L01 master mode, sending %d packets...' % num_needed)
+
+ while num_successes < num_needed and num_failures < num_needed:
+ # stop listening and send packet
+ nrf.stop_listening()
+ millis = pyb.millis()
+ led_state = max(1, (led_state << 1) & 0x0f)
+ print('sending:', millis, led_state)
+ try:
+ nrf.send(struct.pack('ii', millis, led_state))
+ except OSError:
+ pass
+
+ # start listening again
+ nrf.start_listening()
+
+ # wait for response, with 250ms timeout
+ start_time = pyb.millis()
+ timeout = False
+ while not nrf.any() and not timeout:
+ if pyb.elapsed_millis(start_time) > 250:
+ timeout = True
+
+ if timeout:
+ print('failed, respones timed out')
+ num_failures += 1
+
+ else:
+ # recv packet
+ got_millis, = struct.unpack('i', nrf.recv())
+
+ # print response and round-trip delay
+ print('got response:', got_millis, '(delay', pyb.millis() - got_millis, 'ms)')
+ num_successes += 1
+
+ # delay then loop
+ pyb.delay(250)
+
+ print('master finished sending; succeses=%d, failures=%d' % (num_successes, num_failures))
+
+def slave():
+ nrf = NRF24L01(SPI(2), Pin('Y5'), Pin('Y4'), payload_size=8)
+
+ nrf.open_tx_pipe(pipes[1])
+ nrf.open_rx_pipe(1, pipes[0])
+ nrf.start_listening()
+
+ print('NRF24L01 slave mode, waiting for packets... (ctrl-C to stop)')
+
+ while True:
+ pyb.wfi()
+ if nrf.any():
+ while nrf.any():
+ buf = nrf.recv()
+ millis, led_state = struct.unpack('ii', buf)
+ print('received:', millis, led_state)
+ for i in range(4):
+ if led_state & (1 << i):
+ pyb.LED(i + 1).on()
+ else:
+ pyb.LED(i + 1).off()
+ pyb.delay(15)
+
+ nrf.stop_listening()
+ try:
+ nrf.send(struct.pack('i', millis))
+ except OSError:
+ pass
+ print('sent response')
+ nrf.start_listening()
+
+print('NRF24L01 test module loaded')
+print('NRF24L01 pinout for test:')
+print(' CE on Y4')
+print(' CSN on Y5')
+print(' SCK on Y6')
+print(' MISO on Y7')
+print(' MOSI on Y8')
+print('run nrf24l01test.slave() on slave, then nrf24l01test.master() on master')