diff options
-rw-r--r-- | examples/bluetooth/ble_advertising.py | 37 | ||||
-rw-r--r-- | examples/bluetooth/ble_temperature.py | 76 | ||||
-rw-r--r-- | examples/bluetooth/ble_uart_peripheral.py | 78 | ||||
-rw-r--r-- | examples/bluetooth/ble_uart_repl.py | 80 |
4 files changed, 271 insertions, 0 deletions
diff --git a/examples/bluetooth/ble_advertising.py b/examples/bluetooth/ble_advertising.py new file mode 100644 index 0000000000..f52598a62a --- /dev/null +++ b/examples/bluetooth/ble_advertising.py @@ -0,0 +1,37 @@ +# Helpers for generating BLE advertising payloads. + +from micropython import const +import struct + +# Advertising payloads are repeated packets of the following form: +# 1 byte data length (N + 1) +# 1 byte type (see constants below) +# N bytes type-specific data + +_ADV_TYPE_FLAGS = const(0x01) +_ADV_TYPE_NAME = const(0x09) +_ADV_TYPE_UUID16_COMPLETE = const(0x3) +_ADV_TYPE_APPEARANCE = const(0x19) + +# Generate a payload to be passed to gap_advertise(adv_data=...). +def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0): + payload = bytearray() + + def _append(adv_type, value): + nonlocal payload + payload += struct.pack('BB', len(value) + 1, adv_type) + value + + _append(_ADV_TYPE_FLAGS, struct.pack('B', (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04))) + + if name: + _append(_ADV_TYPE_NAME, name) + + if services: + for uuid in services: + # TODO: Support bluetooth.UUID class. + _append(_ADV_TYPE_UUID16_COMPLETE, struct.pack('<h', uuid)) + + # See org.bluetooth.characteristic.gap.appearance.xml + _append(_ADV_TYPE_APPEARANCE, struct.pack('<h', appearance)) + + return payload diff --git a/examples/bluetooth/ble_temperature.py b/examples/bluetooth/ble_temperature.py new file mode 100644 index 0000000000..379138e746 --- /dev/null +++ b/examples/bluetooth/ble_temperature.py @@ -0,0 +1,76 @@ +# This example demonstrates a simple temperature sensor peripheral. +# +# The sensor's local value updates every second, and it will notify +# any connected central every 10 seconds. + +import bluetooth +import random +import struct +import time +from ble_advertising import advertising_payload + +from micropython import const +_IRQ_CENTRAL_CONNECT = const(1 << 0) +_IRQ_CENTRAL_DISCONNECT = const(1 << 1) + +# org.bluetooth.service.environmental_sensing +_ENV_SENSE_UUID = bluetooth.UUID(0x181A) +# org.bluetooth.characteristic.temperature +_TEMP_CHAR = (bluetooth.UUID(0x2A6E), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,) +_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),) + +# org.bluetooth.characteristic.gap.appearance.xml +_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768) + +class BLETemperature: + def __init__(self, ble, name='mpy-temp'): + self._ble = ble + self._ble.active(True) + self._ble.irq(handler=self._irq) + ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,)) + self._connections = set() + self._payload = advertising_payload(name=name, services=[0x181A], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER) + self._advertise() + + def _irq(self, event, data): + # Track connections so we can send notifications. + if event == _IRQ_CENTRAL_CONNECT: + conn_handle, _, _, = data + self._connections.add(conn_handle) + elif event == _IRQ_CENTRAL_DISCONNECT: + conn_handle, _, _, = data + self._connections.remove(conn_handle) + # Start advertising again to allow a new connection. + self._advertise() + + def set_temperature(self, temp_deg_c, notify=False): + # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius. + # Write the local value, ready for a central to read. + self._ble.gatts_write(self._handle, struct.pack('<h', int(temp_deg_c * 100))) + if notify: + for conn_handle in self._connections: + # Notify connected centrals to issue a read. + self._ble.gatts_notify(conn_handle, self._handle) + + def _advertise(self, interval_us=500000): + self._ble.gap_advertise(interval_us, adv_data=self._payload) + + +def demo(): + ble = bluetooth.BLE() + temp = BLETemperature(ble) + + t = 25 + i = 0 + + while True: + # Write every second, notify every 10 seconds. + i = (i + 1) % 10 + temp.set_temperature(t, notify=i == 0) + # Random walk the temperature. + t += random.uniform(-0.5, 0.5) + time.sleep_ms(1000) + + +if __name__ == '__main__': + demo() diff --git a/examples/bluetooth/ble_uart_peripheral.py b/examples/bluetooth/ble_uart_peripheral.py new file mode 100644 index 0000000000..32015b69fd --- /dev/null +++ b/examples/bluetooth/ble_uart_peripheral.py @@ -0,0 +1,78 @@ +# This example demonstrates a peripheral implementing the Nordic UART Service (NUS). + +import bluetooth +from ble_advertising import advertising_payload + +from micropython import const +_IRQ_CENTRAL_CONNECT = const(1 << 0) +_IRQ_CENTRAL_DISCONNECT = const(1 << 1) +_IRQ_GATTS_WRITE = const(1 << 2) + +_UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E') +_UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_NOTIFY,) +_UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,) +_UART_SERVICE = (_UART_UUID, (_UART_TX, _UART_RX,),) + +# org.bluetooth.characteristic.gap.appearance.xml +_ADV_APPEARANCE_GENERIC_COMPUTER = const(128) + +class BLEUART: + def __init__(self, ble, name='mpy-uart', rxbuf=100): + self._ble = ble + self._ble.active(True) + self._ble.irq(handler=self._irq) + ((self._tx_handle, self._rx_handle,),) = self._ble.gatts_register_services((_UART_SERVICE,)) + # Increase the size of the rx buffer. + self._ble.gatts_write(self._rx_handle, bytes(rxbuf)) + self._connections = set() + self._rx_buffer = bytearray() + self._handler = None + self._payload = advertising_payload(name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER) + self._advertise() + + def irq(self, handler): + self._handler = handler + + def _irq(self, event, data): + # Track connections so we can send notifications. + if event == _IRQ_CENTRAL_CONNECT: + conn_handle, _, _, = data + self._connections.add(conn_handle) + elif event == _IRQ_CENTRAL_DISCONNECT: + conn_handle, _, _, = data + if conn_handle in self._connections: + self._connections.remove(conn_handle) + # Start advertising again to allow a new connection. + self._advertise() + elif event == _IRQ_GATTS_WRITE: + conn_handle, value_handle, = data + if conn_handle in self._connections and value_handle == self._rx_handle: + self._rx_buffer += self._ble.gatts_read(self._rx_handle) + if self._handler: + self._handler() + + def any(self): + return len(self._rx_buffer) + + def read(self, sz=None): + if not sz: + sz = len(self._rx_buffer) + result = self._rx_buffer[0:sz] + self._rx_buffer = self._rx_buffer[sz:] + return result + + def write(self, data): + for conn_handle in self._connections: + self._ble.gatts_notify(conn_handle, self._tx_handle, data) + + def close(self): + for conn_handle in self._connections: + self._ble.gap_disconnect(conn_handle) + self._connections.clear() + + def _advertise(self, interval_us=500000): + self._ble.gap_advertise(interval_us, adv_data=self._payload) + + +# ble = bluetooth.BLE() +# uart = BLEUART(ble) diff --git a/examples/bluetooth/ble_uart_repl.py b/examples/bluetooth/ble_uart_repl.py new file mode 100644 index 0000000000..430a571a69 --- /dev/null +++ b/examples/bluetooth/ble_uart_repl.py @@ -0,0 +1,80 @@ +# Proof-of-concept of a REPL over BLE UART. +# +# Tested with the Adafruit Bluefruit app on Android. +# Set the EoL characters to \r\n. + +import bluetooth +import io +import os +import micropython +import machine + +from ble_uart_peripheral import BLEUART + +_MP_STREAM_POLL = const(3) +_MP_STREAM_POLL_RD = const(0x0001) + +# TODO: Remove this when STM32 gets machine.Timer. +if hasattr(machine, 'Timer'): + _timer = machine.Timer(-1) +else: + _timer = None + +# Batch writes into 50ms intervals. +def schedule_in(handler, delay_ms): + def _wrap(_arg): + handler() + if _timer: + _timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_wrap) + else: + micropython.schedule(_wrap, None) + +# Simple buffering stream to support the dupterm requirements. +class BLEUARTStream(io.IOBase): + def __init__(self, uart): + self._uart = uart + self._tx_buf = bytearray() + self._uart.irq(self._on_rx) + + def _on_rx(self): + # Needed for ESP32. + if hasattr(os, 'dupterm_notify'): + os.dupterm_notify(None) + + def read(self, sz=None): + return self._uart.read(sz) + + def readinto(self, buf): + avail = self._uart.read(len(buf)) + if not avail: + return None + for i in range(len(avail)): + buf[i] = avail[i] + return len(avail) + + def ioctl(self, op, arg): + if op == _MP_STREAM_POLL: + if self._uart.any(): + return _MP_STREAM_POLL_RD + return 0 + + def _flush(self): + data = self._tx_buf[0:100] + self._tx_buf = self._tx_buf[100:] + self._uart.write(data) + if self._tx_buf: + schedule_in(self._flush, 50) + + def write(self, buf): + empty = not self._tx_buf + self._tx_buf += buf + if empty: + schedule_in(self._flush, 50) + + +def start(): + ble = bluetooth.BLE() + uart = BLEUART(ble, name='mpy-repl') + stream = BLEUARTStream(uart) + + os.dupterm(stream) |