From a1d301c17f638afb98aff5c47f509bcb63a1b12c Mon Sep 17 00:00:00 2001 From: amnonbb Date: Fri, 25 Aug 2023 08:14:59 +0300 Subject: [PATCH] Set mqtt ping timeout --- api/app.go | 3 ++- api/mqtt.go | 12 ++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/api/app.go b/api/app.go index 5c8d6ae..b9789d2 100644 --- a/api/app.go +++ b/api/app.go @@ -109,10 +109,11 @@ func (a *App) initMQTT() { opts.SetUsername(common.USERNAME) opts.SetPassword(common.PASSWORD) opts.SetKeepAlive(10 * time.Second) + opts.SetPingTimeout(5 * time.Second) opts.SetAutoReconnect(true) opts.SetOnConnectHandler(a.SubMQTT) opts.SetConnectionLostHandler(a.LostMQTT) - opts.SetBinaryWill(common.ExecStatusTopic, []byte("Offline"), byte(2), true) + opts.SetBinaryWill(common.ExecStatusTopic, []byte("Offline"), byte(1), true) a.Msg = mqtt.NewClient(opts) if token := a.Msg.Connect(); token.Wait() && token.Error() != nil { err := token.Error() diff --git a/api/mqtt.go b/api/mqtt.go index 3ac92eb..eb84849 100644 --- a/api/mqtt.go +++ b/api/mqtt.go @@ -30,29 +30,29 @@ func (a *App) SubMQTT(c mqtt.Client) { log.Info().Str("source", "MQTT").Msg("- Connected -") - if token := a.Msg.Publish(common.ExecStatusTopic, byte(2), true, []byte("Online")); token.Wait() && token.Error() != nil { + if token := a.Msg.Publish(common.ExecStatusTopic, byte(1), true, []byte("Online")); token.Wait() && token.Error() != nil { log.Error().Str("source", "MQTT").Err(token.Error()).Msg("Send status") } - if token := a.Msg.Subscribe(common.ExecServiceTopic, byte(2), a.execMessage); token.Wait() && token.Error() != nil { + if token := a.Msg.Subscribe(common.ExecServiceTopic, byte(1), a.execMessage); token.Wait() && token.Error() != nil { log.Fatal().Str("source", "MQTT").Err(token.Error()).Msg("Subscription error") } else { log.Info().Str("source", "MQTT").Msg("Subscription - " + common.ExecServiceTopic) } - if token := a.Msg.Subscribe(common.ExecStateTopic, byte(2), a.ExecState); token.Wait() && token.Error() != nil { + if token := a.Msg.Subscribe(common.ExecStateTopic, byte(1), a.ExecState); token.Wait() && token.Error() != nil { log.Fatal().Str("source", "MQTT").Err(token.Error()).Msg("Subscription error") } else { log.Info().Str("source", "MQTT").Msg("Subscription - " + common.ExecStateTopic) } - if token := a.Msg.Subscribe(common.WorkflowServiceTopic, byte(2), wf.MqttMessage); token.Wait() && token.Error() != nil { + if token := a.Msg.Subscribe(common.WorkflowServiceTopic, byte(1), wf.MqttMessage); token.Wait() && token.Error() != nil { log.Fatal().Str("source", "MQTT").Err(token.Error()).Msg("Subscription error") } else { log.Info().Str("source", "MQTT").Msg("Subscription - " + common.WorkflowServiceTopic) } - if token := a.Msg.Subscribe(common.WorkflowStateTopic, byte(2), wf.SetState); token.Wait() && token.Error() != nil { + if token := a.Msg.Subscribe(common.WorkflowStateTopic, byte(1), wf.SetState); token.Wait() && token.Error() != nil { log.Fatal().Str("source", "MQTT").Err(token.Error()).Msg("Subscription error") } else { log.Info().Str("source", "MQTT").Msg("Subscription - " + common.WorkflowStateTopic) @@ -120,7 +120,7 @@ func (a *App) SendRespond(id string, m *MqttPayload) { } text := fmt.Sprintf(string(message)) - if token := a.Msg.Publish(topic, byte(2), false, text); token.Wait() && token.Error() != nil { + if token := a.Msg.Publish(topic, byte(1), false, text); token.Wait() && token.Error() != nil { log.Error().Str("source", "MQTT").Err(err).Msg("Send Respond") } }