-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrtm_loop.py
167 lines (132 loc) · 4.6 KB
/
rtm_loop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import sys
import time
import json
import threading
import ssl
import websocket
from bearychat import RTMMessage, RTMMessageType
# Reference: https://github.com/websocket-client/websocket-client/issues/227
# Look up the interpreter's default CA file once at import time and wrap it in
# the ``sslopt`` mapping shape that is handed to websocket-client.
ssl_defaults = ssl.get_default_verify_paths()
sslopt_with_ca_certs = dict(ca_certs=ssl_defaults.cafile)

# ``Queue`` and ``start_new_thread`` live in differently named modules on
# Python 2 and Python 3; pick the right pair for the running interpreter.
if sys.version_info[0] < 3:
    from Queue import Queue
    from thread import start_new_thread
else:
    from queue import Queue
    from _thread import start_new_thread
class RTMLoop(object):
    """Real Time Message loop

    Runs a ``websocket.WebSocketApp`` on a background thread. Incoming
    messages and errors are collected in queues that callers poll with
    ``get_message()`` and ``get_error()``.

    Attributes:
        _errors(Queue): contains error message(dict("result", "msg")),
            see self._set_error()
        _inbox(Queue): contains RTMMessage
        _worker(threading.Thread): a thread for running the loop
        _call_id_lock(threading.Lock): guards self._call_id

    Args:
        ws_host(str): websocket host
    """

    def __init__(self, ws_host):
        self._call_id = 0
        # send() is reached from the caller's thread and from the keep-alive
        # thread, so call_id generation has to be serialized.
        self._call_id_lock = threading.Lock()
        self._inbox = Queue()
        self._errors = Queue()

        self._ws = websocket.WebSocketApp(
            ws_host,
            on_open=self.on_open,
            on_message=self.on_message,
            on_close=self.on_close,
            on_error=self.on_error)
        self._worker = threading.Thread(
            target=self._ws.run_forever,
            kwargs={'sslopt': sslopt_with_ca_certs})

    def on_open(self, ws):
        """Websocket on_open event handler

        Starts a background thread that sends a ping every
        self.keep_alive_interval seconds.
        """
        def keep_alive(interval):
            while True:
                time.sleep(interval)
                try:
                    self.ping()
                except websocket.WebSocketConnectionClosedException:
                    # The connection is gone; end the keep-alive thread
                    # quietly instead of dying with an unhandled traceback.
                    return

        start_new_thread(keep_alive, (self.keep_alive_interval, ))

    def on_message(self, ws, message):
        """Websocket on_message event handler

        Saves message as RTMMessage in self._inbox. A payload that cannot
        be decoded as JSON is reported through self._errors instead.
        """
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            # json.loads raises ValueError (JSONDecodeError on Python 3) for
            # malformed text and TypeError for unsupported payload types.
            self._set_error(message, "decode message failed")
        else:
            self._inbox.put(RTMMessage(data))

    def on_error(self, ws, error):
        """Websocket on_error event handler

        Saves error message in self._errors
        """
        self._set_error(error, "read socket failed")

    def on_close(self, ws, close_status_code=None, close_msg=None):
        """Websocket on_close event handler

        Args:
            ws: the websocket app that was closed
            close_status_code: close status code; accepted because newer
                websocket-client releases pass it, currently unused
            close_msg: close reason; accepted for the same reason,
                currently unused
        """
        self._set_error("closed", "websocket closed")

    def _set_error(self, result, msg):
        """Puts an error to self._errors

        Args:
            result(mix): received data
            msg(str): message
        """
        self._errors.put({"result": result, "msg": msg})

    def start(self, keep_alive_interval=2):
        """Starts the main loop

        Args:
            keep_alive_interval(int): the interval(second) of sending keep
                alive message
        """
        self.keep_alive_interval = keep_alive_interval
        self._worker.start()

    def stop(self):
        """Stops the main loop
        """
        self._ws.close()

    def ping(self):
        """Sends ping message
        """
        self.send(RTMMessage({"type": RTMMessageType.Ping}))

    def gen_call_id(self):
        """Generates a call_id

        Returns:
            int: the call_id
        """
        # Increment and read under the lock so that two threads can never
        # be handed the same id.
        with self._call_id_lock:
            self._call_id += 1
            return self._call_id

    def send(self, message):
        """Sends a RTMMessage

        Should be called after starting the loop. A call_id is generated
        for the message when it does not carry one yet.

        Args:
            message(RTMMessage): the sending message

        Raises:
            WebSocketConnectionClosedException: if the loop is closed
        """
        if "call_id" not in message:
            message["call_id"] = self.gen_call_id()

        self._ws.send(message.to_json())

    def get_message(self, block=False, timeout=None):
        """Removes and returns a RTMMessage from self._inbox

        Args:
            block(bool): if True block until a RTMMessage is available,
                else it will return None when self._inbox is empty
            timeout(int): it blocks at most timeout seconds

        Returns:
            RTMMessage if self._inbox is not empty, else None
        """
        try:
            return self._inbox.get(block=block, timeout=timeout)
        except Exception:
            # Best effort by design: an empty queue (or an expired timeout)
            # is reported to the caller as None.
            return None

    def get_error(self, block=False, timeout=None):
        """Removes and returns an error from self._errors

        Args:
            block(bool): if True block until an error is available,
                else it will return None when self._errors is empty
            timeout(int): it blocks at most timeout seconds

        Returns:
            error(dict("result", "msg")) if self._errors is not empty,
            else None
        """
        try:
            return self._errors.get(block=block, timeout=timeout)
        except Exception:
            # Best effort by design: an empty queue (or an expired timeout)
            # is reported to the caller as None.
            return None