Skip to content

Commit

Permalink
mqtt: clean session flag
Browse files Browse the repository at this point in the history
  • Loading branch information
mcspr committed Sep 13, 2024
1 parent 3b6f0a5 commit 737e76a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 32 deletions.
16 changes: 14 additions & 2 deletions code/espurna/config/general.h
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@
#endif

#ifndef MQTT_RETAIN
#define MQTT_RETAIN true // MQTT retain flag
#define MQTT_RETAIN 1 // MQTT retain flag
#endif

#ifndef MQTT_QOS
Expand All @@ -1036,7 +1036,19 @@
#endif

#ifndef MQTT_KEEPALIVE
#define MQTT_KEEPALIVE 120 // MQTT keep-alive interval (in seconds). 2 minutes by default. Cannot be zero.
#define MQTT_KEEPALIVE 120 // MQTT Keep Alive time, in seconds.
// Maximum amount of time without any communication between the server and the client before closing the connection.
// From client side, handled internally by counting amount of time between control packets and / or PINGREQ & PINGRESP.
// From server side, depends on the implementation (usually, also adds +50% of the original value)
// 2 minutes by default. Can be zero. Small values 1..5 *may* not be handled properly.
#endif

#ifndef MQTT_CLEAN_SESSION
#define MQTT_CLEAN_SESSION 0 // MQTT Clean Session flag.
// When disabled, server is expected to persist subscriptions and not-yet-delivered messages w/ QoS > 0 for the specified Client Id.
// (and *may* also store messages w/ QoS == 0, depends on the implementation)
// When enabled, any previously stored session data is discarded.
// Disabled by default.
#endif

