Skip to content

Commit

Permalink
Improve BLE com interface and example
Browse files Browse the repository at this point in the history
  • Loading branch information
BojanSof committed Apr 13, 2024
1 parent 7b790da commit 5886be0
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 236 deletions.
12 changes: 12 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": false
}
]
}
62 changes: 41 additions & 21 deletions examples/ble_example.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import sys
import time
import queue

from pydevdtk.coms.ble import Ble, BleStatus
from pydevdtk.coms.ble import Ble


class Parser:
Expand All @@ -27,50 +28,69 @@ def parse_data(self, data):
ble = Ble()
print("Scanning for BLE devices")
ble.start_scan()
time.sleep(5)
time.sleep(2)
ble.stop_scan()

ble_devs = ble.get_found_devices()
ble_devs_names = [ble_dev["name"] for ble_dev in ble_devs]
ble_devs = [ble_dev for ble_dev in ble_devs if len(str(ble_dev)) > 0]
ble_devs_names = [
str(ble_dev) for ble_dev in ble_devs if len(str(ble_dev)) > 0
]

print(f"Found devs: {ble_devs_names}\n")

dev_name = input("Type in the BLE dev name to read the data from: ")
if dev_name not in ble_devs_names:
print(f"Requested device {dev_name} not found")
sys.exit(1)

dev = ble_devs[ble_devs_names.index(dev_name)]
# wait for connection to establish
print(f"Connecting to {dev_name}")
ble.connect(dev["dev"])
dev = ble_devs[ble_devs_names.index(dev_name)]
print(f"Connecting to {dev}...")
t_timeout = 5
ble.connect(dev)
t_start = time.time()
while time.time() - t_start < t_timeout:
status = ble.get_status(dev["address"])
if status is not None and status == BleStatus.Connected:
if ble.is_connected(dev):
break
if not ble.is_connected(dev["address"]):
print(f"Could not connect to device {dev_name}")
else:
time.sleep(0.2)
if not ble.is_connected(dev):
print(f"Could not connect to device {dev}")
sys.exit(1)
print(f"Connected to {dev_name}")
ble.start_notifications_characteristic(
dev["address"], "beb5483e-36e1-4688-b7f5-ea07361b26a8"

print(f"Connected to {dev}")

data_queue = queue.Queue()
parser = Parser()
ble.start_notifications(
dev,
"beb5483e-36e1-4688-b7f5-ea07361b26a8",
lambda data: data_queue.put(data),
)
print("Notifications started")

t_run = 10
parser = Parser()
t_start = time.time()
while time.time() - t_start < t_run:
addr, uuid, data = ble.get_data_event()
data = data_queue.get()
for sin, cos in parser.parse_data(data):
print(f"sin = {sin} | cos = {cos}")

ble.disconnect(dev["address"])
ble.stop_notifications(
dev,
"beb5483e-36e1-4688-b7f5-ea07361b26a8",
)
print("Notifications stopped")

print(f"Disconnecting from {dev}...")
ble.disconnect(dev)
t_timeout = 5
t_start = time.time()
while time.time() - t_start < t_timeout:
status = ble.get_status(dev["address"])
if status is None:
if not ble.is_connected(dev):
break
status = ble.get_status(dev["address"])
if status is not None:
print(f"Failed to disconnect from {dev_name}")
if ble.is_connected(dev):
print(f"Could not disconnect from device {dev}")
sys.exit(2)
print(f"Disconnected from {dev}")
Loading

0 comments on commit 5886be0

Please sign in to comment.