Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added configuration option for auto reconnect, reconnect delay, and max inflight #49

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ hs_err_pid*

.idea
*.iml
target
target

*.mapdb*
*SNAPSHOT.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,39 @@
"to connect to the MQTT broker. Once this time interval elapses, a timeout takes " +
"place.",
type = {DataType.INT},
optional = true, defaultValue = "30")

optional = true, defaultValue = "30"),
@Parameter(
name = "max.inflight",
description = "The maximum number of messages the MQTT client can send without receiving " +
"acknowledgments. The default value is 10",
type = {DataType.INT},
optional = true, defaultValue = "10"),
@Parameter(
name = "automatic.reconnect",
description = "This is an optional parameter. If set to true, in the event that the " +
"connection is lost, the client will attempt to reconnect to the server. It will " +
"initially wait 1 second before it attempts to reconnect, for every failed reconnect " +
"attempt, the delay will double until it is at 2 minutes at which point the delay " +
"will stay at 2 minutes. " +
"If set to false, the client will not attempt to automatically reconnect to the " +
"server in the event that the connection is lost. " +
"The default value is `false`.",
type = {DataType.BOOL},
optional = true, defaultValue = "false"),
@Parameter(
name = "max.reconnect.delay",
description = "The maximum number of milliseconds the client could wait after the connection " +
"is lost. The default value is 128000",
type = {DataType.INT},
optional = true, defaultValue = "128000")
},
examples =
{
@Example(
syntax = "@sink(type='mqtt', url= 'tcp://localhost:1883', " +
"topic='mqtt_topic', clean.session='true', message.retain='false', " +
"quality.of.service= '1', keep.alive= '60',connection.timeout='30'" +
"max.inflight='20' ,automatic.reconnect='true', max.reconnect.delay='64000'," +
"@map(type='xml'))" +
"Define stream BarStream (symbol string, price float, volume long);",
description = "This query publishes events to a stream named `BarStream` via the " +
Expand All @@ -165,6 +189,9 @@ public class MqttSink extends Sink {
private boolean cleanSession;
private int keepAlive;
private int connectionTimeout;
private int maxInflight;
private boolean automaticReconnect;
private int maxReconnectDelay;
private MqttClient client;
private Option messageRetainOption;
private StreamDefinition streamDefinition;
Expand Down Expand Up @@ -192,6 +219,14 @@ protected StateFactory init(StreamDefinition streamDefinition, OptionHolder opti
MqttConstants.DEFAULT_MESSAGE_RETAIN);
this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.CLEAN_SESSION, MqttConstants.DEFAULT_CLEAN_SESSION));
this.maxInflight = Integer.parseInt(optionHolder.validateAndGetStaticValue
(MqttConstants.MAX_INFLIGHT,
MqttConstants.DEFAULT_MAX_INFLIGHT));
this.automaticReconnect = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.AUTOMATIC_RECONNECT, MqttConstants.DEFAULT_AUTOMATIC_RECONNECT));
this.maxReconnectDelay = Integer.parseInt(optionHolder.validateAndGetStaticValue
(MqttConstants.MAX_RECONNECT_DELAY,
MqttConstants.DEFAULT_MAX_RECONNECT_DELAY));
return null;
}

