forked from parallaxinc/BlocklyPropClient
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SerialSocket.py
57 lines (47 loc) · 1.87 KB
/
SerialSocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
__author__ = 'Michel'
from ws4py.websocket import WebSocket
from serial import Serial, SerialException
from time import sleep
import thread
import re
OPEN_CONNECTION_STRING = "+++ open port "
class SerialSocket(WebSocket):
def __init__(self, sock, protocols=None, extensions=None, environ=None, heartbeat_freq=None):
super(SerialSocket, self).__init__(sock, protocols, extensions, environ, heartbeat_freq)
self.serial = Serial()
def received_message(self, message):
if message.data[0:len(OPEN_CONNECTION_STRING)] == OPEN_CONNECTION_STRING:
port = message.data[len(OPEN_CONNECTION_STRING):]
self.serial.port = port
self.serial.baudrate = 115200
try:
self.serial.open()
except SerialException as se:
self.send("Failed to connect to: " + port + "\n\r(" + se.message + ")\n\r")
return
if self.serial.isOpen():
self.send("Connection established with: " + port + "\n\r")
thread.start_new_thread(serial_poll, (self.serial, self))
else:
self.send("Failed to connect to: " + port + "\n\r")
else:
if self.serial.isOpen():
self.send(message.data)
self.serial.write(message.data)
def close(self, code=1000, reason=''):
# close serial connection
print 'closing'
self.serial.close()
super(SerialSocket, self).close(code, reason)
def serial_poll(serial, socket):
try:
while serial.isOpen():
data = serial.read(serial.inWaiting())
if len(data) > 0:
# print 'Got:', data
data = re.sub("\r", "\n\r", data)
socket.send(data)
sleep(0.5)
# print 'not blocked'
except SerialException:
print('connection closed')