summaryrefslogtreecommitdiffstatshomepage
path: root/examples/bluetooth/ble_uart_repl.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/bluetooth/ble_uart_repl.py')
-rw-r--r--examples/bluetooth/ble_uart_repl.py80
1 files changed, 80 insertions, 0 deletions
diff --git a/examples/bluetooth/ble_uart_repl.py b/examples/bluetooth/ble_uart_repl.py
new file mode 100644
index 0000000000..430a571a69
--- /dev/null
+++ b/examples/bluetooth/ble_uart_repl.py
@@ -0,0 +1,80 @@
+# Proof-of-concept of a REPL over BLE UART.
+#
+# Tested with the Adafruit Bluefruit app on Android.
+# Set the EoL characters to \r\n.
+
+import bluetooth
+import io
+import os
+import micropython
+import machine
+
+from ble_uart_peripheral import BLEUART
+
+_MP_STREAM_POLL = const(3)
+_MP_STREAM_POLL_RD = const(0x0001)
+
+# TODO: Remove this when STM32 gets machine.Timer.
+if hasattr(machine, 'Timer'):
+ _timer = machine.Timer(-1)
+else:
+ _timer = None
+
+# Batch writes into 50ms intervals.
+def schedule_in(handler, delay_ms):
+ def _wrap(_arg):
+ handler()
+ if _timer:
+ _timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_wrap)
+ else:
+ micropython.schedule(_wrap, None)
+
+# Simple buffering stream to support the dupterm requirements.
+class BLEUARTStream(io.IOBase):
+ def __init__(self, uart):
+ self._uart = uart
+ self._tx_buf = bytearray()
+ self._uart.irq(self._on_rx)
+
+ def _on_rx(self):
+ # Needed for ESP32.
+ if hasattr(os, 'dupterm_notify'):
+ os.dupterm_notify(None)
+
+ def read(self, sz=None):
+ return self._uart.read(sz)
+
+ def readinto(self, buf):
+ avail = self._uart.read(len(buf))
+ if not avail:
+ return None
+ for i in range(len(avail)):
+ buf[i] = avail[i]
+ return len(avail)
+
+ def ioctl(self, op, arg):
+ if op == _MP_STREAM_POLL:
+ if self._uart.any():
+ return _MP_STREAM_POLL_RD
+ return 0
+
+ def _flush(self):
+ data = self._tx_buf[0:100]
+ self._tx_buf = self._tx_buf[100:]
+ self._uart.write(data)
+ if self._tx_buf:
+ schedule_in(self._flush, 50)
+
+ def write(self, buf):
+ empty = not self._tx_buf
+ self._tx_buf += buf
+ if empty:
+ schedule_in(self._flush, 50)
+
+
+def start():
+ ble = bluetooth.BLE()
+ uart = BLEUART(ble, name='mpy-repl')
+ stream = BLEUARTStream(uart)
+
+ os.dupterm(stream)