summaryrefslogtreecommitdiffstatshomepage
path: root/tests/multi_bluetooth/ble_irq_calls.py
diff options
context:
space:
mode:
authorDamien George <damien@micropython.org>2024-02-08 16:52:01 +1100
committerDamien George <damien@micropython.org>2024-02-09 11:44:28 +1100
commitb4f59984f7668db457280c1f4007618fd1d235f6 (patch)
tree1cb5a35670d7752beb49221c687545c0e7393a05 /tests/multi_bluetooth/ble_irq_calls.py
parent8cbae12d0d84ebce76b305d2f5535f327d5d44be (diff)
downloadmicropython-b4f59984f7668db457280c1f4007618fd1d235f6.tar.gz
micropython-b4f59984f7668db457280c1f4007618fd1d235f6.zip
extmod/btstack: Reset pending_value_handle before calling write-done cb.
The pending_value_handle needs to be freed and reset before calling mp_bluetooth_gattc_on_read_write_status(), which will call the Python IRQ handler, which may in turn call back into BTstack to perform an action like a write. In that case the pending_value_handle will need to be available for the write/read/etc to proceed. Fixes issue #13611. Signed-off-by: Damien George <damien@micropython.org>
Diffstat (limited to 'tests/multi_bluetooth/ble_irq_calls.py')
-rw-r--r--tests/multi_bluetooth/ble_irq_calls.py200
1 files changed, 200 insertions, 0 deletions
diff --git a/tests/multi_bluetooth/ble_irq_calls.py b/tests/multi_bluetooth/ble_irq_calls.py
new file mode 100644
index 0000000000..131ea90929
--- /dev/null
+++ b/tests/multi_bluetooth/ble_irq_calls.py
@@ -0,0 +1,200 @@
+# Test calling BLE methods from within the BLE.irq event handler.
+
+from micropython import const
+import struct
+import time
+import bluetooth
+
+_IRQ_CENTRAL_CONNECT = const(1)
+_IRQ_CENTRAL_DISCONNECT = const(2)
+_IRQ_PERIPHERAL_CONNECT = const(7)
+_IRQ_PERIPHERAL_DISCONNECT = const(8)
+_IRQ_GATTC_SERVICE_RESULT = const(9)
+_IRQ_GATTC_SERVICE_DONE = const(10)
+_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
+_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
+_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
+_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
+_IRQ_GATTC_READ_RESULT = const(15)
+_IRQ_GATTC_READ_DONE = const(16)
+_IRQ_GATTC_WRITE_DONE = const(17)
+_IRQ_MTU_EXCHANGED = const(21)
+_IRQ_GET_SECRET = const(29)
+_IRQ_SET_SECRET = const(30)
+
+EVENT_NAMES = {
+ 1: "_IRQ_CENTRAL_CONNECT",
+ 2: "_IRQ_CENTRAL_DISCONNECT",
+ 3: "_IRQ_GATTS_WRITE",
+ 4: "_IRQ_GATTS_READ_REQUEST",
+ 7: "_IRQ_PERIPHERAL_CONNECT",
+ 8: "_IRQ_PERIPHERAL_DISCONNECT",
+ 9: "_IRQ_GATTC_SERVICE_RESULT",
+ 10: "_IRQ_GATTC_SERVICE_DONE",
+ 11: "_IRQ_GATTC_CHARACTERISTIC_RESULT",
+ 12: "_IRQ_GATTC_CHARACTERISTIC_DONE",
+ 13: "_IRQ_GATTC_DESCRIPTOR_RESULT",
+ 14: "_IRQ_GATTC_DESCRIPTOR_DONE",
+ 15: "_IRQ_GATTC_READ_RESULT",
+ 16: "_IRQ_GATTC_READ_DONE",
+ 17: "_IRQ_GATTC_WRITE_DONE",
+ 18: "_IRQ_GATTC_NOTIFY",
+ 21: "_IRQ_MTU_EXCHANGED",
+}
+
+_ADV_TYPE_FLAGS = const(0x01)
+_ADV_TYPE_NAME = const(0x09)
+_ADV_TYPE_UUID128_COMPLETE = const(0x7)
+
+_NOTIFY_ENABLE = const(1)
+
+ACCESSORY_UUID = bluetooth.UUID("a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a")
+STATE_UUID = bluetooth.UUID("a5a5a5a5-eeee-9999-1111-5a5a5a5a5a5a")
+CCC_UUID = bluetooth.UUID(0x2902)
+
+STATE_CHARACTERISTIC = (
+ STATE_UUID,
+ bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
+)
+
+ACCESSORY_SERVICE = (ACCESSORY_UUID, (STATE_CHARACTERISTIC,))
+
+
+class Central:
+ def __init__(self):
+ self.done = False
+ self._conn_handle = None
+ self._service = None
+ self._characteristic = None
+ self._cccd_handle = None
+ ble.active(1)
+ ble.irq(self._ble_event_handler)
+ ble.gap_connect(*BDADDR)
+
+ def _ble_event_handler(self, event, data):
+ print(EVENT_NAMES[event])
+
+ if event == _IRQ_PERIPHERAL_CONNECT:
+ conn_handle, _, _ = data
+ self._conn_handle = conn_handle
+ ble.gattc_discover_services(self._conn_handle, ACCESSORY_UUID)
+
+ elif event == _IRQ_PERIPHERAL_DISCONNECT:
+ conn_handle, _, addr = data
+ assert self._conn_handle == conn_handle
+ self._conn_handle = None
+ print("connection closed")
+
+ elif event == _IRQ_GATTC_SERVICE_RESULT:
+ _, first_handle, last_handle, uuid = data
+ print("service found:", last_handle - first_handle, uuid)
+ if uuid == ACCESSORY_UUID:
+ assert self._service is None
+ self._service = (first_handle, last_handle)
+
+ elif event == _IRQ_GATTC_SERVICE_DONE:
+ print("service handle range:", self._service[1] - self._service[0])
+ start_handle, end_handle = self._service
+ ble.gattc_discover_characteristics(self._conn_handle, start_handle, end_handle)
+
+ elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
+ _, end_handle, value_handle, properties, uuid = data
+ assert uuid == STATE_UUID
+ print("characteristic found:", uuid)
+ self._characteristic = (end_handle, value_handle, properties)
+
+ elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
+ start_handle, end_handle = self._service
+ ble.gattc_discover_descriptors(self._conn_handle, start_handle, end_handle)
+
+ elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
+ _, dsc_handle, uuid = data
+ if uuid == CCC_UUID:
+ print("CCCD found:", uuid)
+ assert self._cccd_handle is None
+ self._cccd_handle = dsc_handle
+
+ elif event == _IRQ_GATTC_DESCRIPTOR_DONE:
+ # Discovery complete, proceed to MTU exchange.
+ ble.gattc_exchange_mtu(self._conn_handle)
+
+ elif event == _IRQ_MTU_EXCHANGED:
+ # MTU exchanged, proceed to enable CCCD.
+ print("CCCD write")
+ ble.gattc_write(
+ self._conn_handle, self._cccd_handle, struct.pack("<h", _NOTIFY_ENABLE), 1
+ )
+
+ elif event == _IRQ_GATTC_WRITE_DONE:
+ conn_handle, _, result = data
+ print("CCCD write result:", result)
+ _, state_handle, _ = self._characteristic
+ print("issue gattc_read")
+ ble.gattc_read(self._conn_handle, state_handle)
+
+ elif event == _IRQ_GATTC_READ_RESULT:
+ _, _, char_data = data
+ print("gattc_read result:", bytes(char_data))
+
+ elif event == _IRQ_GATTC_READ_DONE:
+ self.done = True
+ ble.gap_disconnect(self._conn_handle)
+
+
+class Peripheral:
+ def __init__(self):
+ self.done = False
+ ble.active(1)
+ ble.irq(self._ble_event_handler)
+ ble.gatts_register_services((ACCESSORY_SERVICE,))
+ add_payload = self.advertising_payload("acc", (ACCESSORY_UUID,))
+ ble.gap_advertise(500000, add_payload)
+
+ def advertising_payload(self, name, services):
+ payload = bytearray()
+
+ def _append(adv_type, value):
+ nonlocal payload
+ payload.extend(struct.pack("BB", len(value) + 1, adv_type) + value)
+
+ _append(_ADV_TYPE_FLAGS, struct.pack("B", 0x02 + 0x04))
+ _append(_ADV_TYPE_NAME, name)
+
+ for uuid in services:
+ b = bytes(uuid)
+ assert len(b) == 16
+ _append(_ADV_TYPE_UUID128_COMPLETE, b)
+
+ return payload
+
+ def _ble_event_handler(self, event, data):
+ if event not in (_IRQ_GET_SECRET, _IRQ_SET_SECRET):
+ print(EVENT_NAMES[event])
+ if event == _IRQ_CENTRAL_DISCONNECT:
+ self.done = True
+
+
+# Acting in peripheral role.
+def instance0():
+ print("peripheral start")
+ peripheral = Peripheral()
+ multitest.globals(BDADDR=ble.config("mac"))
+ multitest.next()
+ while not peripheral.done:
+ time.sleep_ms(100)
+ multitest.broadcast("finished")
+ ble.active(0)
+
+
+# Acting in central role.
+def instance1():
+ print("central start")
+ multitest.next()
+ central = Central()
+ while not central.done:
+ time.sleep_ms(100)
+ multitest.wait("finished")
+ ble.active(0)
+
+
+ble = bluetooth.BLE()