Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT重连会出现丢数据的情况。 #12

Open
javidHao opened this issue Aug 1, 2024 · 0 comments
Open

MQTT重连会出现丢数据的情况。 #12

javidHao opened this issue Aug 1, 2024 · 0 comments

Comments

@javidHao
Copy link

javidHao commented Aug 1, 2024

2024-08-01 09:24:19,859 - MqttThread - mqtt_connection.py[__publish_message] - INFO: publish message, rc= MQTTErrorCode.MQTT_ERR_SUCCESS, mid= 1, topic= $oc/devices/63fde63ea8097d234f681a7c_javid_test/sys/events/up, msg= {"object_device_id": "63fde63ea8097d234f681a7c_javid_test", "services": [{"service_id": "$log", "event_type": "log_report", "event_time": "2024-08-01T09:24:19Z", "event_id": "", "paras": {"timestamp": "1722500659859", "type": "DEVICE_STATUS", "content": "connect complete, the uri is 121.36.227.119"}}]}
2024-08-01 09:24:19,859 - MqttThread - mqtt_connection.py[__publish_buffer_message] - INFO: send buffer message end.
2024-08-01 09:24:20,840 - MainThread - device_reconnect_sample.py[run] - INFO: begin report message
2024-08-01 09:24:20,840 - MainThread - mqtt_connection.py[__publish_message] - INFO: publish message, rc= MQTTErrorCode.MQTT_ERR_SUCCESS, mid= 2, topic= $oc/devices/63fde63ea8097d234f681a7c_javid_test/sys/messages/up, msg= {"object_device_id": "", "id": "", "name": "", "content": "Hello Huawei 2024-08-01 09:24:20.840696"}
==success==
publish message success, mid = 2
2024-08-01 09:24:30,841 - MainThread - mqtt_connection.py[__publish_message] - INFO: publish message, rc= MQTTErrorCode.MQTT_ERR_SUCCESS, mid= 3, topic= $oc/devices/63fde63ea8097d234f681a7c_javid_test/sys/messages/up, msg= {"object_device_id": "", "id": "", "name": "", "content": "Hello Huawei 2024-08-01 09:24:30.841033"}
==success==
publish message success, mid = 3
2024-08-01 09:24:40,841 - MainThread - mqtt_connection.py[__publish_message] - INFO: publish message, rc= MQTTErrorCode.MQTT_ERR_SUCCESS, mid= 4, topic= $oc/devices/63fde63ea8097d234f681a7c_javid_test/sys/messages/up, msg= {"object_device_id": "", "id": "", "name": "", "content": "Hello Huawei 2024-08-01 09:24:40.841558"}
==success==
publish message success, mid = 4
2024-08-01 09:24:50,842 - MainThread - mqtt_connection.py[__publish_message] - INFO: publish message, rc= MQTTErrorCode.MQTT_ERR_SUCCESS, mid= 5, topic= $oc/devices/63fde63ea8097d234f681a7c_javid_test/sys/messages/up, msg= {"object_device_id": "", "id": "", "name": "", "content": "Hello Huawei 2024-08-01 09:24:50.842366"}
2024-08-01 09:25:00,842 - MainThread - mqtt_connection.py[__publish_message] - INFO: publish message, rc= MQTTErrorCode.MQTT_ERR_SUCCESS, mid= 6, topic= $oc/devices/63fde63ea8097d234f681a7c_javid_test/sys/messages/up, msg= {"object_device_id": "", "id": "", "name": "", "content": "Hello Huawei 2024-08-01 09:25:00.842766"}

从日志里可以看到mid: 5和mid: 6数据已经触发发送函数,并且回调函数已经注册到__publish_listener_dict字典里。
但其实我电脑的网络已经让我主动关闭了,导致mid: 5和mid: 6数据发送会失败,不会触发相应的回调函数,这两条数据就丢掉了。

以下是我的修改:
huaweicloud-iot-device-sdk-python-1.2.0\iot_device_sdk_python\service\abstract_device.py
`
def __publish_buffer_message(self):
try:
# 处理上一次连接失败后未触发回调的数据同时将buffer中的数据重新发布
for mid in self.__publish_listener_dict.keys():
message_publish_listener: ActionListener = self.__publish_listener_dict.get(mid)
if message_publish_listener is not None and isinstance(message_publish_listener, ActionListener):
message_publish_listener.on_failure("publish message failed, mid= %s" % (str(mid)), None)

        if self.__max_buffer_queue is None:
            return
        while len(self.__max_buffer_queue) > 0:
            buffer_message: BufferMessage = self.__max_buffer_queue.popleft()
            raw_message = buffer_message.raw_message
            listener = buffer_message.listener
            self.__publish_message(raw_message, listener)
    except Exception as e:
        self._logger.warning("publish buffer message failed. buffer queue is empty. err: %s", e)
        pass
    self._logger.info("send buffer message end.")

`

但是这种修改在回调函数里拿不到原始消息,导致失败了之后,应用层还得缓存原始数据。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant