From 5886be05c8abb8747a60456f482a72ff56826f66 Mon Sep 17 00:00:00 2001 From: Bojan Sofronievski Date: Sat, 13 Apr 2024 19:52:21 +0200 Subject: [PATCH] Improve BLE com interface and example --- .vscode/launch.json | 12 ++ examples/ble_example.py | 62 ++++-- src/pydevdtk/coms/ble.py | 404 ++++++++++++++++++--------------------- 3 files changed, 242 insertions(+), 236 deletions(-) create mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..0d9b9dd --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,12 @@ +{ + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": false + } + ] +} \ No newline at end of file diff --git a/examples/ble_example.py b/examples/ble_example.py index 5c69928..0f86e54 100644 --- a/examples/ble_example.py +++ b/examples/ble_example.py @@ -1,7 +1,8 @@ import sys import time +import queue -from pydevdtk.coms.ble import Ble, BleStatus +from pydevdtk.coms.ble import Ble class Parser: @@ -27,50 +28,69 @@ def parse_data(self, data): ble = Ble() print("Scanning for BLE devices") ble.start_scan() -time.sleep(5) +time.sleep(2) ble.stop_scan() ble_devs = ble.get_found_devices() -ble_devs_names = [ble_dev["name"] for ble_dev in ble_devs] +ble_devs = [ble_dev for ble_dev in ble_devs if len(str(ble_dev)) > 0] +ble_devs_names = [ + str(ble_dev) for ble_dev in ble_devs if len(str(ble_dev)) > 0 +] print(f"Found devs: {ble_devs_names}\n") dev_name = input("Type in the BLE dev name to read the data from: ") if dev_name not in ble_devs_names: print(f"Requested device {dev_name} not found") + sys.exit(1) -dev = ble_devs[ble_devs_names.index(dev_name)] # wait for connection to establish -print(f"Connecting to {dev_name}") -ble.connect(dev["dev"]) +dev = ble_devs[ble_devs_names.index(dev_name)] +print(f"Connecting to {dev}...") t_timeout = 5 +ble.connect(dev) t_start = time.time() while time.time() - t_start < t_timeout: - status = ble.get_status(dev["address"]) - if status is not None and status == BleStatus.Connected: + if ble.is_connected(dev): break -if not ble.is_connected(dev["address"]): - print(f"Could not connect to device {dev_name}") + else: + time.sleep(0.2) +if not ble.is_connected(dev): + print(f"Could not connect to device {dev}") sys.exit(1) -print(f"Connected to {dev_name}") -ble.start_notifications_characteristic( - dev["address"], "beb5483e-36e1-4688-b7f5-ea07361b26a8" + +print(f"Connected to {dev}") + +data_queue = queue.Queue() +parser = Parser() +ble.start_notifications( + dev, + "beb5483e-36e1-4688-b7f5-ea07361b26a8", + lambda data: data_queue.put(data), ) +print("Notifications started") + t_run = 10 -parser = Parser() t_start = time.time() while time.time() - t_start < t_run: - addr, uuid, data = ble.get_data_event() + data = data_queue.get() for sin, cos in parser.parse_data(data): print(f"sin = {sin} | cos = {cos}") -ble.disconnect(dev["address"]) +ble.stop_notifications( + dev, + "beb5483e-36e1-4688-b7f5-ea07361b26a8", +) +print("Notifications stopped") + +print(f"Disconnecting from {dev}...") +ble.disconnect(dev) t_timeout = 5 t_start = time.time() while time.time() - t_start < t_timeout: - status = ble.get_status(dev["address"]) - if status is None: + if not ble.is_connected(dev): break -status = ble.get_status(dev["address"]) -if status is not None: - print(f"Failed to disconnect from {dev_name}") +if ble.is_connected(dev): + print(f"Could not disconnect from device {dev}") + sys.exit(2) +print(f"Disconnected from {dev}") diff --git a/src/pydevdtk/coms/ble.py b/src/pydevdtk/coms/ble.py index 79acfca..6168abe 100644 --- a/src/pydevdtk/coms/ble.py +++ b/src/pydevdtk/coms/ble.py @@ -1,9 +1,11 @@ import asyncio import enum import threading -import queue +from typing import Callable from bleak import BleakScanner, BleakClient +from bleak.backends.device import BLEDevice +from bleak.backends.scanner import AdvertisementData class BleStatus(enum.Enum): @@ -16,18 +18,42 @@ class BleStatus(enum.Enum): NotificationsDisabled = enum.auto() +class BleDevice: + def __init__( + self, + name: str | None, + address: str, + rssi: int, + uuids: list[str], + manufacturer_data: dict[int, bytes], + device_hndl: BLEDevice, + client: BleakClient | None = None, + ): + self.name = name + self.address = address + self.rssi = rssi + self.uuids = uuids + self.manufacturer_data = manufacturer_data + self._device_hndl = device_hndl + self._client = client + + def __str__(self) -> str: + return self.name if self.name is not None else "" + + class Ble: def __init__(self): - self.found_devices = {} - self.found_device = False + self.found_devices: dict[str, BleDevice] = {} + self.on_device: Callable[[BleDevice], None] | None = None self.scanning = False - self.connected_devices = {} - self.disconnect_events = {} - self.notification_devices = {} - self.stop_notify_events = {} - self.status_queue = queue.Queue() - self.data_queue = queue.Queue() - self.status_devices = {} + self.scan_stop_event = asyncio.Event() + + self.on_connect: dict[str, Callable[[], None]] = {} + self.on_disconnect: dict[str, Callable[[], None]] = {} + self.status_devices: dict[str, BleStatus] = {} + self.disconnect_events: dict[str, asyncio.Event] = {} + self.connected_devices: dict[str, BleDevice] = {} + self.event_loop = asyncio.new_event_loop() self.event_loop_thread = threading.Thread( target=self._asyncloop, daemon=True @@ -35,23 +61,15 @@ def __init__(self): self.event_loop_thread.start() def __del__(self): - # stop scanning - if self.scanning: - self.stop_scan() - # stop notifications - for dev_addr, dev_events in self.stop_notify_events.items(): - for char_uuid in dev_events.keys(): - self.stop_notifications_characteristic(dev_addr, char_uuid) - # disconnect from connected devices - for dev_addr in self.connected_devices.keys(): - self.disconnect(dev_addr) + for dev in self.connected_devices.values(): + self.disconnect(dev) self.event_loop.call_soon_threadsafe(self.event_loop.stop) self.event_loop_thread.join() - def start_scan(self): - # clear previously found devices - self.found_devices = {} - self.scan_stop_event = asyncio.Event() + def start_scan(self, on_device: Callable[[BleDevice], None] | None = None): + self.found_devices = {} # clear previously found devices + self.on_device = on_device + self.scan_stop_event.clear() asyncio.run_coroutine_threadsafe( self.bluetooth_scan(self.scan_stop_event), self.event_loop ) @@ -62,92 +80,58 @@ def stop_scan(self): self.event_loop.call_soon_threadsafe(self.scan_stop_event.set) self.scanning = False - def is_scanning(self): + def is_scanning(self) -> bool: return self.scanning - def has_found_device(self): - ret_val = self.found_device - self.found_device = False - return ret_val - - def get_found_devices(self): - devices = [] - for address, ( - device, - advertisement_data, - ) in self.found_devices.items(): - dev = { - "name": advertisement_data.local_name, - "address": address, - "rssi": advertisement_data.rssi, - "uuids": advertisement_data.service_uuids, - "manufacturer_data": advertisement_data.manufacturer_data, - "dev": device, - } - devices.append(dev) - return devices - - def connect(self, dev): - self.status_devices[dev.address] = BleStatus.Connecting - try: - self.status_queue.put_nowait((dev.address, BleStatus.Connecting)) - except queue.Full: - # TODO better handling of this case - pass - self.disconnect_events[dev.address] = asyncio.Event() - asyncio.run_coroutine_threadsafe( - self.bluetooth_connect(dev, self.disconnect_events[dev.address]), - self.event_loop, - ) + def get_found_devices(self) -> list[BleDevice]: + return list(self.found_devices.values()) + + def connect( + self, + dev: BleDevice, + on_connect: Callable[[], None] | None = None, + on_disconnect: Callable[[], None] | None = None, + ): + if not self.is_connected(dev): + if on_connect is not None: + self.on_connect[dev.address] = on_connect + if on_disconnect is not None: + self.on_disconnect[dev.address] = on_disconnect + self.disconnect_events[dev.address] = asyncio.Event() + self.status_devices[dev.address] = BleStatus.Connecting + asyncio.run_coroutine_threadsafe( + self.bluetooth_connect( + dev, self.disconnect_events[dev.address] + ), + self.event_loop, + ) - def disconnect(self, dev_address): - self.status_devices[dev_address] = BleStatus.Disconnecting - try: - self.status_queue.put_nowait( - (dev_address, BleStatus.Disconnecting) + def disconnect(self, dev: BleDevice): + if self.is_connected(dev): + self.status_devices[dev.address] = BleStatus.Disconnecting + self.event_loop.call_soon_threadsafe( + self.disconnect_events[dev.address].set ) - except queue.Full: - # TODO better handling of this case - pass - # stop notifications if any - for dev_addr, dev_events in self.stop_notify_events.items(): - for char_uuid in dev_events.keys(): - self.stop_notifications_characteristic(dev_addr, char_uuid) - self.event_loop.call_soon_threadsafe( - self.disconnect_events[dev_address].set - ) - def is_connected(self, dev_address): - return dev_address in self.connected_devices + def is_connected(self, dev: BleDevice) -> bool: + return dev.address in self.connected_devices - def get_connected_devices(self): - return list(self.connected_devices.keys()) + def get_connected_devices(self) -> list[BleDevice]: + return list(self.connected_devices.values()) - def get_status(self, dev_address): - if dev_address in self.status_devices: - return self.status_devices[dev_address] + def get_status(self, dev: BleDevice) -> BleStatus | None: + if dev.address in self.status_devices: + return self.status_devices[dev.address] else: return None - def get_status_event(self, block=True, timeout=None): - try: - return self.status_queue.get(block, timeout) - except queue.Empty: - return None - - def get_data_event(self, block=True, timeout=None): - try: - return self.data_queue.get(block, timeout) - except queue.Empty: - return None - - def get_services_and_characteristics(self, dev_address): - if not self.is_connected(dev_address): + def get_services_and_characteristics(self, dev: BleDevice) -> dict: + if not self.is_connected(dev): services_collection = None else: services_collection = {} - dev = self.connected_devices[dev_address] - for _, service in dev.services.services.items(): + client = self.connected_devices[dev.address]._client + for _, service in client.services.services.items(): services_collection[service.uuid] = { "name": service.description, "service": service, @@ -173,72 +157,86 @@ def get_services_and_characteristics(self, dev_address): ] = service_characteristics return services_collection - def read_characteristic(self, dev_addr, char_uuid): - if self.is_connected(dev_addr): - client = self.connected_devices[dev_addr] + def read_characteristic( + self, dev: BleDevice, char_uuid: str + ) -> bytearray | None: + if self.is_connected(dev): + client = self.connected_devices[dev.address]._client chars = list(client.services.characteristics.values()) chars_uuids = [char.uuid for char in chars] chars_properties = [char.properties for char in chars] if char_uuid in chars_uuids: i_char = chars_uuids.index(char_uuid) if "read" in chars_properties[i_char]: - asyncio.run_coroutine_threadsafe( + future = asyncio.run_coroutine_threadsafe( self.bluetooth_read(client, char_uuid), self.event_loop ) - - def write_characteristic(self, dev_addr, char_uuid, data): - if self.is_connected(dev_addr): - client = self.connected_devices[dev_addr] + return future.result() + return None + + def write_characteristic( + self, + dev: BleDevice, + char_uuid: str, + data: bytes | bytearray, + response: bool, + ): + if self.is_connected(dev): + client = self.connected_devices[dev.address]._client chars = list(client.services.characteristics.values()) chars_uuids = [char.uuid for char in chars] chars_properties = [char.properties for char in chars] if char_uuid in chars_uuids: i_char = chars_uuids.index(char_uuid) - if "write" in chars_properties[i_char]: - asyncio.run_coroutine_threadsafe( - self.bluetooth_write(client, char_uuid, data), + prop = "write" if response else "write-without-response" + if prop in chars_properties[i_char]: + future = asyncio.run_coroutine_threadsafe( + self.bluetooth_write( + client, char_uuid, data, response + ), self.event_loop, ) - - def start_notifications_characteristic(self, dev_addr, char_uuid): - if self.is_connected(dev_addr): - client = self.connected_devices[dev_addr] + return future.result() + return None + + def start_notifications( + self, + dev: BleDevice, + char_uuid: str, + on_data: Callable[[bytes | bytearray], None], + ) -> bool: + if self.is_connected(dev): + client = self.connected_devices[dev.address]._client chars = list(client.services.characteristics.values()) chars_uuids = [char.uuid for char in chars] chars_properties = [char.properties for char in chars] if char_uuid in chars_uuids: i_char = chars_uuids.index(char_uuid) if "notify" in chars_properties[i_char]: - if dev_addr not in self.stop_notify_events: - self.stop_notify_events[dev_addr] = {} - self.stop_notify_events[dev_addr][ - char_uuid - ] = asyncio.Event() - if dev_addr not in self.notification_devices: - self.notification_devices[dev_addr] = {} asyncio.run_coroutine_threadsafe( - self.bluetooth_notify( - client, - char_uuid, - self.stop_notify_events[dev_addr][char_uuid], + self.bluetooth_start_notify( + client, char_uuid, on_data ), self.event_loop, ) + return True + return False - def stop_notifications_characteristic(self, dev_addr, char_uuid): - if ( - dev_addr in self.stop_notify_events.keys() - and char_uuid in self.stop_notify_events[dev_addr].keys() - ): - self.event_loop.call_soon_threadsafe( - self.stop_notify_events[dev_addr][char_uuid].set - ) - - def are_notifications_enabled(self, dev_addr, char_uuid): - return ( - dev_addr in self.notification_devices.keys() - and char_uuid in self.notification_devices[dev_addr].keys() - ) + def stop_notifications(self, dev: BleDevice, char_uuid: str) -> bool: + if self.is_connected(dev): + client = self.connected_devices[dev.address]._client + chars = list(client.services.characteristics.values()) + chars_uuids = [char.uuid for char in chars] + chars_properties = [char.properties for char in chars] + if char_uuid in chars_uuids: + i_char = chars_uuids.index(char_uuid) + if "notify" in chars_properties[i_char]: + asyncio.run_coroutine_threadsafe( + self.bluetooth_stop_notify(client, char_uuid), + self.event_loop, + ) + return True + return False async def bluetooth_scan(self, stop_event): async with BleakScanner( @@ -246,93 +244,69 @@ async def bluetooth_scan(self, stop_event): ): await stop_event.wait() - def _detection_callback(self, device, advertisement_data): - if advertisement_data.local_name is not None: - self.found_devices[device.address] = ( - device, - advertisement_data, - ) - self.found_device = True + def _detection_callback( + self, device: BLEDevice, advertisement_data: AdvertisementData + ): + dev = BleDevice( + name=advertisement_data.local_name, + address=device.address, + rssi=advertisement_data.rssi, + uuids=advertisement_data.service_uuids, + manufacturer_data=advertisement_data.manufacturer_data, + device_hndl=device, + ) + self.found_devices[device.address] = dev + if self.on_device is not None: + self.on_device(dev) - async def bluetooth_connect(self, device, disconnect_event): + async def bluetooth_connect( + self, device: BleDevice, disconnect_event: asyncio.Event + ): async with BleakClient( - device, + device._device_hndl, self._disconnect_callback, ) as client: - self.connected_devices[device.address] = client - self.status_devices[device.address] = BleStatus.Connected - try: - self.status_queue.put_nowait( - (device.address, BleStatus.Connected) - ) - except queue.Full: - # TODO better handling of this case - pass + device._client = client + self.connected_devices[client.address] = device + self.status_devices[client.address] = BleStatus.Connected + if client.address in self.on_connect: + self.on_connect[client.address]() await disconnect_event.wait() + del self.disconnect_events[client.address] - def _disconnect_callback(self, client): + def _disconnect_callback(self, client: BleakClient): + if client.address in self.disconnect_events: + self.disconnect_events[client.address].set() del self.connected_devices[client.address] del self.status_devices[client.address] - if client.address in self.notification_devices.keys(): - del self.notification_devices[client.address] - try: - self.status_queue.put_nowait( - (client.address, BleStatus.Disconnected) - ) - except queue.Full: - # TODO better handling of this case - pass - - async def bluetooth_read(self, client, uuid): - data = await client.read_gatt_char(uuid) - try: - self.data_queue.put_nowait((client.address, uuid, data)) - except queue.Full: - # TODO better handling of this case - pass - - async def bluetooth_write(self, client, uuid, data): - await client.write_gatt_char(uuid, data) - try: - self.status_queue.put_nowait( - (client.address, BleStatus.WriteSuccessful, uuid) - ) - except queue.Full: - # TODO better handling of this case - pass - - async def bluetooth_notify(self, client, uuid, stop_event): - await client.start_notify( - uuid, - lambda uuid, data: self.bluetooth_notify_callback( - client, uuid, data - ), - ) - self.notification_devices[client.address][uuid] = True - try: - self.status_queue.put_nowait( - (client.address, BleStatus.NotificationsEnabled, uuid) - ) - except queue.Full: - # TODO better handling of this case - pass - await stop_event.wait() + if client.address in self.on_connect: + del self.on_connect[client.address] + if client.address in self.on_disconnect: + self.on_disconnect[client.address]() + del self.on_disconnect[client.address] + + async def bluetooth_read(self, client: BleakClient, uuid: str): + return await client.read_gatt_char(uuid) + + async def bluetooth_write( + self, + client: BleakClient, + uuid: str, + data: bytes | bytearray, + response: bool, + ): + return await client.write_gatt_char(uuid, data, response) + + async def bluetooth_start_notify( + self, + client: BleakClient, + uuid: str, + on_data: Callable[[bytes | bytearray], None], + ): + await client.start_notify(uuid, lambda _, data: on_data(data)) + + async def bluetooth_stop_notify(self, client: BleakClient, uuid: str): await client.stop_notify(uuid) - del self.notification_devices[client.address] - try: - self.status_queue.put_nowait( - (client.address, BleStatus.NotificationsDisabled, uuid) - ) - except queue.Full: - # TODO better handling of this case - pass - - def bluetooth_notify_callback(self, client, char, data): - try: - self.data_queue.put_nowait((client.address, char.uuid, data)) - except queue.Full: - # TODO better handling of this case - pass def _asyncloop(self): asyncio.set_event_loop(self.event_loop)