diff --git a/.gitignore b/.gitignore
index ea8bfa0..462ff49 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,4 +23,7 @@ hs_err_pid*
.idea
*.iml
-target
\ No newline at end of file
+target
+
+*.mapdb*
+*SNAPSHOT.md
\ No newline at end of file
diff --git a/component/src/main/java/io/siddhi/extension/io/mqtt/sink/MqttSink.java b/component/src/main/java/io/siddhi/extension/io/mqtt/sink/MqttSink.java
index ee315b7..2a28478 100644
--- a/component/src/main/java/io/siddhi/extension/io/mqtt/sink/MqttSink.java
+++ b/component/src/main/java/io/siddhi/extension/io/mqtt/sink/MqttSink.java
@@ -134,8 +134,31 @@
"to connect to the MQTT broker. Once this time interval elapses, a timeout takes " +
"place.",
type = {DataType.INT},
- optional = true, defaultValue = "30")
-
+ optional = true, defaultValue = "30"),
+ @Parameter(
+ name = "max.inflight",
+ description = "The maximum number of messages the MQTT client can send without receiving " +
+ "acknowledgments. The default value is 10",
+ type = {DataType.INT},
+ optional = true, defaultValue = "10"),
+ @Parameter(
+ name = "automatic.reconnect",
+ description = "This is an optional parameter. If set to true, in the event that the " +
+ "connection is lost, the client will attempt to reconnect to the server. It will " +
+ "initially wait 1 second before it attempts to reconnect, for every failed reconnect " +
+ "attempt, the delay will double until it is at 2 minutes at which point the delay " +
+ "will stay at 2 minutes. " +
+ "If set to false, the client will not attempt to automatically reconnect to the " +
+ "server in the event that the connection is lost. " +
+ "The default value is `false`.",
+ type = {DataType.BOOL},
+ optional = true, defaultValue = "false"),
+ @Parameter(
+ name = "max.reconnect.delay",
+ description = "The maximum number of milliseconds the client could wait after the connection " +
+ "is lost. The default value is 128000",
+ type = {DataType.INT},
+ optional = true, defaultValue = "128000")
},
examples =
{
@@ -143,6 +166,7 @@
syntax = "@sink(type='mqtt', url= 'tcp://localhost:1883', " +
"topic='mqtt_topic', clean.session='true', message.retain='false', " +
"quality.of.service= '1', keep.alive= '60',connection.timeout='30'" +
+ "max.inflight='20' ,automatic.reconnect='true', max.reconnect.delay='64000'," +
"@map(type='xml'))" +
"Define stream BarStream (symbol string, price float, volume long);",
description = "This query publishes events to a stream named `BarStream` via the " +
@@ -164,6 +188,9 @@ public class MqttSink extends Sink {
private boolean cleanSession;
private int keepAlive;
private int connectionTimeout;
+ private int maxInflight;
+ private boolean automaticReconnect;
+ private int maxReconnectDelay;
private MqttClient client;
private Option messageRetainOption;
private StreamDefinition streamDefinition;
@@ -191,6 +218,14 @@ protected StateFactory init(StreamDefinition streamDefinition, OptionHolder opti
MqttConstants.DEFAULT_MESSAGE_RETAIN);
this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.CLEAN_SESSION, MqttConstants.DEFAULT_CLEAN_SESSION));
+ this.maxInflight = Integer.parseInt(optionHolder.validateAndGetStaticValue
+ (MqttConstants.MAX_INFLIGHT,
+ MqttConstants.DEFAULT_MAX_INFLIGHT));
+ this.automaticReconnect = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
+ (MqttConstants.AUTOMATIC_RECONNECT, MqttConstants.DEFAULT_AUTOMATIC_RECONNECT));
+ this.maxReconnectDelay = Integer.parseInt(optionHolder.validateAndGetStaticValue
+ (MqttConstants.MAX_RECONNECT_DELAY,
+ MqttConstants.DEFAULT_MAX_RECONNECT_DELAY));
return null;
}
@@ -224,6 +259,9 @@ public void connect() throws ConnectionUnavailableException {
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
connectionOptions.setConnectionTimeout(connectionTimeout);
+ connectionOptions.setMaxInflight(maxInflight);
+ connectionOptions.setAutomaticReconnect(automaticReconnect);
+ connectionOptions.setMaxReconnectDelay(maxReconnectDelay);
client.connect(connectionOptions);
} catch (MqttException e) {
diff --git a/component/src/main/java/io/siddhi/extension/io/mqtt/source/MqttSource.java b/component/src/main/java/io/siddhi/extension/io/mqtt/source/MqttSource.java
index b1c5885..11b3909 100644
--- a/component/src/main/java/io/siddhi/extension/io/mqtt/source/MqttSource.java
+++ b/component/src/main/java/io/siddhi/extension/io/mqtt/source/MqttSource.java
@@ -125,7 +125,34 @@
"place.",
type = {DataType.INT},
optional = true,
- defaultValue = "30")
+ defaultValue = "30"),
+ @Parameter(
+ name = "max.inflight",
+ description = "The maximum number of messages the MQTT client can send without receiving " +
+ "acknowledgments. The default value is 10",
+ type = {DataType.INT},
+ optional = true,
+ defaultValue = "10"),
+ @Parameter(
+ name = "automatic.reconnect",
+ description = "This is an optional parameter. If set to true, in the event that the " +
+ "connection is lost, the client will attempt to reconnect to the server. It will " +
+ "initially wait 1 second before it attempts to reconnect, for every failed reconnect " +
+ "attempt, the delay will double until it is at 2 minutes at which point the delay " +
+ "will stay at 2 minutes. " +
+ "If set to false, the client will not attempt to automatically reconnect to the " +
+ "server in the event that the connection is lost. " +
+ "The default value is `false`.",
+ type = {DataType.BOOL},
+ optional = true,
+ defaultValue = "false"),
+ @Parameter(
+ name = "max.reconnect.delay",
+ description = "The maximum number of milliseconds the client could wait after the connection " +
+ "is lost. The default value is 128000",
+ type = {DataType.INT},
+ optional = true,
+ defaultValue = "128000")
},
examples =
{
@@ -133,6 +160,7 @@
syntax = "@source(type='mqtt', url= 'tcp://localhost:1883', " +
"topic='mqtt_topic', clean.session='true'," +
"quality.of.service= '1', keep.alive= '60',connection.timeout='30'" +
+ "max.inflight='20' ,automatic.reconnect='true', max.reconnect.delay='64000'," +
"@map(type='xml'))" +
"Define stream BarStream (symbol string, price float, volume long);",
description = "This query receives events from the `mqtt_topic` topic via MQTT," +
@@ -152,6 +180,9 @@ public class MqttSource extends Source {
private boolean cleanSession;
private int keepAlive;
private int connectionTimeout;
+ private int maxInflight;
+ private boolean automaticReconnect;
+ private int maxReconnectDelay;
private MqttClient client;
private MqttConsumer mqttConsumer;
private String siddhiAppName;
@@ -177,6 +208,14 @@ public StateFactory init(SourceEventListener sourceEventListener, OptionHolder o
MqttConstants.DEFAULT_CONNECTION_TIMEOUT_INTERVAL));
this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.CLEAN_SESSION, MqttConstants.DEFAULT_CLEAN_SESSION));
+ this.maxInflight = Integer.parseInt(optionHolder.validateAndGetStaticValue
+ (MqttConstants.MAX_INFLIGHT,
+ MqttConstants.DEFAULT_MAX_INFLIGHT));
+ this.automaticReconnect = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
+ (MqttConstants.AUTOMATIC_RECONNECT, MqttConstants.DEFAULT_AUTOMATIC_RECONNECT));
+ this.maxReconnectDelay = Integer.parseInt(optionHolder.validateAndGetStaticValue
+ (MqttConstants.MAX_RECONNECT_DELAY,
+ MqttConstants.DEFAULT_MAX_RECONNECT_DELAY));
this.mqttConsumer = new MqttConsumer(sourceEventListener);
return null;
}
@@ -199,6 +238,9 @@ public void connect(ConnectionCallback connectionCallback, State state) throws C
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
connectionOptions.setConnectionTimeout(connectionTimeout);
+ connectionOptions.setMaxInflight(maxInflight);
+ connectionOptions.setAutomaticReconnect(automaticReconnect);
+ connectionOptions.setMaxReconnectDelay(maxReconnectDelay);
client.connect(connectionOptions);
int qos = Integer.parseInt(String.valueOf(qosOption));
mqttConsumer.subscribe(topicOption, qos, client);
diff --git a/component/src/main/java/io/siddhi/extension/io/mqtt/util/MqttConstants.java b/component/src/main/java/io/siddhi/extension/io/mqtt/util/MqttConstants.java
index d7fbe49..a025204 100644
--- a/component/src/main/java/io/siddhi/extension/io/mqtt/util/MqttConstants.java
+++ b/component/src/main/java/io/siddhi/extension/io/mqtt/util/MqttConstants.java
@@ -42,7 +42,10 @@ private MqttConstants() {
public static final String DEFAULT_USERNAME = null;
public static final String CONNECTION_TIMEOUT_INTERVAL = "connection.timeout";
public static final String DEFAULT_CONNECTION_TIMEOUT_INTERVAL = "30";
-
-
-
+ public static final String MAX_INFLIGHT = "max.inflight";
+ public static final String DEFAULT_MAX_INFLIGHT = "10";
+ public static final String AUTOMATIC_RECONNECT = "automatic.reconnect";
+ public static final String DEFAULT_AUTOMATIC_RECONNECT = "false";
+ public static final String MAX_RECONNECT_DELAY = "max.reconnect.delay";
+ public static final String DEFAULT_MAX_RECONNECT_DELAY = "128000";
}
diff --git a/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttSinkAutomaticReconnectTest.java b/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttSinkAutomaticReconnectTest.java
new file mode 100644
index 0000000..c2092d0
--- /dev/null
+++ b/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttSinkAutomaticReconnectTest.java
@@ -0,0 +1,151 @@
+package io.siddhi.extension.io.mqtt.sink;
+
+import io.moquette.server.Server;
+import io.moquette.server.config.IConfig;
+import io.moquette.server.config.MemoryConfig;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.exception.ConnectionUnavailableException;
+import io.siddhi.core.stream.input.InputHandler;
+import org.apache.log4j.Logger;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.rmi.RemoteException;
+import java.util.Properties;
+
+public class MqttSinkAutomaticReconnectTest {
+ private volatile int count;
+ private volatile boolean eventArrived;
+ private static final Logger log = Logger.getLogger(MqttSinkAutomaticReconnectTest.class);
+ private static final Server mqttBroker = new Server();
+ private MqttTestClient mqttTestClient;
+ private static final Properties properties = new Properties();
+
+ @BeforeMethod
+ public void initBeforeMethod() {
+ count = 0;
+ eventArrived = false;
+ }
+
+ @BeforeClass
+ public static void init() throws Exception {
+ try {
+ properties.put("port", Integer.toString(1883));
+ properties.put("host", "0.0.0.0");
+ final IConfig config = new MemoryConfig(properties);
+ mqttBroker.startServer(config);
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ throw new RemoteException("Exception caught when starting server", e);
+ }
+ }
+
+ @AfterClass
+ public static void stop() {
+ mqttBroker.stopServer();
+ }
+
+ @Test
+ public void mqttPublishEventWithAutomaticReconnect() {
+ log.info("Test for Mqtt publish events with automatic reconnect enabled");
+ SiddhiManager siddhiManager = new SiddhiManager();
+ ResultContainer resultContainer = new ResultContainer(3);
+ SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(
+ "define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+ + "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+ + "topic='mqtt_publish_event_with_automatic_reconnect',username='mqtt-user', "
+ + "password='mqtt-password', clean.session='true', message.retain='false', "
+ + "automatic.reconnect='true', keep.alive= '60'," + "@map(type='xml'))"
+ + "Define stream BarStream (symbol string, price float, volume long);"
+ + "from FooStream select symbol, price, volume insert into BarStream;");
+ InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
+ try {
+ this.mqttTestClient = new MqttTestClient("tcp://localhost:1883",
+ "mqtt_publish_event_with_automatic_reconnect", 1,
+ resultContainer, true, false);
+ } catch (ConnectionUnavailableException e) {
+ AssertJUnit.fail("Could not connect to broker.");
+ }
+ siddhiAppRuntime.start();
+ try {
+ fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
+ fooStream.send(new Object[] { "IBM", 75.6f, 100L });
+ Thread.sleep(500);
+ mqttBroker.stopServer();
+ fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
+ } catch (InterruptedException e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+ try {
+ final IConfig config = new MemoryConfig(properties);
+ mqttBroker.startServer(config);
+ Thread.sleep(2000);
+ fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
+ Thread.sleep(500);
+ } catch (Exception e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+ count = mqttTestClient.getCount();
+ eventArrived = mqttTestClient.getEventArrived();
+ AssertJUnit.assertEquals(3, count);
+ AssertJUnit.assertTrue(eventArrived);
+ AssertJUnit.assertTrue(resultContainer.assertMessageContent("WSO2"));
+ AssertJUnit.assertTrue(resultContainer.assertMessageContent("IBM"));
+ AssertJUnit.assertTrue(resultContainer.assertMessageContent("JAMES"));
+ siddhiAppRuntime.shutdown();
+ }
+
+ @Test
+ public void mqttPublishEventWithoutAutomaticReconnect() {
+ log.info("Test for Mqtt publish events with automatic reconnect disabled");
+ SiddhiManager siddhiManager = new SiddhiManager();
+ ResultContainer resultContainer = new ResultContainer(2);
+ SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(
+ "define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+ + "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+ + "topic='mqtt_publish_event_without_automatic_reconnect',username='mqtt-user', "
+ + "password='mqtt-password', clean.session='true', message.retain='false', "
+ + "automatic.reconnect='false', keep.alive= '60'," + "@map(type='xml'))"
+ + "Define stream BarStream (symbol string, price float, volume long);"
+ + "from FooStream select symbol, price, volume insert into BarStream;");
+ InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
+ try {
+ this.mqttTestClient = new MqttTestClient("tcp://localhost:1883",
+ "mqtt_publish_event_without_automatic_reconnect", 1, resultContainer,
+ true, false);
+ } catch (ConnectionUnavailableException e) {
+ AssertJUnit.fail("Could not connect to broker.");
+ }
+ siddhiAppRuntime.start();
+ try {
+ fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
+ fooStream.send(new Object[] { "IBM", 75.6f, 100L });
+ Thread.sleep(500);
+ mqttBroker.stopServer();
+ fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
+ } catch (InterruptedException e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+ try {
+ final IConfig config = new MemoryConfig(properties);
+ mqttBroker.startServer(config);
+ Thread.sleep(2000);
+ fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
+ Thread.sleep(500);
+ } catch (Exception e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+
+ count = mqttTestClient.getCount();
+ eventArrived = mqttTestClient.getEventArrived();
+ AssertJUnit.assertEquals(2, count);
+ AssertJUnit.assertTrue(eventArrived);
+ AssertJUnit.assertTrue(resultContainer.assertMessageContent("WSO2"));
+ AssertJUnit.assertTrue(resultContainer.assertMessageContent("IBM"));
+ siddhiAppRuntime.shutdown();
+ }
+}
diff --git a/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttTestClient.java b/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttTestClient.java
index e450c5e..2cfe31b 100644
--- a/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttTestClient.java
+++ b/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttTestClient.java
@@ -37,7 +37,6 @@ public class MqttTestClient {
private String clientId;
private String userName = null;
private String userPassword = "";
- private boolean cleanSession = true;
private boolean eventArrived;
private int count;
private int keepAlive = 60;
@@ -73,6 +72,12 @@ public boolean getEventArrived() {
public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer resultContainer)
throws ConnectionUnavailableException {
+ this(brokerURL, topic, qos, resultContainer, false, true);
+ }
+
+ public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer resultContainer,
+ boolean automaticReconnect, boolean cleanSession)
+ throws ConnectionUnavailableException {
this.resultContainer = resultContainer;
try {
persistence = new MemoryPersistence();
@@ -84,6 +89,7 @@ public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer r
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
connectionOptions.setConnectionTimeout(connectionTimeout);
+ connectionOptions.setAutomaticReconnect(automaticReconnect);
client.connect(connectionOptions);
} catch (MqttException e) {
throw new ConnectionUnavailableException(
diff --git a/component/src/test/java/io/siddhi/extension/io/mqtt/source/MqttSourceAutomaticReconnectTest.java b/component/src/test/java/io/siddhi/extension/io/mqtt/source/MqttSourceAutomaticReconnectTest.java
new file mode 100644
index 0000000..0080ca7
--- /dev/null
+++ b/component/src/test/java/io/siddhi/extension/io/mqtt/source/MqttSourceAutomaticReconnectTest.java
@@ -0,0 +1,175 @@
+package io.siddhi.extension.io.mqtt.source;
+
+import io.moquette.server.Server;
+import io.moquette.server.config.IConfig;
+import io.moquette.server.config.MemoryConfig;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.util.SiddhiTestHelper;
+import org.apache.log4j.Logger;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.rmi.RemoteException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MqttSourceAutomaticReconnectTest {
+ static final Logger LOG = Logger.getLogger(MqttSourceTestCase.class);
+ private AtomicInteger count = new AtomicInteger(0);
+ private int waitTime = 50;
+ private int timeout = 60000;
+ private volatile boolean eventArrived;
+ private static final Server mqttBroker = new Server();
+ private static final Properties properties = new Properties();
+
+ @BeforeMethod
+ public void initBeforeMethod() {
+ count.set(0);
+ eventArrived = false;
+ }
+
+ @BeforeClass
+ public static void init() throws Exception {
+ try {
+ properties.put("port", Integer.toString(1883));
+ properties.put("host", "0.0.0.0");
+ final IConfig config = new MemoryConfig(properties);
+ mqttBroker.startServer(config);
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ throw new RemoteException("Exception caught when starting server", e);
+ }
+ }
+
+ @AfterClass
+ public static void stop() {
+ mqttBroker.stopServer();
+ }
+
+ @Test
+ public void mqttReceiveEventsWithAutomaticReconnect() {
+ LOG.info("Test for receiving events with automatic reconnect enabled");
+ SiddhiManager siddhiManager = new SiddhiManager();
+ SiddhiAppRuntime siddhiAppRuntimeSource = siddhiManager.createSiddhiAppRuntime(
+ "@App:name('TestExecutionPlan2') "
+ + "define stream BarStream2 (symbol string, price float, volume long); "
+ + "@info(name = 'query1') "
+ + "@source(type='mqtt',url= 'tcp://localhost:1883',topic = 'mqtt_receive_event_with_automatic_reconnect',"
+ + "automatic.reconnect='true', clean.session='false', keep.alive= '60',@map(type='xml'))"
+ + "Define stream FooStream2 (symbol string, price float, volume long);"
+ + "from FooStream2 select symbol, price, volume insert into BarStream2;");
+ siddhiAppRuntimeSource.addCallback("BarStream2", new StreamCallback() {
+ @Override
+ public void receive(Event[] events) {
+ for (Event event : events) {
+ LOG.info(event);
+ eventArrived = true;
+ count.incrementAndGet();
+ }
+ }
+ });
+ siddhiAppRuntimeSource.start();
+ try {
+ Thread.sleep(4000);
+ } catch (InterruptedException e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+ SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime("@App:name('TestExecutionPlan') "
+ + "define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+ + "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+ + "topic='mqtt_receive_event_with_automatic_reconnect', clean.session='true', message.retain='false', "
+ + "automatic.reconnect='true', quality.of.service= '1',keep.alive= '60'," + "@map(type='xml'))"
+ + "Define stream BarStream (symbol string, price float, volume long);"
+ + "from FooStream select symbol, price, volume insert into BarStream;");
+ InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
+ siddhiAppRuntime.start();
+ try {
+ fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
+ fooStream.send(new Object[] { "IBM", 75.6f, 100L });
+ Thread.sleep(500);
+ mqttBroker.stopServer();
+ fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
+ } catch (InterruptedException e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+ try {
+ final IConfig config = new MemoryConfig(properties);
+ mqttBroker.startServer(config);
+ Thread.sleep(2000);
+ fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
+ SiddhiTestHelper.waitForEvents(waitTime, 3, count, timeout);
+ } catch (Exception e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+ siddhiAppRuntimeSource.shutdown();
+ siddhiAppRuntime.shutdown();
+
+ }
+
+ @Test
+ public void mqttReceiveEventsWithoutAutomaticReconnect() {
+ LOG.info("Test for receiving events with automatic reconnect disabled");
+ SiddhiManager siddhiManager = new SiddhiManager();
+ SiddhiAppRuntime siddhiAppRuntimeSource = siddhiManager.createSiddhiAppRuntime(
+ "@App:name('TestExecutionPlan2') "
+ + "define stream BarStream2 (symbol string, price float, volume long); "
+ + "@info(name = 'query1') "
+ + "@source(type='mqtt',url= 'tcp://localhost:1883',topic='mqtt_receive_event_without_automatic_reconnect',"
+ + "automatic.reconnect='false', clean.session='true', keep.alive= '60',@map(type='xml'))"
+ + "Define stream FooStream2 (symbol string, price float, volume long);"
+ + "from FooStream2 select symbol, price, volume insert into BarStream2;");
+ siddhiAppRuntimeSource.addCallback("BarStream2", new StreamCallback() {
+ @Override
+ public void receive(Event[] events) {
+ for (Event event : events) {
+ LOG.info(event);
+ eventArrived = true;
+ count.incrementAndGet();
+ }
+ }
+ });
+ siddhiAppRuntimeSource.start();
+ try {
+ Thread.sleep(4000);
+ } catch (InterruptedException e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+ SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime("@App:name('TestExecutionPlan') "
+ + "define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+ + "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+ + "topic='mqtt_receive_event_without_automatic_reconnect', clean.session='true', message.retain='false', "
+ + "automatic.reconnect='true', quality.of.service= '1',keep.alive= '60'," + "@map(type='xml'))"
+ + "Define stream BarStream (symbol string, price float, volume long);"
+ + "from FooStream select symbol, price, volume insert into BarStream;");
+ InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
+ siddhiAppRuntime.start();
+ try {
+ fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
+ fooStream.send(new Object[] { "IBM", 75.6f, 100L });
+ Thread.sleep(500);
+ mqttBroker.stopServer();
+ fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
+ } catch (InterruptedException e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+ try {
+ final IConfig config = new MemoryConfig(properties);
+ mqttBroker.startServer(config);
+ Thread.sleep(2000);
+ fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
+ SiddhiTestHelper.waitForEvents(waitTime, 2, count, timeout);
+ } catch (Exception e) {
+ AssertJUnit.fail("Thread sleep was interrupted");
+ }
+ siddhiAppRuntimeSource.shutdown();
+ siddhiAppRuntime.shutdown();
+
+ }
+}
diff --git a/pom.xml b/pom.xml
index 95fc2d4..9729642 100644
--- a/pom.xml
+++ b/pom.xml
@@ -146,6 +146,39 @@
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+ 3.0.4
+
+
+ Max
+
+ false
+
+ Low
+
+ true
+
+ ${project.build.directory}/findbugs
+
+
+
+
+ analyze-compile
+ compile
+
+ check
+
+
+
+
@@ -157,7 +190,41 @@
bintray
http://dl.bintray.com/andsel/maven
+
+
+ false
+
+ wso2
+ wso2
+ https://maven.wso2.org/nexus/content/repositories/releases/
+
+
+
+ false
+
+ wso2-nexus
+ wso2 nexus
+ http://maven.wso2.org/nexus/content/groups/wso2-public
+
+
+
+
+ false
+
+ wso2.releases
+ wso2 plugins releases
+ http://maven.wso2.org/nexus/content/repositories/releases
+
+
+
+ false
+
+ wso2-nexus
+ wso2 plugins nexus
+ http://maven.wso2.org/nexus/content/groups/wso2-public
+
+
https://github.com/siddhi-io/siddhi-io-mqtt.git