summaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--examples/bluetooth/ble_advertising.py37
-rw-r--r--examples/bluetooth/ble_temperature.py76
-rw-r--r--examples/bluetooth/ble_uart_peripheral.py78
-rw-r--r--examples/bluetooth/ble_uart_repl.py80
4 files changed, 271 insertions, 0 deletions
diff --git a/examples/bluetooth/ble_advertising.py b/examples/bluetooth/ble_advertising.py
new file mode 100644
index 0000000000..f52598a62a
--- /dev/null
+++ b/examples/bluetooth/ble_advertising.py
@@ -0,0 +1,37 @@
+# Helpers for generating BLE advertising payloads.
+
+from micropython import const
+import struct
+
+# Advertising payloads are repeated packets of the following form:
+# 1 byte data length (N + 1)
+# 1 byte type (see constants below)
+# N bytes type-specific data
+
+_ADV_TYPE_FLAGS = const(0x01)
+_ADV_TYPE_NAME = const(0x09)
+_ADV_TYPE_UUID16_COMPLETE = const(0x3)
+_ADV_TYPE_APPEARANCE = const(0x19)
+
+# Generate a payload to be passed to gap_advertise(adv_data=...).
+def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
+ payload = bytearray()
+
+ def _append(adv_type, value):
+ nonlocal payload
+ payload += struct.pack('BB', len(value) + 1, adv_type) + value
+
+ _append(_ADV_TYPE_FLAGS, struct.pack('B', (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)))
+
+ if name:
+ _append(_ADV_TYPE_NAME, name)
+
+ if services:
+ for uuid in services:
+ # TODO: Support bluetooth.UUID class.
+ _append(_ADV_TYPE_UUID16_COMPLETE, struct.pack('<h', uuid))
+
+ # See org.bluetooth.characteristic.gap.appearance.xml
+ _append(_ADV_TYPE_APPEARANCE, struct.pack('<h', appearance))
+
+ return payload
diff --git a/examples/bluetooth/ble_temperature.py b/examples/bluetooth/ble_temperature.py
new file mode 100644
index 0000000000..379138e746
--- /dev/null
+++ b/examples/bluetooth/ble_temperature.py
@@ -0,0 +1,76 @@
+# This example demonstrates a simple temperature sensor peripheral.
+#
+# The sensor's local value updates every second, and it will notify
+# any connected central every 10 seconds.
+
+import bluetooth
+import random
+import struct
+import time
+from ble_advertising import advertising_payload
+
+from micropython import const
+_IRQ_CENTRAL_CONNECT = const(1 << 0)
+_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
+
+# org.bluetooth.service.environmental_sensing
+_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
+# org.bluetooth.characteristic.temperature
+_TEMP_CHAR = (bluetooth.UUID(0x2A6E), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,)
+_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),)
+
+# org.bluetooth.characteristic.gap.appearance.xml
+_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
+
+class BLETemperature:
+ def __init__(self, ble, name='mpy-temp'):
+ self._ble = ble
+ self._ble.active(True)
+ self._ble.irq(handler=self._irq)
+ ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
+ self._connections = set()
+ self._payload = advertising_payload(name=name, services=[0x181A], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER)
+ self._advertise()
+
+ def _irq(self, event, data):
+ # Track connections so we can send notifications.
+ if event == _IRQ_CENTRAL_CONNECT:
+ conn_handle, _, _, = data
+ self._connections.add(conn_handle)
+ elif event == _IRQ_CENTRAL_DISCONNECT:
+ conn_handle, _, _, = data
+ self._connections.remove(conn_handle)
+ # Start advertising again to allow a new connection.
+ self._advertise()
+
+ def set_temperature(self, temp_deg_c, notify=False):
+ # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
+ # Write the local value, ready for a central to read.
+ self._ble.gatts_write(self._handle, struct.pack('<h', int(temp_deg_c * 100)))
+ if notify:
+ for conn_handle in self._connections:
+ # Notify connected centrals to issue a read.
+ self._ble.gatts_notify(conn_handle, self._handle)
+
+ def _advertise(self, interval_us=500000):
+ self._ble.gap_advertise(interval_us, adv_data=self._payload)
+
+
+def demo():
+ ble = bluetooth.BLE()
+ temp = BLETemperature(ble)
+
+ t = 25
+ i = 0
+
+ while True:
+ # Write every second, notify every 10 seconds.
+ i = (i + 1) % 10
+ temp.set_temperature(t, notify=i == 0)
+ # Random walk the temperature.
+ t += random.uniform(-0.5, 0.5)
+ time.sleep_ms(1000)
+
+
+if __name__ == '__main__':
+ demo()
diff --git a/examples/bluetooth/ble_uart_peripheral.py b/examples/bluetooth/ble_uart_peripheral.py
new file mode 100644
index 0000000000..32015b69fd
--- /dev/null
+++ b/examples/bluetooth/ble_uart_peripheral.py
@@ -0,0 +1,78 @@
+# This example demonstrates a peripheral implementing the Nordic UART Service (NUS).
+
+import bluetooth
+from ble_advertising import advertising_payload
+
+from micropython import const
+_IRQ_CENTRAL_CONNECT = const(1 << 0)
+_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
+_IRQ_GATTS_WRITE = const(1 << 2)
+
+_UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')
+_UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_NOTIFY,)
+_UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,)
+_UART_SERVICE = (_UART_UUID, (_UART_TX, _UART_RX,),)
+
+# org.bluetooth.characteristic.gap.appearance.xml
+_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)
+
+class BLEUART:
+ def __init__(self, ble, name='mpy-uart', rxbuf=100):
+ self._ble = ble
+ self._ble.active(True)
+ self._ble.irq(handler=self._irq)
+ ((self._tx_handle, self._rx_handle,),) = self._ble.gatts_register_services((_UART_SERVICE,))
+ # Increase the size of the rx buffer.
+ self._ble.gatts_write(self._rx_handle, bytes(rxbuf))
+ self._connections = set()
+ self._rx_buffer = bytearray()
+ self._handler = None
+ self._payload = advertising_payload(name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER)
+ self._advertise()
+
+ def irq(self, handler):
+ self._handler = handler
+
+ def _irq(self, event, data):
+ # Track connections so we can send notifications.
+ if event == _IRQ_CENTRAL_CONNECT:
+ conn_handle, _, _, = data
+ self._connections.add(conn_handle)
+ elif event == _IRQ_CENTRAL_DISCONNECT:
+ conn_handle, _, _, = data
+ if conn_handle in self._connections:
+ self._connections.remove(conn_handle)
+ # Start advertising again to allow a new connection.
+ self._advertise()
+ elif event == _IRQ_GATTS_WRITE:
+ conn_handle, value_handle, = data
+ if conn_handle in self._connections and value_handle == self._rx_handle:
+ self._rx_buffer += self._ble.gatts_read(self._rx_handle)
+ if self._handler:
+ self._handler()
+
+ def any(self):
+ return len(self._rx_buffer)
+
+ def read(self, sz=None):
+ if not sz:
+ sz = len(self._rx_buffer)
+ result = self._rx_buffer[0:sz]
+ self._rx_buffer = self._rx_buffer[sz:]
+ return result
+
+ def write(self, data):
+ for conn_handle in self._connections:
+ self._ble.gatts_notify(conn_handle, self._tx_handle, data)
+
+ def close(self):
+ for conn_handle in self._connections:
+ self._ble.gap_disconnect(conn_handle)
+ self._connections.clear()
+
+ def _advertise(self, interval_us=500000):
+ self._ble.gap_advertise(interval_us, adv_data=self._payload)
+
+
+# ble = bluetooth.BLE()
+# uart = BLEUART(ble)
diff --git a/examples/bluetooth/ble_uart_repl.py b/examples/bluetooth/ble_uart_repl.py
new file mode 100644
index 0000000000..430a571a69
--- /dev/null
+++ b/examples/bluetooth/ble_uart_repl.py
@@ -0,0 +1,80 @@
+# Proof-of-concept of a REPL over BLE UART.
+#
+# Tested with the Adafruit Bluefruit app on Android.
+# Set the EoL characters to \r\n.
+
+import bluetooth
+import io
+import os
+import micropython
+import machine
+
+from ble_uart_peripheral import BLEUART
+
+_MP_STREAM_POLL = const(3)
+_MP_STREAM_POLL_RD = const(0x0001)
+
+# TODO: Remove this when STM32 gets machine.Timer.
+if hasattr(machine, 'Timer'):
+ _timer = machine.Timer(-1)
+else:
+ _timer = None
+
+# Batch writes into 50ms intervals.
+def schedule_in(handler, delay_ms):
+ def _wrap(_arg):
+ handler()
+ if _timer:
+ _timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_wrap)
+ else:
+ micropython.schedule(_wrap, None)
+
+# Simple buffering stream to support the dupterm requirements.
+class BLEUARTStream(io.IOBase):
+ def __init__(self, uart):
+ self._uart = uart
+ self._tx_buf = bytearray()
+ self._uart.irq(self._on_rx)
+
+ def _on_rx(self):
+ # Needed for ESP32.
+ if hasattr(os, 'dupterm_notify'):
+ os.dupterm_notify(None)
+
+ def read(self, sz=None):
+ return self._uart.read(sz)
+
+ def readinto(self, buf):
+ avail = self._uart.read(len(buf))
+ if not avail:
+ return None
+ for i in range(len(avail)):
+ buf[i] = avail[i]
+ return len(avail)
+
+ def ioctl(self, op, arg):
+ if op == _MP_STREAM_POLL:
+ if self._uart.any():
+ return _MP_STREAM_POLL_RD
+ return 0
+
+ def _flush(self):
+ data = self._tx_buf[0:100]
+ self._tx_buf = self._tx_buf[100:]
+ self._uart.write(data)
+ if self._tx_buf:
+ schedule_in(self._flush, 50)
+
+ def write(self, buf):
+ empty = not self._tx_buf
+ self._tx_buf += buf
+ if empty:
+ schedule_in(self._flush, 50)
+
+
+def start():
+ ble = bluetooth.BLE()
+ uart = BLEUART(ble, name='mpy-repl')
+ stream = BLEUARTStream(uart)
+
+ os.dupterm(stream)