-
Notifications
You must be signed in to change notification settings - Fork 3
/
zei.py
133 lines (95 loc) · 3.62 KB
/
zei.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# -*- coding: utf-8 -*-
#
from bluepy import btle
import struct
import logging
# Module-level logger: emits INFO and above through its own stream handler.
_log = logging.getLogger(__name__)
_log.setLevel(logging.INFO)
_log.addHandler(logging.StreamHandler())
def _ZEI_UUID(short_uuid):
    """Expand a short integer identifier into a full ZEI UUID string.

    ``short_uuid`` is rendered as at least four upper-case, zero-padded hex
    digits and inserted right after the fixed ``c7e7`` prefix of the base
    UUID used throughout this module.
    """
    template = 'c7e7%04X-c847-11e6-8175-8c89a55d403c'
    return template % short_uuid
class ZeiCharBase:
    """Shared plumbing for one GATT characteristic of a peripheral.

    Subclasses provide the ``svcUUID`` and ``charUUID`` class attributes that
    :meth:`enable` uses to locate the service and the characteristic.
    """

    def __init__(self, periph):
        """Remember the owning peripheral; the handle is unknown until enable()."""
        self.periph = periph
        # Handle reported by the characteristic's getHandle(); set by enable().
        self.hndl = None

    def enable(self):
        """Locate the characteristic, store its handle and write 2 to its CCCD.

        The first characteristic matching ``charUUID`` inside the service
        ``svcUUID`` is used, as is its first client characteristic
        configuration descriptor. An ``IndexError`` results if either lookup
        returns an empty list.
        """
        service = self.periph.getServiceByUUID(self.svcUUID)
        characteristic = service.getCharacteristics(self.charUUID)[0]
        self.hndl = characteristic.getHandle()

        # this is uint16_t - see: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.descriptor.gatt.client_characteristic_configuration.xml
        cccd_uuid = btle.AssignedNumbers.client_characteristic_configuration
        cccd = characteristic.getDescriptors(cccd_uuid)[0]
        # NOTE(review): in the CCCD bit field, value 2 (bit 1) normally selects
        # indications and value 1 (bit 0) notifications - confirm that
        # indications are what the device expects here.
        payload = struct.pack("<H", 2)
        cccd.write(payload, withResponse=True)
class ZeiOrientationChar(ZeiCharBase):
    """Characteristic 0x0012 of ZEI service 0x0010.

    Construction is inherited unchanged from :class:`ZeiCharBase`
    (``ZeiOrientationChar(periph)``); only the UUIDs are specific.
    """

    svcUUID = _ZEI_UUID(0x0010)
    charUUID = _ZEI_UUID(0x0012)
class BatteryLevelChar(ZeiCharBase):
    """Battery level characteristic of the standard battery service.

    Construction is inherited unchanged from :class:`ZeiCharBase`
    (``BatteryLevelChar(periph)``); only the UUIDs are specific.
    """

    svcUUID = btle.AssignedNumbers.battery_service
    charUUID = btle.AssignedNumbers.battery_level
class Zei(btle.Peripheral):
    """A bluepy peripheral that subscribes to its orientation characteristic.

    All constructor arguments are forwarded to ``btle.Peripheral``.
    """

    def __init__(self, *args, **kwargs):
        btle.Peripheral.__init__(self, *args, **kwargs)

        # Route incoming notifications to a delegate that knows this device.
        delegate = ZeiDelegate(self)
        self.withDelegate(delegate)

        # activate notifications about turn
        orientation = ZeiOrientationChar(self)
        # Publish the attribute before enabling it, as the delegate may
        # consult it as soon as data starts to arrive.
        self.orientation = orientation
        orientation.enable()
