From 737e76a73600046e38e6b96e63ef5ed0d4d47f38 Mon Sep 17 00:00:00 2001 From: Maxim Prokhorov Date: Thu, 12 Sep 2024 22:16:25 +0300 Subject: [PATCH] mqtt: clean session flag --- code/espurna/config/general.h | 16 ++++++- code/espurna/mqtt.cpp | 82 ++++++++++++++++++++++------------- 2 files changed, 66 insertions(+), 32 deletions(-) diff --git a/code/espurna/config/general.h b/code/espurna/config/general.h index 94f29fae8..33e57893c 100644 --- a/code/espurna/config/general.h +++ b/code/espurna/config/general.h @@ -1026,7 +1026,7 @@ #endif #ifndef MQTT_RETAIN -#define MQTT_RETAIN true // MQTT retain flag +#define MQTT_RETAIN 1 // MQTT retain flag #endif #ifndef MQTT_QOS @@ -1036,7 +1036,19 @@ #endif #ifndef MQTT_KEEPALIVE -#define MQTT_KEEPALIVE 120 // MQTT keep-alive interval (in seconds). 2 minutes by default. Cannot be zero. +#define MQTT_KEEPALIVE 120 // MQTT Keep Alive time, in seconds. + // Maximum amount of time without any communication between the server and the client before closing the connection. + // From client side, handled internally by counting amount of time between control packets and / or PINGREQ & PINGRESP. + // From server side, depends on the implementation (usually, also adds +50% of the original value) + // 2 minutes by default. Can be zero. Small values 1..5 *may* not be handled properly. +#endif + +#ifndef MQTT_CLEAN_SESSION +#define MQTT_CLEAN_SESSION 0 // MQTT Clean Session flag. + // When disabled, server is expected to persist subscriptions and not-yet-delivered messages w/ QoS > 0 for the specified Client Id. + // (and *may* also store messages w/ QoS == 0, depends on the implementation) + // When enabled, any previously stored session data is discarded. + // Disabled by default. #endif #ifndef MQTT_SKIP_TIME diff --git a/code/espurna/mqtt.cpp b/code/espurna/mqtt.cpp index a2aa55abb..c6e7a09b3 100644 --- a/code/espurna/mqtt.cpp +++ b/code/espurna/mqtt.cpp @@ -203,6 +203,10 @@ constexpr KeepAlive keepalive() { static_assert(keepalive() >= KeepaliveMin, ""); static_assert(keepalive() <= KeepaliveMax, ""); +constexpr bool cleanSession() { + return 1 == MQTT_CLEAN_SESSION; +} + STRING_VIEW_INLINE(TopicWill, MQTT_TOPIC_STATUS); constexpr int willQoS() { @@ -272,6 +276,7 @@ STRING_VIEW_INLINE(Password, "mqttPassword"); STRING_VIEW_INLINE(QoS, "mqttQoS"); STRING_VIEW_INLINE(Retain, "mqttRetain"); STRING_VIEW_INLINE(Keepalive, "mqttKeep"); +STRING_VIEW_INLINE(CleanSession, "mqttClean"); STRING_VIEW_INLINE(ClientId, "mqttClientID"); STRING_VIEW_INLINE(TopicWill, "mqttWill"); STRING_VIEW_INLINE(WillQoS, "mqttWillQoS"); @@ -350,6 +355,10 @@ espurna::mqtt::KeepAlive keepalive() { build::KeepaliveMin, build::KeepaliveMax); } +bool cleanSession() { + return getSetting(keys::CleanSession, build::cleanSession()); +} + String clientId() { return getSetting(keys::ClientId, systemIdentifier()); } @@ -435,6 +444,7 @@ String NAME () {\ } EXACT_VALUE(autoconnect, settings::autoconnect) +EXACT_VALUE(cleanSession, settings::cleanSession) EXACT_VALUE(enabled, settings::enabled) EXACT_VALUE(heartbeatInterval, settings::heartbeatInterval) EXACT_VALUE(heartbeatMode, settings::heartbeatMode) @@ -442,11 +452,11 @@ EXACT_VALUE(json, settings::json) EXACT_VALUE(keepalive, settings::keepalive) EXACT_VALUE(port, settings::port) EXACT_VALUE(qos, settings::qos) -EXACT_VALUE(willQoS, settings::willQoS) -EXACT_VALUE(willRetain, settings::willRetain) EXACT_VALUE(retain, settings::retain) -EXACT_VALUE(skipTime, settings::skipTime) EXACT_VALUE(settings, settings::settings) +EXACT_VALUE(skipTime, settings::skipTime) +EXACT_VALUE(willQoS, settings::willQoS) +EXACT_VALUE(willRetain, settings::willRetain) #undef EXACT_VALUE @@ -461,10 +471,11 @@ static constexpr espurna::settings::query::Setting Settings[] PROGMEM { {keys::WillRetain, internal::willRetain}, {keys::PayloadOffline, settings::payloadOffline}, {keys::PayloadOnline, settings::payloadOnline}, - {keys::QoS, internal::qos}, {keys::Retain, internal::retain}, - {keys::ClientId, settings::clientId}, + {keys::QoS, internal::qos}, {keys::Keepalive, internal::keepalive}, + {keys::CleanSession, internal::cleanSession}, + {keys::ClientId, settings::clientId}, {keys::User, settings::user}, {keys::Password, settings::password}, {keys::Topic, settings::topic}, @@ -632,11 +643,12 @@ struct MqttConnectionSettings { String server; uint16_t port{}; - String clientId; + String client_id; bool retain { mqtt::build::retain() }; int qos { mqtt::build::qos() }; espurna::mqtt::KeepAlive keepalive { mqtt::build::keepalive() }; + bool clean_session { mqtt::build::cleanSession() }; String topic; String user; @@ -904,9 +916,9 @@ namespace { void _mqttSetupAsyncClient(bool secure = false) { _mqtt.setServer(_mqtt_settings.server.c_str(), _mqtt_settings.port); - _mqtt.setClientId(_mqtt_settings.clientId.c_str()); + _mqtt.setClientId(_mqtt_settings.client_id.c_str()); _mqtt.setKeepAlive(_mqtt_settings.keepalive.count()); - _mqtt.setCleanSession(false); + _mqtt.setCleanSession(_mqtt_settings.clean_session); _mqtt.setWill( _mqtt_settings.will_topic.c_str(), @@ -959,6 +971,24 @@ bool _mqttSetupSyncClient(bool secure = false) { bool _mqttConnectSyncClient(bool secure = false) { bool result = false; + const auto credentials = + _mqtt_settings.user.length() + && _mqtt_settings.pass.length(); + + const auto* user = + credentials + ? _mqtt_settings.user.c_str() + : nullptr; + + const auto* pass = + credentials + ? _mqtt_settings.pass.c_str() + : nullptr; + + if (credentials) { + DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), user); + } + #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT _mqtt.begin(_mqtt_settings.server.c_str(), _mqtt_settings.port, @@ -967,32 +997,22 @@ bool _mqttConnectSyncClient(bool secure = false) { _mqtt_payload_offline.c_str(), _mqtt_settings.will_retain, _mqtt_settings.will_qos); _mqtt.setKeepAlive(_mqtt_settings.keepalive.count()); + _mqtt.setCleanSession(_mqtt_settings.clean_session); result = _mqtt.connect( - _mqtt_settings.clientId.c_str(), - _mqtt_settings.user.c_str(), - _mqtt_settings.pass.c_str()); + _mqtt_settings.client_id.c_str(), user, pass); #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT _mqtt.setClient(_mqttGetClient(secure)); _mqtt.setServer(_mqtt_settings.server.c_str(), _mqtt_settings.port); - if (_mqtt_settings.user.length() && _mqtt_settings.pass.length()) { - DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_settings.user.c_str()); - result = _mqtt.connect( - _mqtt_settings.clientId.c_str(), - _mqtt_settings.user.c_str(), - _mqtt_settings.pass.c_str(), - _mqtt_settings.will_topic.c_str(), - _mqtt_settings.will_qos, - _mqtt_settings.will_retain, - _mqtt_payload_offline.c_str()); - } else { - result = _mqtt.connect( - _mqtt_settings.clientId.c_str(), - _mqtt_settings.will_topic.c_str(), - _mqtt_settings.will_qos, - _mqtt_settings.will_retain, - _mqtt_payload_offline.c_str()); - } + result = _mqtt.connect( + _mqtt_settings.client_id.c_str(), + user, + pass, + _mqtt_settings.will_topic.c_str(), + _mqtt_settings.will_qos, + _mqtt_settings.will_retain, + _mqtt_payload_offline.c_str(), + _mqtt_settings.clean_session); #endif #if SECURE_CLIENT != SECURE_CLIENT_NONE @@ -1162,7 +1182,7 @@ void _mqttConfigureImpl(bool reschedule) { _mqttApplySetting(_mqtt_settings.pass, mqtt::settings::password()); - _mqttApplySetting(_mqtt_settings.clientId, + _mqttApplySetting(_mqtt_settings.client_id, placeholders.replace(mqtt::settings::clientId())); _mqttApplySetting(_mqtt_settings.qos, @@ -1171,6 +1191,8 @@ void _mqttConfigureImpl(bool reschedule) { mqtt::settings::retain()); _mqttApplySetting(_mqtt_settings.keepalive, mqtt::settings::keepalive()); + _mqttApplySetting(_mqtt_settings.clean_session, + mqtt::settings::cleanSession()); // Heartbeat messages that are supposed to be published when connected _mqttApplySetting(_mqtt_heartbeat_mode,