-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt_point.py
93 lines (80 loc) · 3.07 KB
/
mqtt_point.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import paho.mqtt.client as paho
import pickle
from data import *
class MqttPoint:
    """MQTT endpoint that exchanges pickled messages over two channels.

    Each instance owns two paho clients:

    * ``master_listener`` subscribes to ``listen_channel`` and dispatches
      every incoming message to a callback.
    * ``req_client`` publishes outgoing messages to ``request_channel``.

    All network operations are best-effort: a failure is reported on stdout
    and the call returns normally instead of raising.

    SECURITY: incoming payloads are decoded with ``pickle.loads``. Unpickling
    data that other parties can publish (for example on a public broker)
    allows arbitrary code execution. Use this class only on a trusted
    broker/topic, or replace pickle with a data-only format.
    """

    def makeOnConnectListener(self_inst):
        """Build the paho ``on_connect`` callback bound to ``self_inst``.

        This is invoked through the class
        (``MqttPoint.makeOnConnectListener(point)``), so its only parameter
        is the instance the callback works for. The returned callback
        subscribes the connected client to that instance's
        ``listen_channel``.
        """
        def on_connect(client, userdata, flags, rc):
            client.subscribe(self_inst.listen_channel)
        return on_connect

    def makeOnMessageListener(self_inst):
        """Build the paho ``on_message`` callback bound to ``self_inst``.

        Dispatch rules applied by the returned callback:

        * A message whose ``is_request`` is truthy goes to ``on_request_cb``.
        * Any other message is treated as a reply: it goes to the reply
          callback stored by ``sendMessage`` for the pending message with
          the same ``uuid``.
        * A reply with no matching pending message (or whose pending entry
          has no reply callback) falls back to ``on_request_cb``.

        A payload that cannot be unpickled is reported and dropped, so one
        bad message does not propagate an exception into the paho loop.
        """
        def on_message(client, userdata, msg):
            try:
                # SECURITY: pickle.loads executes code embedded in the
                # payload; the sender must be trusted (see class docstring).
                message = pickle.loads(msg.payload)
            except Exception as exc:
                # Unpickling can raise many exception types; this callback is
                # the boundary with untrusted network input, so report and
                # ignore the payload rather than break the listener.
                print(f'{self_inst.name} dropped undecodable message: {exc!r}')
                return

            print(f'{self_inst.name} received message: {message!s}')

            if message.is_request:
                handler = self_inst.on_request_cb
            else:
                reply_cb = None
                # Iterate a snapshot: sendMessage may insert entries from
                # another thread while this callback is running.
                for pending, candidate_cb in list(self_inst.last_message_map.values()):
                    if message.uuid == pending.uuid:
                        reply_cb = candidate_cb
                        break
                # Unmatched replies are handed to the request callback.
                handler = reply_cb or self_inst.on_request_cb

            if handler:
                handler(message)
        return on_message

    def __init__(self, name: str, listen_channel: str, request_channel: str, on_request_cb,
                 *, broker_host: str = 'broker.hivemq.com', broker_port: int = 1883):
        """Create both MQTT clients and try to connect them to the broker.

        Args:
            name: Identifier of this endpoint; stamped on outgoing messages
                as ``slave_name`` and used as a prefix in log output.
            listen_channel: Topic the listener client subscribes to.
            request_channel: Topic ``sendMessage`` publishes to.
            on_request_cb: Callable receiving incoming requests and
                unmatched replies; may be ``None`` to ignore them.
            broker_host: Broker host name (defaults to the previously
                hard-coded public HiveMQ broker).
            broker_port: Broker TCP port.

        Connection failures are reported on stdout and do not raise, so the
        object can still be constructed while the broker is unreachable.
        """
        self.name = name
        self.listen_channel = listen_channel
        self.request_channel = request_channel
        self.on_request_cb = on_request_cb
        # Latest outgoing message per msg_type -> (message, reply callback).
        self.last_message_map = {}

        self.req_client = paho.Client()
        self.req_client.on_publish = None
        self.req_client.on_pre_connect = None
        self._attempt('connect the request client',
                      self.req_client.connect, broker_host, broker_port)

        self.master_listener = paho.Client()
        self.master_listener.on_connect = MqttPoint.makeOnConnectListener(self)
        self.master_listener.on_message = MqttPoint.makeOnMessageListener(self)
        self.master_listener.on_pre_connect = None
        self._attempt('connect the listener client',
                      self.master_listener.connect, broker_host, broker_port, 60)

    def _attempt(self, description, operation, *args, **kwargs):
        """Run a best-effort MQTT operation and report a failure, if any.

        Returns the operation's result, or ``None`` when it raised. Only
        ``Exception`` subclasses are caught, so ``KeyboardInterrupt`` and
        ``SystemExit`` still propagate.
        """
        try:
            return operation(*args, **kwargs)
        except Exception as exc:
            print(f'{self.name} failed to {description}: {exc!r}')
            return None

    def getMyName(self):
        """Return the name this endpoint was created with."""
        return self.name

    def sendMessage(self, msg: 'Data_MessageBase', on_reply_cb=None):
        """Publish ``msg`` on the request channel and remember it for replies.

        The message is stamped with this endpoint's name and stored under its
        ``msg_type`` together with ``on_reply_cb``; a later message of the
        same type replaces the stored entry. ``on_reply_cb`` is called by the
        listener when a reply carrying the same ``uuid`` arrives.

        A publish failure is reported on stdout and does not raise.
        """
        msg.slave_name = self.name
        self.last_message_map[msg.msg_type] = (msg, on_reply_cb)
        print(f'{self.name} sending request: {msg!s}')
        mqtt_msg = pickle.dumps(msg)
        self._attempt('publish to the request channel',
                      self.req_client.publish, self.request_channel, mqtt_msg, qos=1)

    def sendMessageToListenerChannel(self, msg: 'Data_MessageBase'):
        """Publish ``msg`` on this endpoint's own listen channel.

        The message is stamped with this endpoint's name and sent through the
        listener client. No reply callback is recorded for it. A publish
        failure is reported on stdout and does not raise.
        """
        msg.slave_name = self.name
        print(f'{self.name} sending request to listener channel: {msg!s}')
        mqtt_msg = pickle.dumps(msg)
        self._attempt('publish to the listener channel',
                      self.master_listener.publish, self.listen_channel, mqtt_msg, qos=1)

    def start(self):
        """Start the network loop of both clients.

        Each client is started independently, so a failure on one does not
        prevent the other from starting.
        """
        self._attempt('start the listener loop', self.master_listener.loop_start)
        self._attempt('start the request loop', self.req_client.loop_start)

    def stop(self):
        """Stop the network loop of both clients.

        Each client is stopped independently, so a failure on one does not
        leave the other running.
        """
        # NOTE(review): the former ``force=True`` argument is documented as
        # ignored by paho-mqtt 1.x and appears to be rejected by newer
        # releases, so it is no longer passed -- confirm against the
        # installed paho version.
        self._attempt('stop the listener loop', self.master_listener.loop_stop)
        self._attempt('stop the request loop', self.req_client.loop_stop)