class ZeiDelegate(btle.DefaultDelegate):
    """Notification delegate that logs which side of the device is up.

    Notifications on the orientation characteristic are decoded as a single
    unsigned byte; anything else is logged verbatim.
    """

    # Fallback used while the parent has not discovered the orientation
    # characteristic; this is the handle the code used to hard-code.
    DEFAULT_ORIENTATION_HANDLE = 39

    def __init__(self, periph):
        """Keep a reference to the peripheral whose notifications are handled."""
        btle.DefaultDelegate.__init__(self)
        self.parent = periph

    def _orientation_handle(self):
        """Return the discovered orientation handle, or the fallback."""
        orientation = getattr(self.parent, 'orientation', None)
        handle = getattr(orientation, 'hndl', None)
        if handle is None:
            return self.DEFAULT_ORIENTATION_HANDLE
        return handle

    def handleNotification(self, cHandle, data):
        """Log a notification received on handle ``cHandle``.

        Raises ``struct.error`` if an orientation payload is not exactly one
        byte long.
        """
        if cHandle == self._orientation_handle():
            # unpack() returns a 1-tuple; log the number, not the tuple.
            side = struct.unpack('B', data)[0]
            _log.info("Current side up is %s", side)
        else:
            _log.info("Notification from hndl: %s - %r", cHandle, data)
class ZeiDiscoveryDelegate(btle.DefaultDelegate):
    """Scan delegate that connects a peripheral once the target device is seen."""

    # Address of the only device this delegate reacts to.
    TARGET_ADDR = 'f1:05:a5:9c:2e:9b'

    def __init__(self, scanner, periph):
        """Store the scanner delivering results and the peripheral to connect."""
        btle.DefaultDelegate.__init__(self)
        self.scanner = scanner
        self.periph = periph

    def handleDiscovery(self, dev, isNewDev, isNewData):
        """Log the target device's advertisement and try to connect to it.

        Scan results for any other address are ignored. On a successful
        connect ``scanner.stop_scanning`` is set to ``True``; on failure the
        error is logged and the scan is started again.
        """
        if dev.addr != self.TARGET_ADDR:
            return

        _log.info("Device %s (%s), RSSI=%d dB", dev.addr, dev.addrType, dev.rssi)
        for (adtype, desc, value) in dev.getScanData():
            _log.info(" %s = %s", desc, value)

        # reconnect
        # bluepy can only do one thing at a time, so scanning would have to
        # stop while trying to connect; stopping from inside this callback is
        # not supported by bluepy, so the scanner is only flagged afterwards.
        try:
            self.periph.connect(dev)
        except Exception:
            # Only ordinary errors are handled: KeyboardInterrupt/SystemExit
            # must propagate, and the failure is logged instead of vanishing.
            _log.exception("Connecting to %s failed; restarting scan", dev.addr)
            self.scanner.start()
        else:
            self.scanner.stop_scanning = True
class ZeiDiscovery(btle.Scanner):
    """Scanner that reconnects a peripheral by polling the scan results."""

    def __init__(self, periph=None, **kwargs):
        """Remember ``periph``; remaining keyword arguments go to ``btle.Scanner``."""
        self.zei = periph
        btle.Scanner.__init__(self, **kwargs)
        # The callback-based path (ZeiDiscoveryDelegate and a stop_scanning
        # flag) is not installed here; reconnect() polls self.scanned instead.

    def reconnect(self):
        """Scan until the peripheral's address shows up, then connect to it.

        The scan runs on the same interface the peripheral uses and blocks
        until the device has been seen. The scan is stopped even when
        processing the results raises, so it is never left running.
        """
        self.iface = self.zei.iface
        addr = self.zei.addr
        self.clear()
        self.start()
        try:
            while addr not in self.scanned:
                self.process(timeout=2)
        finally:
            self.stop()
        self.zei.connect(self.scanned[addr])
def main():
    """Connect to the ZEI device and log its notifications until interrupted.

    Any error while waiting for notifications is logged and followed by a
    reconnect attempt. The peripheral is disconnected when the loop is left
    (for example by ``KeyboardInterrupt``); previously the disconnect call
    sat after the endless loop and could never run.
    """
    zei = Zei('f1:05:a5:9c:2e:9b', 'random', iface=1)
    try:
        scanner = ZeiDiscovery(zei)
        while True:
            try:
                zei.waitForNotifications(timeout=None)
            except Exception as e:
                _log.exception(e)
                scanner.reconnect()
                # The subscription was only written on the first connection.
                # CCCD settings are expected to reset on a new connection for
                # non-bonded devices, so subscribe again.
                zei.orientation.enable()
    finally:
        zei.disconnect()


if __name__ == "__main__":
    main()