#ifndef MQTT_SKIP_TIME
Expand Down
82 changes: 52 additions & 30 deletions code/espurna/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ constexpr KeepAlive keepalive() {
static_assert(keepalive() >= KeepaliveMin, "");
static_assert(keepalive() <= KeepaliveMax, "");

constexpr bool cleanSession() {
return 1 == MQTT_CLEAN_SESSION;
}

STRING_VIEW_INLINE(TopicWill, MQTT_TOPIC_STATUS);

constexpr int willQoS() {
Expand Down Expand Up @@ -272,6 +276,7 @@ STRING_VIEW_INLINE(Password, "mqttPassword");
STRING_VIEW_INLINE(QoS, "mqttQoS");
STRING_VIEW_INLINE(Retain, "mqttRetain");
STRING_VIEW_INLINE(Keepalive, "mqttKeep");
STRING_VIEW_INLINE(CleanSession, "mqttClean");
STRING_VIEW_INLINE(ClientId, "mqttClientID");
STRING_VIEW_INLINE(TopicWill, "mqttWill");
STRING_VIEW_INLINE(WillQoS, "mqttWillQoS");
Expand Down Expand Up @@ -350,6 +355,10 @@ espurna::mqtt::KeepAlive keepalive() {
build::KeepaliveMin, build::KeepaliveMax);
}

bool cleanSession() {
return getSetting(keys::CleanSession, build::cleanSession());
}

String clientId() {
return getSetting(keys::ClientId, systemIdentifier());
}
Expand Down Expand Up @@ -435,18 +444,19 @@ String NAME () {\
}

EXACT_VALUE(autoconnect, settings::autoconnect)
EXACT_VALUE(cleanSession, settings::cleanSession)
EXACT_VALUE(enabled, settings::enabled)
EXACT_VALUE(heartbeatInterval, settings::heartbeatInterval)
EXACT_VALUE(heartbeatMode, settings::heartbeatMode)
EXACT_VALUE(json, settings::json)
EXACT_VALUE(keepalive, settings::keepalive)
EXACT_VALUE(port, settings::port)
EXACT_VALUE(qos, settings::qos)
EXACT_VALUE(willQoS, settings::willQoS)
EXACT_VALUE(willRetain, settings::willRetain)
EXACT_VALUE(retain, settings::retain)
EXACT_VALUE(skipTime, settings::skipTime)
EXACT_VALUE(settings, settings::settings)
EXACT_VALUE(skipTime, settings::skipTime)
EXACT_VALUE(willQoS, settings::willQoS)
EXACT_VALUE(willRetain, settings::willRetain)

#undef EXACT_VALUE

Expand All @@ -461,10 +471,11 @@ static constexpr espurna::settings::query::Setting Settings[] PROGMEM {
{keys::WillRetain, internal::willRetain},
{keys::PayloadOffline, settings::payloadOffline},
{keys::PayloadOnline, settings::payloadOnline},
{keys::QoS, internal::qos},
{keys::Retain, internal::retain},
{keys::ClientId, settings::clientId},
{keys::QoS, internal::qos},
{keys::Keepalive, internal::keepalive},
{keys::CleanSession, internal::cleanSession},
{keys::ClientId, settings::clientId},
{keys::User, settings::user},
{keys::Password, settings::password},
{keys::Topic, settings::topic},
Expand Down Expand Up @@ -632,11 +643,12 @@ struct MqttConnectionSettings {
String server;
uint16_t port{};

String clientId;
String client_id;

bool retain { mqtt::build::retain() };
int qos { mqtt::build::qos() };
espurna::mqtt::KeepAlive keepalive { mqtt::build::keepalive() };
bool clean_session { mqtt::build::cleanSession() };

String topic;
String user;
Expand Down Expand Up @@ -904,9 +916,9 @@ namespace {

void _mqttSetupAsyncClient(bool secure = false) {
_mqtt.setServer(_mqtt_settings.server.c_str(), _mqtt_settings.port);
_mqtt.setClientId(_mqtt_settings.clientId.c_str());
_mqtt.setClientId(_mqtt_settings.client_id.c_str());
_mqtt.setKeepAlive(_mqtt_settings.keepalive.count());
_mqtt.setCleanSession(false);
_mqtt.setCleanSession(_mqtt_settings.clean_session);

_mqtt.setWill(
_mqtt_settings.will_topic.c_str(),
Expand Down Expand Up @@ -959,6 +971,24 @@ bool _mqttSetupSyncClient(bool secure = false) {
bool _mqttConnectSyncClient(bool secure = false) {
bool result = false;

const auto credentials =
_mqtt_settings.user.length()
&& _mqtt_settings.pass.length();

const auto* user =
credentials
? _mqtt_settings.user.c_str()
: nullptr;

const auto* pass =
credentials
? _mqtt_settings.pass.c_str()
: nullptr;

if (credentials) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), user);
}

#if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
_mqtt.begin(_mqtt_settings.server.c_str(),
_mqtt_settings.port,
Expand All @@ -967,32 +997,22 @@ bool _mqttConnectSyncClient(bool secure = false) {
_mqtt_payload_offline.c_str(),
_mqtt_settings.will_retain, _mqtt_settings.will_qos);
_mqtt.setKeepAlive(_mqtt_settings.keepalive.count());
_mqtt.setCleanSession(_mqtt_settings.clean_session);
result = _mqtt.connect(
_mqtt_settings.clientId.c_str(),
_mqtt_settings.user.c_str(),
_mqtt_settings.pass.c_str());
_mqtt_settings.client_id.c_str(), user, pass);
#elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
_mqtt.setClient(_mqttGetClient(secure));
_mqtt.setServer(_mqtt_settings.server.c_str(), _mqtt_settings.port);

if (_mqtt_settings.user.length() && _mqtt_settings.pass.length()) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_settings.user.c_str());
result = _mqtt.connect(
_mqtt_settings.clientId.c_str(),
_mqtt_settings.user.c_str(),
_mqtt_settings.pass.c_str(),
_mqtt_settings.will_topic.c_str(),
_mqtt_settings.will_qos,
_mqtt_settings.will_retain,
_mqtt_payload_offline.c_str());
} else {
result = _mqtt.connect(
_mqtt_settings.clientId.c_str(),
_mqtt_settings.will_topic.c_str(),
_mqtt_settings.will_qos,
_mqtt_settings.will_retain,
_mqtt_payload_offline.c_str());
}
result = _mqtt.connect(
_mqtt_settings.client_id.c_str(),
user,
pass,
_mqtt_settings.will_topic.c_str(),
_mqtt_settings.will_qos,
_mqtt_settings.will_retain,
_mqtt_payload_offline.c_str(),
_mqtt_settings.clean_session);
#endif

#if SECURE_CLIENT != SECURE_CLIENT_NONE
Expand Down Expand Up @@ -1162,7 +1182,7 @@ void _mqttConfigureImpl(bool reschedule) {
_mqttApplySetting(_mqtt_settings.pass,
mqtt::settings::password());

_mqttApplySetting(_mqtt_settings.clientId,
_mqttApplySetting(_mqtt_settings.client_id,
placeholders.replace(mqtt::settings::clientId()));

_mqttApplySetting(_mqtt_settings.qos,
Expand All @@ -1171,6 +1191,8 @@ void _mqttConfigureImpl(bool reschedule) {
mqtt::settings::retain());
_mqttApplySetting(_mqtt_settings.keepalive,
mqtt::settings::keepalive());
_mqttApplySetting(_mqtt_settings.clean_session,
mqtt::settings::cleanSession());

// Heartbeat messages that are supposed to be published when connected
_mqttApplySetting(_mqtt_heartbeat_mode,
Expand Down

0 comments on commit 737e76a

Please sign in to comment.