diff options
Diffstat (limited to 'examples/bluetooth/ble_uart_peripheral.py')
-rw-r--r-- | examples/bluetooth/ble_uart_peripheral.py | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/examples/bluetooth/ble_uart_peripheral.py b/examples/bluetooth/ble_uart_peripheral.py new file mode 100644 index 0000000000..32015b69fd --- /dev/null +++ b/examples/bluetooth/ble_uart_peripheral.py @@ -0,0 +1,78 @@ +# This example demonstrates a peripheral implementing the Nordic UART Service (NUS). + +import bluetooth +from ble_advertising import advertising_payload + +from micropython import const +_IRQ_CENTRAL_CONNECT = const(1 << 0) +_IRQ_CENTRAL_DISCONNECT = const(1 << 1) +_IRQ_GATTS_WRITE = const(1 << 2) + +_UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E') +_UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_NOTIFY,) +_UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,) +_UART_SERVICE = (_UART_UUID, (_UART_TX, _UART_RX,),) + +# org.bluetooth.characteristic.gap.appearance.xml +_ADV_APPEARANCE_GENERIC_COMPUTER = const(128) + +class BLEUART: + def __init__(self, ble, name='mpy-uart', rxbuf=100): + self._ble = ble + self._ble.active(True) + self._ble.irq(handler=self._irq) + ((self._tx_handle, self._rx_handle,),) = self._ble.gatts_register_services((_UART_SERVICE,)) + # Increase the size of the rx buffer. + self._ble.gatts_write(self._rx_handle, bytes(rxbuf)) + self._connections = set() + self._rx_buffer = bytearray() + self._handler = None + self._payload = advertising_payload(name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER) + self._advertise() + + def irq(self, handler): + self._handler = handler + + def _irq(self, event, data): + # Track connections so we can send notifications. + if event == _IRQ_CENTRAL_CONNECT: + conn_handle, _, _, = data + self._connections.add(conn_handle) + elif event == _IRQ_CENTRAL_DISCONNECT: + conn_handle, _, _, = data + if conn_handle in self._connections: + self._connections.remove(conn_handle) + # Start advertising again to allow a new connection. + self._advertise() + elif event == _IRQ_GATTS_WRITE: + conn_handle, value_handle, = data + if conn_handle in self._connections and value_handle == self._rx_handle: + self._rx_buffer += self._ble.gatts_read(self._rx_handle) + if self._handler: + self._handler() + + def any(self): + return len(self._rx_buffer) + + def read(self, sz=None): + if not sz: + sz = len(self._rx_buffer) + result = self._rx_buffer[0:sz] + self._rx_buffer = self._rx_buffer[sz:] + return result + + def write(self, data): + for conn_handle in self._connections: + self._ble.gatts_notify(conn_handle, self._tx_handle, data) + + def close(self): + for conn_handle in self._connections: + self._ble.gap_disconnect(conn_handle) + self._connections.clear() + + def _advertise(self, interval_us=500000): + self._ble.gap_advertise(interval_us, adv_data=self._payload) + + +# ble = bluetooth.BLE() +# uart = BLEUART(ble) |