diff --git a/ChangeLog.txt b/ChangeLog.txt index 6c7741633..1c036e81c 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [feature] Server keep alive: added configuration setting so that the broker can specify a keep alive other then the one selected by clients. (#789) [feature] User properties: covered publish with user properties with tests and fixed publish of will messages. (#877) [feature] Topic alias: implemented handling of topic alias received by publishers. (#873) [feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858) diff --git a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java index 421ad5fbc..dfbc14363 100644 --- a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java +++ b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java @@ -19,6 +19,7 @@ import io.moquette.broker.config.IConfig; import java.util.Locale; +import java.util.Optional; class BrokerConfiguration { @@ -30,6 +31,7 @@ class BrokerConfiguration { private final int topicAliasMaximum; // integer max value means that the property is unset private int receiveMaximum; + private Optional serverKeepAlive = Optional.empty(); BrokerConfiguration(IConfig props) { allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true); @@ -70,6 +72,10 @@ class BrokerConfiguration { receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM); topicAliasMaximum = props.intProp(IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, BrokerConstants.DISABLED_TOPIC_ALIAS); + + if (props.getProperty(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME) != null) { + serverKeepAlive = Optional.of((int) props.durationProp(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME).toMillis() / 1_000); + } } // test method @@ -133,4 +139,8 @@ public int receiveMaximum() { public int topicAliasMaximum() { return topicAliasMaximum; } + + public Optional getServerKeepAlive() { + return serverKeepAlive; + } } diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 74085c400..a7930c860 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -358,6 +358,10 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser connAckPropertiesBuilder.topicAliasMaximum(topicAliasMaximum); } + if (brokerConfig.getServerKeepAlive().isPresent()) { + connAckPropertiesBuilder.serverKeepAlive(brokerConfig.getServerKeepAlive().get()); + } + final MqttProperties ackProperties = connAckPropertiesBuilder.build(); connAckBuilder.properties(ackProperties); } @@ -464,6 +468,13 @@ private void setupInflightResender(Channel channel) { private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) { int keepAlive = msg.variableHeader().keepAliveTimeSeconds(); + + // force server keep alive if configured + if (brokerConfig.getServerKeepAlive().isPresent()) { + int serverKeepAlive = brokerConfig.getServerKeepAlive().get(); + LOG.info("Forcing server keep alive ({}) over client selection ({})", serverKeepAlive, keepAlive); + keepAlive = serverKeepAlive; + } NettyUtils.keepAlive(channel, keepAlive); NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession()); NettyUtils.clientID(channel, clientId); diff --git a/broker/src/main/java/io/moquette/broker/config/FluentConfig.java b/broker/src/main/java/io/moquette/broker/config/FluentConfig.java index 93d219fb1..46f296e9c 100644 --- a/broker/src/main/java/io/moquette/broker/config/FluentConfig.java +++ b/broker/src/main/java/io/moquette/broker/config/FluentConfig.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.time.Duration; import java.util.Locale; import java.util.Properties; import java.util.function.Consumer; @@ -30,6 +31,7 @@ import static io.moquette.broker.config.IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.RECEIVE_MAXIMUM; +import static io.moquette.broker.config.IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.SESSION_QUEUE_SIZE; import static io.moquette.broker.config.IConfig.SSL_PORT_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.SSL_PROVIDER; @@ -215,6 +217,12 @@ public FluentConfig topicAliasMaximum(int topicAliasMaximum) { return this; } + public FluentConfig serverKeepAlive(Duration keepAliveSeconds) { + int seconds = (int) keepAliveSeconds.toMillis() / 1_000; + configAccumulator.put(SERVER_KEEP_ALIVE_PROPERTY_NAME, seconds + "s"); + return this; + } + public class TLSConfig { private SSLProvider providerType; diff --git a/broker/src/main/java/io/moquette/broker/config/IConfig.java b/broker/src/main/java/io/moquette/broker/config/IConfig.java index ca2ef181d..f73fdad32 100644 --- a/broker/src/main/java/io/moquette/broker/config/IConfig.java +++ b/broker/src/main/java/io/moquette/broker/config/IConfig.java @@ -68,6 +68,7 @@ public abstract class IConfig { public static final String MAX_SERVER_GRANTED_QOS_PROPERTY_NAME = "max_server_granted_qos"; public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092; public static final String TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME = "topic_alias_maximum"; + public static final String SERVER_KEEP_ALIVE_PROPERTY_NAME = "server_keep_alive"; public abstract void setProperty(String name, String value); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java index a977e7693..906a83970 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java @@ -13,6 +13,8 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublishBuilder; +import io.moquette.broker.config.FluentConfig; +import io.moquette.broker.config.IConfig; import io.moquette.testclient.Client; import io.netty.handler.codec.mqtt.*; import org.awaitility.Awaitility; @@ -70,6 +72,35 @@ public void simpleConnect() { client.disconnect(); } + @Test + public void givenServerKeepAliveConfiguredThenConnectAckMustRespectIt() throws IOException { + stopServer(); + IConfig config = new FluentConfig() + .dataPath(dbPath) + .enablePersistence() + .port(1883) + .disableTelemetry() + .persistentQueueType(FluentConfig.PersistentQueueType.SEGMENTED) + .serverKeepAlive(Duration.ofSeconds(12)) + .build(); + startServer(config); + + Mqtt5BlockingClient client = MqttClient.builder() + .useMqttVersion5() + .identifier("simple_connect_test") + .serverHost("localhost") + .serverPort(1883) + .buildBlocking(); + final Mqtt5ConnAck connectAck = client.connect(); + assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Accept plain connection"); + assertTrue(connectAck.getServerKeepAlive().isPresent()); + connectAck.getServerKeepAlive().ifPresent(serverKeepAlive -> { + assertEquals(12 ,serverKeepAlive); + }); + + client.disconnect(); + } + @Test public void sendConnectOnDisconnectedConnection() throws InterruptedException { MqttConnAckMessage connAck = lowLevelClient.connectV5(); diff --git a/distribution/src/main/resources/moquette.conf b/distribution/src/main/resources/moquette.conf index b863cced4..b13faad63 100644 --- a/distribution/src/main/resources/moquette.conf +++ b/distribution/src/main/resources/moquette.conf @@ -234,3 +234,12 @@ password_file config/password_file.conf # default: 0 (disabled) #********************************************************************* # topic_alias_maximum 16 + +#********************************************************************* +# Keep alive provided by server, only for MQTT5. +# +# server_keep_alive: +# Option used to configure a server preferred keep alive that the client must respect. +# default: empty (disabled) +#********************************************************************* +# server_keep_alive 2s