-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathserver.py
executable file
·63 lines (53 loc) · 2.04 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import asyncio
from robomodules.comm.serverProto import ServerProto
from robomodules.comm.subscribe_pb2 import Subscribe
from robomodules.comm.constants import _SUBSCRIBE
class Server:
    """TCP hub that relays each received message to the connections
    subscribed to that message's type.

    Connections are represented by protocol objects created through
    ``ServerProto(self)``.  Subscriptions are stored in ``self.subs`` as a
    mapping of message type -> list of subscribed protocols.
    """

    def __init__(self, addr, port, MsgType):
        """Create the listening server on ``addr:port``.

        Args:
            addr: Host/interface passed to ``loop.create_server``.
            port: Port passed to ``loop.create_server``.
            MsgType: Callable applied to every raw message-type value to
                obtain the key used in ``self.subs`` (presumably an Enum
                class -- confirm against callers).
        """
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions no longer create a loop implicitly when
            # none is current; create and install one ourselves.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        # Not populated in this class; presumably ServerProto registers
        # itself here on connect -- confirm against ServerProto.
        self.clients = []
        self.subs = {}
        self.MsgType = MsgType
        coro = self.loop.create_server(lambda: ServerProto(self), addr, port)
        self.server = self.loop.run_until_complete(coro)

    def _handle_subscriptions(self, protocol, data):
        """Apply a subscription request for ``protocol``.

        ``data.dir == Subscribe.SUBSCRIBE`` adds the listed types; any other
        direction value is treated as an unsubscribe.
        """
        if data.dir == Subscribe.SUBSCRIBE:
            self._add_subscriptions(protocol, data)
        else:
            self._remove_subscriptions(protocol, data)

    def _remove_subscriptions(self, protocol, data):
        """Unsubscribe ``protocol`` from every type in ``data.msg_types``.

        Types the protocol is not subscribed to are ignored instead of
        raising ``ValueError``.
        """
        for msg_type in data.msg_types:
            subscribers = self.subs.get(self.MsgType(msg_type))
            if subscribers and protocol in subscribers:
                subscribers.remove(protocol)

    def _add_subscriptions(self, protocol, data):
        """Subscribe ``protocol`` to every type in ``data.msg_types``.

        A protocol is stored at most once per type, so a repeated subscribe
        request neither duplicates deliveries nor leaves a stale entry
        behind after the client is removed.
        """
        for msg_type in data.msg_types:
            subscribers = self.subs.setdefault(self.MsgType(msg_type), [])
            if protocol not in subscribers:
                subscribers.append(protocol)

    def _forward_msg(self, msg, msg_type):
        """Write ``msg`` to every protocol subscribed to ``msg_type``."""
        m_type = self.MsgType(msg_type)
        for protocol in self.subs.get(m_type, ()):
            protocol.write(msg, m_type)

    def remove_client(self, protocol):
        """Forget ``protocol``: drop it from the client list and from all
        subscription lists.

        Safe to call for a protocol that is not (or no longer) registered.
        """
        if protocol in self.clients:
            self.clients.remove(protocol)
        for subscribers in self.subs.values():
            if protocol in subscribers:
                subscribers.remove(protocol)

    def msg_received(self, protocol, msg, msg_type):
        """Handle one message received on ``protocol``.

        A message whose type equals ``_SUBSCRIBE`` is parsed as a
        ``Subscribe`` message and updates the sender's subscriptions; any
        other message is forwarded to the subscribers of its type.
        """
        if msg_type == _SUBSCRIBE:
            data = Subscribe()
            data.ParseFromString(msg)
            self._handle_subscriptions(protocol, data)
        else:
            self._forward_msg(msg, msg_type)

    def quit(self):
        """Stop the event loop."""
        self.loop.stop()

    def run(self):
        """Run the event loop until it is stopped; Ctrl-C triggers quit()."""
        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            self.quit()