diff options
Diffstat (limited to 'examples/bluetooth/ble_uart_repl.py')
-rw-r--r-- | examples/bluetooth/ble_uart_repl.py | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/examples/bluetooth/ble_uart_repl.py b/examples/bluetooth/ble_uart_repl.py new file mode 100644 index 0000000000..430a571a69 --- /dev/null +++ b/examples/bluetooth/ble_uart_repl.py @@ -0,0 +1,80 @@ +# Proof-of-concept of a REPL over BLE UART. +# +# Tested with the Adafruit Bluefruit app on Android. +# Set the EoL characters to \r\n. + +import bluetooth +import io +import os +import micropython +import machine + +from ble_uart_peripheral import BLEUART + +_MP_STREAM_POLL = const(3) +_MP_STREAM_POLL_RD = const(0x0001) + +# TODO: Remove this when STM32 gets machine.Timer. +if hasattr(machine, 'Timer'): + _timer = machine.Timer(-1) +else: + _timer = None + +# Batch writes into 50ms intervals. +def schedule_in(handler, delay_ms): + def _wrap(_arg): + handler() + if _timer: + _timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_wrap) + else: + micropython.schedule(_wrap, None) + +# Simple buffering stream to support the dupterm requirements. +class BLEUARTStream(io.IOBase): + def __init__(self, uart): + self._uart = uart + self._tx_buf = bytearray() + self._uart.irq(self._on_rx) + + def _on_rx(self): + # Needed for ESP32. + if hasattr(os, 'dupterm_notify'): + os.dupterm_notify(None) + + def read(self, sz=None): + return self._uart.read(sz) + + def readinto(self, buf): + avail = self._uart.read(len(buf)) + if not avail: + return None + for i in range(len(avail)): + buf[i] = avail[i] + return len(avail) + + def ioctl(self, op, arg): + if op == _MP_STREAM_POLL: + if self._uart.any(): + return _MP_STREAM_POLL_RD + return 0 + + def _flush(self): + data = self._tx_buf[0:100] + self._tx_buf = self._tx_buf[100:] + self._uart.write(data) + if self._tx_buf: + schedule_in(self._flush, 50) + + def write(self, buf): + empty = not self._tx_buf + self._tx_buf += buf + if empty: + schedule_in(self._flush, 50) + + +def start(): + ble = bluetooth.BLE() + uart = BLEUART(ble, name='mpy-repl') + stream = BLEUARTStream(uart) + + os.dupterm(stream) |