-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblesync_server.py
242 lines (194 loc) · 7.57 KB
/
blesync_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import struct
from micropython import const
import blesync
# Advertising payloads are repeated packets of the following form:
# 1 byte data length (N + 1)
# 1 byte type (see constants below)
# N bytes type-specific data
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)
_ADV_IND = const(0x00)
'''
Known as Advertising Indications (ADV_IND), where a peripheral device requests connection to any central device (i.e., not directed at a particular central device).
Example: A smart watch requesting connection to any central device.
'''
_ADV_DIRECT_IND = const(0x01)
'''
Similar to ADV_IND, yet the connection request is directed at a specific central device.
Example: A smart watch requesting connection to a specific central device.
'''
_ADV_SCAN_IND = const(0x02)
'''
Similar to ADV_NONCONN_IND, with the option additional information via scan responses.
Example: A warehouse pallet beacon allowing a central device to request additional information about the pallet.
'''
_ADV_NONCONN_IND = const(0x03)
'''
Non connectable devices, advertising information to any listening device.
Example: Beacons in museums defining proximity to specific exhibits.
'''
# Generate a payload to be passed to gap_advertise(adv_data=...).
def _create_advertising_payload(
limited_disc=False,
br_edr=False,
name=None,
services=None,
appearance=0 # UNKNOWN
):
payload = bytearray()
def _append(adv_type, value):
nonlocal payload
payload += struct.pack("BB", len(value) + 1, adv_type) + value
# some combinations of flags aren't allowed TODO describe
adv_type_flags = (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)
_append(
_ADV_TYPE_FLAGS,
struct.pack("B", adv_type_flags),
)
if name:
_append(_ADV_TYPE_NAME, name)
if services:
for uuid in services:
b = bytes(uuid)
if len(b) == 2:
_append(_ADV_TYPE_UUID16_COMPLETE, b)
elif len(b) == 4:
_append(_ADV_TYPE_UUID32_COMPLETE, b)
elif len(b) == 16:
_append(_ADV_TYPE_UUID128_COMPLETE, b)
# See org.bluetooth.characteristic.gap.appearance.xml
_append(_ADV_TYPE_APPEARANCE, struct.pack("<h", appearance))
return payload
class Characteristic:
def encode(self, decoded):
return decoded
def decode(self, encoded):
return encoded
def __init__(self, uuid, flags, buffer_size=None, buffer_append=False):
self.uuid = uuid
self.flags = flags
self._buffer_size = buffer_size
self._buffer_append = buffer_append
self._value_handle = None
self._on_write_callback = lambda *_: None
def __get__(self, service, service_class):
# for cpython compliance
# in micropython cls.attribute doesn't invoke __get__
if service is None:
return self
return ServerServiceCharacteristic(service.connections, self._value_handle)
def __set__(self, service, value):
blesync.gatts_write(self._value_handle, self.encode(value))
def set_value_handle(self, value_handle):
if self._buffer_size:
blesync.gatts_set_buffer(
value_handle,
self._buffer_size,
self._buffer_append
)
self._value_handle = value_handle
def call_write_callback(self, service, conn_handle, received_data):
return self._on_write_callback(
service,
conn_handle,
self.decode(received_data)
)
# if write flag
def on_write(self, callback):
self._on_write_callback = callback
return callback
class ServerServiceCharacteristic:
def __init__(self, connections, value_handle):
self._connections = connections
self.value_handle = value_handle
def notify(self, conn_handle, data=None):
blesync.gatts_notify(conn_handle, self.value_handle, data)
def notify_multiple(self, conn_handles, data=None):
for conn_handle in conn_handles:
self.notify(conn_handle, data=data)
def notify_all(self, data=None):
self.notify_multiple(self._connections, data=data)
class Service:
characteristics = tuple() # TODO it won't be needed in py 3.6
@classmethod
def get_characteristics_declarations(cls):
return [
(characteristic.uuid, characteristic.flags) # TODO BLE descriptors
for characteristic in cls.characteristics
]
def _on_gatts_write(self, conn_handle, value_handle):
received_data = blesync.gatts_read(value_handle)
characteristic = self._characteristics[value_handle]
characteristic.call_write_callback(self, conn_handle, received_data)
def __init__(self, connections, handles):
self.connections = connections
self._characteristics = {}
for characteristic, handle in zip(self.characteristics, handles):
characteristic.set_value_handle(handle)
self._characteristics[handle] = characteristic
def _get_services_declarations(service_classes):
return [
(service_class.uuid, service_class.get_characteristics_declarations())
for service_class in service_classes
]
class Server:
def __init__(self,
name,
*service_classes,
multiple_connections=False,
appearance=0,
advertise_interval_us=50000,
):
self._advertise_interval_us = advertise_interval_us
self._service_classes = service_classes
self._multiple_connections = multiple_connections
self._service_by_handle = {}
self._advertising_payload = _create_advertising_payload(
name=name,
appearance=appearance,
)
self.connections = []
def start(self):
blesync.activate()
services_declarations = _get_services_declarations(self._service_classes)
all_handles = blesync.gatts_register_services(services_declarations)
services = []
for handles, service_class in zip(
all_handles,
self._service_classes
):
service = service_class(self.connections, handles)
services.append(service)
for handle in handles:
self._service_by_handle[handle] = service
blesync.on_central_connect(self._on_central_connect)
blesync.on_central_disconnect(self._on_central_disconnect)
blesync.on_gatts_write(self._on_gatts_write)
self._advertise()
return services
def _advertise(self):
blesync.gap_advertise(
self._advertise_interval_us,
adv_data=self._advertising_payload
)
def _on_central_connect(self, conn_handle, addr_type, addr):
self.connections.append(conn_handle)
if self._multiple_connections:
# Start advertising again to allow multiple connections
self._advertise()
def _on_gatts_write(self, conn_handle, value_handle):
service = self._service_by_handle[value_handle]
service._on_gatts_write(conn_handle, value_handle)
def _on_central_disconnect(self, conn_handle, addr_type, addr):
self.connections.remove(conn_handle)
# Start advertising again to allow a new connection.
self._advertise()
on_connect = blesync.on_central_connect
on_disconnect = blesync.on_central_disconnect