Expand Down Expand Up @@ -225,6 +260,9 @@ public void connect() throws ConnectionUnavailableException {
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
connectionOptions.setConnectionTimeout(connectionTimeout);
connectionOptions.setMaxInflight(maxInflight);
connectionOptions.setAutomaticReconnect(automaticReconnect);
connectionOptions.setMaxReconnectDelay(maxReconnectDelay);
client.connect(connectionOptions);

} catch (MqttException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,42 @@
"place.",
type = {DataType.INT},
optional = true,
defaultValue = "30")
defaultValue = "30"),
@Parameter(
name = "max.inflight",
description = "The maximum number of messages the MQTT client can send without receiving " +
"acknowledgments. The default value is 10",
type = {DataType.INT},
optional = true,
defaultValue = "10"),
@Parameter(
name = "automatic.reconnect",
description = "This is an optional parameter. If set to true, in the event that the " +
"connection is lost, the client will attempt to reconnect to the server. It will " +
"initially wait 1 second before it attempts to reconnect, for every failed reconnect " +
"attempt, the delay will double until it is at 2 minutes at which point the delay " +
"will stay at 2 minutes. " +
"If set to false, the client will not attempt to automatically reconnect to the " +
"server in the event that the connection is lost. " +
"The default value is `false`.",
type = {DataType.BOOL},
optional = true,
defaultValue = "false"),
@Parameter(
name = "max.reconnect.delay",
description = "The maximum number of milliseconds the client could wait after the connection " +
"is lost. The default value is 128000",
type = {DataType.INT},
optional = true,
defaultValue = "128000")
},
examples =
{
@Example(
syntax = "@source(type='mqtt', url= 'tcp://localhost:1883', " +
"topic='mqtt_topic', clean.session='true'," +
"quality.of.service= '1', keep.alive= '60',connection.timeout='30'" +
"max.inflight='20' ,automatic.reconnect='true', max.reconnect.delay='64000'," +
"@map(type='xml'))" +
"Define stream BarStream (symbol string, price float, volume long);",
description = "This query receives events from the `mqtt_topic` topic via MQTT," +
Expand All @@ -153,6 +181,9 @@ public class MqttSource extends Source {
private boolean cleanSession;
private int keepAlive;
private int connectionTimeout;
private int maxInflight;
private boolean automaticReconnect;
private int maxReconnectDelay;
private MqttClient client;
private MqttConsumer mqttConsumer;
private String siddhiAppName;
Expand All @@ -178,6 +209,14 @@ public StateFactory init(SourceEventListener sourceEventListener, OptionHolder o
MqttConstants.DEFAULT_CONNECTION_TIMEOUT_INTERVAL));
this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.CLEAN_SESSION, MqttConstants.DEFAULT_CLEAN_SESSION));
this.maxInflight = Integer.parseInt(optionHolder.validateAndGetStaticValue
(MqttConstants.MAX_INFLIGHT,
MqttConstants.DEFAULT_MAX_INFLIGHT));
this.automaticReconnect = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.AUTOMATIC_RECONNECT, MqttConstants.DEFAULT_AUTOMATIC_RECONNECT));
this.maxReconnectDelay = Integer.parseInt(optionHolder.validateAndGetStaticValue
(MqttConstants.MAX_RECONNECT_DELAY,
MqttConstants.DEFAULT_MAX_RECONNECT_DELAY));
this.mqttConsumer = new MqttConsumer(sourceEventListener);
return null;
}
Expand All @@ -200,6 +239,9 @@ public void connect(ConnectionCallback connectionCallback, State state) throws C
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
connectionOptions.setConnectionTimeout(connectionTimeout);
connectionOptions.setMaxInflight(maxInflight);
connectionOptions.setAutomaticReconnect(automaticReconnect);
connectionOptions.setMaxReconnectDelay(maxReconnectDelay);
client.connect(connectionOptions);
int qos = Integer.parseInt(String.valueOf(qosOption));
mqttConsumer.subscribe(topicOption, qos, client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ private MqttConstants() {
public static final String DEFAULT_USERNAME = null;
public static final String CONNECTION_TIMEOUT_INTERVAL = "connection.timeout";
public static final String DEFAULT_CONNECTION_TIMEOUT_INTERVAL = "30";



public static final String MAX_INFLIGHT = "max.inflight";
public static final String DEFAULT_MAX_INFLIGHT = "10";
public static final String AUTOMATIC_RECONNECT = "automatic.reconnect";
public static final String DEFAULT_AUTOMATIC_RECONNECT = "false";
public static final String MAX_RECONNECT_DELAY = "max.reconnect.delay";
public static final String DEFAULT_MAX_RECONNECT_DELAY = "128000";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.siddhi.extension.io.mqtt.sink;

import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.input.InputHandler;
import org.apache.log4j.Logger;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.rmi.RemoteException;
import java.util.Properties;

public class MqttSinkAutomaticReconnectTest {
private volatile int count;
private volatile boolean eventArrived;
private static final Logger log = Logger.getLogger(MqttSinkAutomaticReconnectTest.class);
private static final Server mqttBroker = new Server();
private MqttTestClient mqttTestClient;
private static final Properties properties = new Properties();

@BeforeMethod
public void initBeforeMethod() {
count = 0;
eventArrived = false;
}

@BeforeClass
public static void init() throws Exception {
try {
properties.put("port", Integer.toString(1883));
properties.put("host", "0.0.0.0");
final IConfig config = new MemoryConfig(properties);
mqttBroker.startServer(config);
Thread.sleep(1000);
} catch (Exception e) {
throw new RemoteException("Exception caught when starting server", e);
}
}

@AfterClass
public static void stop() {
mqttBroker.stopServer();
}

@Test
public void mqttPublishEventWithAutomaticReconnect() {
log.info("Test for Mqtt publish events with automatic reconnect enabled");
SiddhiManager siddhiManager = new SiddhiManager();
ResultContainer resultContainer = new ResultContainer(3);
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(
"define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+ "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+ "topic='mqtt_publish_event_with_automatic_reconnect',username='mqtt-user', "
+ "password='mqtt-password', clean.session='true', message.retain='false', "
+ "automatic.reconnect='true', keep.alive= '60'," + "@map(type='xml'))"
+ "Define stream BarStream (symbol string, price float, volume long);"
+ "from FooStream select symbol, price, volume insert into BarStream;");
InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
try {
this.mqttTestClient = new MqttTestClient("tcp://localhost:1883",
"mqtt_publish_event_with_automatic_reconnect", 1,
resultContainer, true, false);
} catch (ConnectionUnavailableException e) {
AssertJUnit.fail("Could not connect to broker.");
}
siddhiAppRuntime.start();
try {
fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
fooStream.send(new Object[] { "IBM", 75.6f, 100L });
Thread.sleep(500);
mqttBroker.stopServer();
fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
} catch (InterruptedException e) {
AssertJUnit.fail("Thread sleep was interrupted");
}
try {
final IConfig config = new MemoryConfig(properties);
mqttBroker.startServer(config);
Thread.sleep(2000);
fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
Thread.sleep(500);
} catch (Exception e) {
AssertJUnit.fail("Thread sleep was interrupted");
}
count = mqttTestClient.getCount();
eventArrived = mqttTestClient.getEventArrived();
AssertJUnit.assertEquals(3, count);
AssertJUnit.assertTrue(eventArrived);
AssertJUnit.assertTrue(resultContainer.assertMessageContent("WSO2"));
AssertJUnit.assertTrue(resultContainer.assertMessageContent("IBM"));
AssertJUnit.assertTrue(resultContainer.assertMessageContent("JAMES"));
siddhiAppRuntime.shutdown();
}

@Test
public void mqttPublishEventWithoutAutomaticReconnect() {
log.info("Test for Mqtt publish events with automatic reconnect disabled");
SiddhiManager siddhiManager = new SiddhiManager();
ResultContainer resultContainer = new ResultContainer(2);
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(
"define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+ "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+ "topic='mqtt_publish_event_without_automatic_reconnect',username='mqtt-user', "
+ "password='mqtt-password', clean.session='true', message.retain='false', "
+ "automatic.reconnect='false', keep.alive= '60'," + "@map(type='xml'))"
+ "Define stream BarStream (symbol string, price float, volume long);"
+ "from FooStream select symbol, price, volume insert into BarStream;");
InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
try {
this.mqttTestClient = new MqttTestClient("tcp://localhost:1883",
"mqtt_publish_event_without_automatic_reconnect", 1, resultContainer,
true, false);
} catch (ConnectionUnavailableException e) {
AssertJUnit.fail("Could not connect to broker.");
}
siddhiAppRuntime.start();
try {
fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
fooStream.send(new Object[] { "IBM", 75.6f, 100L });
Thread.sleep(500);
mqttBroker.stopServer();
fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
} catch (InterruptedException e) {
AssertJUnit.fail("Thread sleep was interrupted");
}
try {
final IConfig config = new MemoryConfig(properties);
mqttBroker.startServer(config);
Thread.sleep(2000);
fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
Thread.sleep(500);
} catch (Exception e) {
AssertJUnit.fail("Thread sleep was interrupted");
}

count = mqttTestClient.getCount();
eventArrived = mqttTestClient.getEventArrived();
AssertJUnit.assertEquals(2, count);
AssertJUnit.assertTrue(eventArrived);
AssertJUnit.assertTrue(resultContainer.assertMessageContent("WSO2"));
AssertJUnit.assertTrue(resultContainer.assertMessageContent("IBM"));
siddhiAppRuntime.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class MqttTestClient {
private String clientId;
private String userName = null;
private String userPassword = "";
private boolean cleanSession = true;
private boolean eventArrived;
private int count;
private int keepAlive = 60;
Expand Down Expand Up @@ -74,6 +73,12 @@ public boolean getEventArrived() {

public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer resultContainer)
throws ConnectionUnavailableException {
this(brokerURL, topic, qos, resultContainer, false, true);
}

public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer resultContainer,
boolean automaticReconnect, boolean cleanSession)
throws ConnectionUnavailableException {
this.resultContainer = resultContainer;
try {
persistence = new MemoryPersistence();
Expand All @@ -85,6 +90,7 @@ public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer r
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
connectionOptions.setConnectionTimeout(connectionTimeout);
connectionOptions.setAutomaticReconnect(automaticReconnect);
client.connect(connectionOptions);
} catch (MqttException e) {
throw new ConnectionUnavailableException(
Expand Down
Loading