summaryrefslogtreecommitdiffstatshomepage
path: root/examples/bluetooth/ble_uart_peripheral.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/bluetooth/ble_uart_peripheral.py')
-rw-r--r--examples/bluetooth/ble_uart_peripheral.py78
1 files changed, 78 insertions, 0 deletions
diff --git a/examples/bluetooth/ble_uart_peripheral.py b/examples/bluetooth/ble_uart_peripheral.py
new file mode 100644
index 0000000000..32015b69fd
--- /dev/null
+++ b/examples/bluetooth/ble_uart_peripheral.py
@@ -0,0 +1,78 @@
+# This example demonstrates a peripheral implementing the Nordic UART Service (NUS).
+
+import bluetooth
+from ble_advertising import advertising_payload
+
+from micropython import const
+_IRQ_CENTRAL_CONNECT = const(1 << 0)
+_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
+_IRQ_GATTS_WRITE = const(1 << 2)
+
+_UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')
+_UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_NOTIFY,)
+_UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,)
+_UART_SERVICE = (_UART_UUID, (_UART_TX, _UART_RX,),)
+
+# org.bluetooth.characteristic.gap.appearance.xml
+_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)
+
+class BLEUART:
+ def __init__(self, ble, name='mpy-uart', rxbuf=100):
+ self._ble = ble
+ self._ble.active(True)
+ self._ble.irq(handler=self._irq)
+ ((self._tx_handle, self._rx_handle,),) = self._ble.gatts_register_services((_UART_SERVICE,))
+ # Increase the size of the rx buffer.
+ self._ble.gatts_write(self._rx_handle, bytes(rxbuf))
+ self._connections = set()
+ self._rx_buffer = bytearray()
+ self._handler = None
+ self._payload = advertising_payload(name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER)
+ self._advertise()
+
+ def irq(self, handler):
+ self._handler = handler
+
+ def _irq(self, event, data):
+ # Track connections so we can send notifications.
+ if event == _IRQ_CENTRAL_CONNECT:
+ conn_handle, _, _, = data
+ self._connections.add(conn_handle)
+ elif event == _IRQ_CENTRAL_DISCONNECT:
+ conn_handle, _, _, = data
+ if conn_handle in self._connections:
+ self._connections.remove(conn_handle)
+ # Start advertising again to allow a new connection.
+ self._advertise()
+ elif event == _IRQ_GATTS_WRITE:
+ conn_handle, value_handle, = data
+ if conn_handle in self._connections and value_handle == self._rx_handle:
+ self._rx_buffer += self._ble.gatts_read(self._rx_handle)
+ if self._handler:
+ self._handler()
+
+ def any(self):
+ return len(self._rx_buffer)
+
+ def read(self, sz=None):
+ if not sz:
+ sz = len(self._rx_buffer)
+ result = self._rx_buffer[0:sz]
+ self._rx_buffer = self._rx_buffer[sz:]
+ return result
+
+ def write(self, data):
+ for conn_handle in self._connections:
+ self._ble.gatts_notify(conn_handle, self._tx_handle, data)
+
+ def close(self):
+ for conn_handle in self._connections:
+ self._ble.gap_disconnect(conn_handle)
+ self._connections.clear()
+
+ def _advertise(self, interval_us=500000):
+ self._ble.gap_advertise(interval_us, adv_data=self._payload)
+
+
+# ble = bluetooth.BLE()
+# uart = BLEUART(ble)