Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented server keep alive setting and usage in connect #883

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Server keep alive: added configuration setting so that the broker can specify a keep alive other then the one selected by clients. (#789)
[feature] User properties: covered publish with user properties with tests and fixed publish of will messages. (#877)
[feature] Topic alias: implemented handling of topic alias received by publishers. (#873)
[feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858)
Expand Down
10 changes: 10 additions & 0 deletions broker/src/main/java/io/moquette/broker/BrokerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.moquette.broker.config.IConfig;

import java.util.Locale;
import java.util.Optional;

class BrokerConfiguration {

Expand All @@ -30,6 +31,7 @@ class BrokerConfiguration {
private final int topicAliasMaximum;
// integer max value means that the property is unset
private int receiveMaximum;
private Optional<Integer> serverKeepAlive = Optional.empty();

BrokerConfiguration(IConfig props) {
allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true);
Expand Down Expand Up @@ -70,6 +72,10 @@ class BrokerConfiguration {
receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM);

topicAliasMaximum = props.intProp(IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, BrokerConstants.DISABLED_TOPIC_ALIAS);

if (props.getProperty(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME) != null) {
serverKeepAlive = Optional.of((int) props.durationProp(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME).toMillis() / 1_000);
}
}

// test method
Expand Down Expand Up @@ -133,4 +139,8 @@ public int receiveMaximum() {
public int topicAliasMaximum() {
return topicAliasMaximum;
}

public Optional<Integer> getServerKeepAlive() {
return serverKeepAlive;
}
}
11 changes: 11 additions & 0 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser
connAckPropertiesBuilder.topicAliasMaximum(topicAliasMaximum);
}

if (brokerConfig.getServerKeepAlive().isPresent()) {
connAckPropertiesBuilder.serverKeepAlive(brokerConfig.getServerKeepAlive().get());
}

final MqttProperties ackProperties = connAckPropertiesBuilder.build();
connAckBuilder.properties(ackProperties);
}
Expand Down Expand Up @@ -464,6 +468,13 @@ private void setupInflightResender(Channel channel) {

private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) {
int keepAlive = msg.variableHeader().keepAliveTimeSeconds();

// force server keep alive if configured
if (brokerConfig.getServerKeepAlive().isPresent()) {
int serverKeepAlive = brokerConfig.getServerKeepAlive().get();
LOG.info("Forcing server keep alive ({}) over client selection ({})", serverKeepAlive, keepAlive);
keepAlive = serverKeepAlive;
}
NettyUtils.keepAlive(channel, keepAlive);
NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
NettyUtils.clientID(channel, clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Consumer;
Expand All @@ -30,6 +31,7 @@
import static io.moquette.broker.config.IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.RECEIVE_MAXIMUM;
import static io.moquette.broker.config.IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.SESSION_QUEUE_SIZE;
import static io.moquette.broker.config.IConfig.SSL_PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.SSL_PROVIDER;
Expand Down Expand Up @@ -215,6 +217,12 @@ public FluentConfig topicAliasMaximum(int topicAliasMaximum) {
return this;
}

public FluentConfig serverKeepAlive(Duration keepAliveSeconds) {
int seconds = (int) keepAliveSeconds.toMillis() / 1_000;
configAccumulator.put(SERVER_KEEP_ALIVE_PROPERTY_NAME, seconds + "s");
return this;
}

public class TLSConfig {

private SSLProvider providerType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public abstract class IConfig {
public static final String MAX_SERVER_GRANTED_QOS_PROPERTY_NAME = "max_server_granted_qos";
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092;
public static final String TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME = "topic_alias_maximum";
public static final String SERVER_KEEP_ALIVE_PROPERTY_NAME = "server_keep_alive";

public abstract void setProperty(String name, String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublishBuilder;
import io.moquette.broker.config.FluentConfig;
import io.moquette.broker.config.IConfig;
import io.moquette.testclient.Client;
import io.netty.handler.codec.mqtt.*;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -70,6 +72,35 @@ public void simpleConnect() {
client.disconnect();
}

@Test
public void givenServerKeepAliveConfiguredThenConnectAckMustRespectIt() throws IOException {
stopServer();
IConfig config = new FluentConfig()
.dataPath(dbPath)
.enablePersistence()
.port(1883)
.disableTelemetry()
.persistentQueueType(FluentConfig.PersistentQueueType.SEGMENTED)
.serverKeepAlive(Duration.ofSeconds(12))
.build();
startServer(config);

Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier("simple_connect_test")
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
final Mqtt5ConnAck connectAck = client.connect();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Accept plain connection");
assertTrue(connectAck.getServerKeepAlive().isPresent());
connectAck.getServerKeepAlive().ifPresent(serverKeepAlive -> {
assertEquals(12 ,serverKeepAlive);
});

client.disconnect();
}

@Test
public void sendConnectOnDisconnectedConnection() throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
Expand Down
9 changes: 9 additions & 0 deletions distribution/src/main/resources/moquette.conf
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,12 @@ password_file config/password_file.conf
# default: 0 (disabled)
#*********************************************************************
# topic_alias_maximum 16

#*********************************************************************
# Keep alive provided by server, only for MQTT5.
#
# server_keep_alive:
# Option used to configure a server preferred keep alive that the client must respect.
# default: empty (disabled)
#*********************************************************************
# server_keep_alive 2s
Loading