From d16450405906545a7ee11c9ea2f0df17727e726a Mon Sep 17 00:00:00 2001
From: Syed Muhammad Hashim <hashim.muhammad9@gmail.com>
Date: Sun, 3 Apr 2022 18:05:38 +0500
Subject: [PATCH] Added configuration option for auto reconnect, reconnect
 delay, and max inflight

---
 .gitignore                                    |   5 +-
 .../extension/io/mqtt/sink/MqttSink.java      |  42 ++++-
 .../extension/io/mqtt/source/MqttSource.java  |  44 ++++-
 .../extension/io/mqtt/util/MqttConstants.java |   9 +-
 .../sink/MqttSinkAutomaticReconnectTest.java  | 151 +++++++++++++++
 .../io/mqtt/sink/MqttTestClient.java          |   8 +-
 .../MqttSourceAutomaticReconnectTest.java     | 175 ++++++++++++++++++
 7 files changed, 426 insertions(+), 8 deletions(-)
 create mode 100644 component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttSinkAutomaticReconnectTest.java
 create mode 100644 component/src/test/java/io/siddhi/extension/io/mqtt/source/MqttSourceAutomaticReconnectTest.java

diff --git a/.gitignore b/.gitignore
index ea8bfa0..462ff49 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,4 +23,7 @@ hs_err_pid*
 
 .idea
 *.iml
-target
\ No newline at end of file
+target
+
+*.mapdb*
+*SNAPSHOT.md
\ No newline at end of file
diff --git a/component/src/main/java/io/siddhi/extension/io/mqtt/sink/MqttSink.java b/component/src/main/java/io/siddhi/extension/io/mqtt/sink/MqttSink.java
index bdde01d..4d5cdbd 100644
--- a/component/src/main/java/io/siddhi/extension/io/mqtt/sink/MqttSink.java
+++ b/component/src/main/java/io/siddhi/extension/io/mqtt/sink/MqttSink.java
@@ -135,8 +135,31 @@
                                 "to connect to the MQTT broker. Once this time interval elapses, a timeout takes " +
                                 "place.",
                         type = {DataType.INT},
-                        optional = true, defaultValue = "30")
-
+                        optional = true, defaultValue = "30"),
+                @Parameter(
+                        name = "max.inflight",
+                        description = "The maximum number of messages the MQTT client can send without receiving " +
+                            "acknowledgments. The default value is 10",
+                        type = {DataType.INT},
+                        optional = true, defaultValue = "10"),
+                @Parameter(
+                        name = "automatic.reconnect",
+                        description = "This is an optional parameter. If set to true, in the event that the " +
+                                "connection is lost, the client will attempt to reconnect to the server. It will " +
+                                "initially wait 1 second before it attempts to reconnect, for every failed reconnect " +
+                                "attempt, the delay will double until it is at 2 minutes at which point the delay "  +
+                                "will stay at 2 minutes. " +
+                                "If set to false, the client will not attempt to automatically reconnect to the " +
+                                "server in the event that the connection is lost. " +
+                                "The default value is `false`.",
+                        type = {DataType.BOOL},
+                        optional = true, defaultValue = "false"),
+                @Parameter(
+                        name = "max.reconnect.delay",
+                        description = "The maximum number of milliseconds the client could wait after the connection " +
+                                "is lost. The default value is 128000",
+                        type = {DataType.INT},
+                        optional = true, defaultValue = "128000")
         },
         examples =
                 {
@@ -144,6 +167,7 @@
                                 syntax = "@sink(type='mqtt', url= 'tcp://localhost:1883', " +
                                         "topic='mqtt_topic', clean.session='true', message.retain='false', " +
                                         "quality.of.service= '1', keep.alive= '60',connection.timeout='30'" +
+                                        "max.inflight='20' ,automatic.reconnect='true', max.reconnect.delay='64000'," +
                                         "@map(type='xml'))" +
                                         "Define stream BarStream (symbol string, price float, volume long);",
                                 description = "This query publishes events to a stream named `BarStream` via the " +
@@ -165,6 +189,9 @@ public class MqttSink extends Sink {
     private boolean cleanSession;
     private int keepAlive;
     private int connectionTimeout;
+    private int maxInflight;
+    private boolean automaticReconnect;
+    private int maxReconnectDelay;
     private MqttClient client;
     private Option messageRetainOption;
     private StreamDefinition streamDefinition;
@@ -192,6 +219,14 @@ protected StateFactory init(StreamDefinition streamDefinition, OptionHolder opti
                 MqttConstants.DEFAULT_MESSAGE_RETAIN);
         this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
                     (MqttConstants.CLEAN_SESSION, MqttConstants.DEFAULT_CLEAN_SESSION));
+        this.maxInflight = Integer.parseInt(optionHolder.validateAndGetStaticValue
+                (MqttConstants.MAX_INFLIGHT,
+                        MqttConstants.DEFAULT_MAX_INFLIGHT));
+        this.automaticReconnect = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
+                (MqttConstants.AUTOMATIC_RECONNECT, MqttConstants.DEFAULT_AUTOMATIC_RECONNECT));
+        this.maxReconnectDelay = Integer.parseInt(optionHolder.validateAndGetStaticValue
+                (MqttConstants.MAX_RECONNECT_DELAY,
+                        MqttConstants.DEFAULT_MAX_RECONNECT_DELAY));
         return null;
     }
 
@@ -225,6 +260,9 @@ public void connect() throws ConnectionUnavailableException {
             connectionOptions.setCleanSession(cleanSession);
             connectionOptions.setKeepAliveInterval(keepAlive);
             connectionOptions.setConnectionTimeout(connectionTimeout);
+            connectionOptions.setMaxInflight(maxInflight);
+            connectionOptions.setAutomaticReconnect(automaticReconnect);
+            connectionOptions.setMaxReconnectDelay(maxReconnectDelay);
             client.connect(connectionOptions);
 
         } catch (MqttException e) {
diff --git a/component/src/main/java/io/siddhi/extension/io/mqtt/source/MqttSource.java b/component/src/main/java/io/siddhi/extension/io/mqtt/source/MqttSource.java
index b2a9547..9a38cbc 100644
--- a/component/src/main/java/io/siddhi/extension/io/mqtt/source/MqttSource.java
+++ b/component/src/main/java/io/siddhi/extension/io/mqtt/source/MqttSource.java
@@ -126,7 +126,34 @@
                                 "place.",
                         type = {DataType.INT},
                         optional = true,
-                        defaultValue = "30")
+                        defaultValue = "30"),
+                @Parameter(
+                        name = "max.inflight",
+                        description = "The maximum number of messages the MQTT client can send without receiving " +
+                            "acknowledgments. The default value is 10",
+                        type = {DataType.INT},
+                        optional = true,
+                        defaultValue = "10"),
+                @Parameter(
+                        name = "automatic.reconnect",
+                        description = "This is an optional parameter. If set to true, in the event that the " +
+                                "connection is lost, the client will attempt to reconnect to the server. It will " +
+                                "initially wait 1 second before it attempts to reconnect, for every failed reconnect " +
+                                "attempt, the delay will double until it is at 2 minutes at which point the delay "  +
+                                "will stay at 2 minutes. " +
+                                "If set to false, the client will not attempt to automatically reconnect to the " +
+                                "server in the event that the connection is lost. " +
+                                "The default value is `false`.",
+                        type = {DataType.BOOL},
+                        optional = true,
+                        defaultValue = "false"),
+                @Parameter(
+                        name = "max.reconnect.delay",
+                        description = "The maximum number of milliseconds the client could wait after the connection " +
+                                "is lost. The default value is 128000",
+                        type = {DataType.INT},
+                        optional = true,
+                        defaultValue = "128000")
         },
         examples =
                 {
@@ -134,6 +161,7 @@
                                 syntax = "@source(type='mqtt', url= 'tcp://localhost:1883', " +
                                         "topic='mqtt_topic', clean.session='true'," +
                                         "quality.of.service= '1', keep.alive= '60',connection.timeout='30'" +
+                                        "max.inflight='20' ,automatic.reconnect='true', max.reconnect.delay='64000'," +
                                         "@map(type='xml'))" +
                                         "Define stream BarStream (symbol string, price float, volume long);",
                                 description = "This query receives events from the `mqtt_topic` topic via MQTT," +
@@ -153,6 +181,9 @@ public class MqttSource extends Source {
     private boolean cleanSession;
     private int keepAlive;
     private int connectionTimeout;
+    private int maxInflight;
+    private boolean automaticReconnect;
+    private int maxReconnectDelay;
     private MqttClient client;
     private MqttConsumer mqttConsumer;
     private String siddhiAppName;
@@ -178,6 +209,14 @@ public StateFactory init(SourceEventListener sourceEventListener, OptionHolder o
                         MqttConstants.DEFAULT_CONNECTION_TIMEOUT_INTERVAL));
         this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
                 (MqttConstants.CLEAN_SESSION, MqttConstants.DEFAULT_CLEAN_SESSION));
+        this.maxInflight = Integer.parseInt(optionHolder.validateAndGetStaticValue
+                (MqttConstants.MAX_INFLIGHT,
+                        MqttConstants.DEFAULT_MAX_INFLIGHT));
+        this.automaticReconnect = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
+                (MqttConstants.AUTOMATIC_RECONNECT, MqttConstants.DEFAULT_AUTOMATIC_RECONNECT));
+        this.maxReconnectDelay = Integer.parseInt(optionHolder.validateAndGetStaticValue
+                (MqttConstants.MAX_RECONNECT_DELAY,
+                       MqttConstants.DEFAULT_MAX_RECONNECT_DELAY));
         this.mqttConsumer = new MqttConsumer(sourceEventListener);
         return null;
     }
@@ -200,6 +239,9 @@ public void connect(ConnectionCallback connectionCallback, State state) throws C
             connectionOptions.setCleanSession(cleanSession);
             connectionOptions.setKeepAliveInterval(keepAlive);
             connectionOptions.setConnectionTimeout(connectionTimeout);
+            connectionOptions.setMaxInflight(maxInflight);
+            connectionOptions.setAutomaticReconnect(automaticReconnect);
+            connectionOptions.setMaxReconnectDelay(maxReconnectDelay);
             client.connect(connectionOptions);
             int qos = Integer.parseInt(String.valueOf(qosOption));
             mqttConsumer.subscribe(topicOption, qos, client);
diff --git a/component/src/main/java/io/siddhi/extension/io/mqtt/util/MqttConstants.java b/component/src/main/java/io/siddhi/extension/io/mqtt/util/MqttConstants.java
index d7fbe49..a025204 100644
--- a/component/src/main/java/io/siddhi/extension/io/mqtt/util/MqttConstants.java
+++ b/component/src/main/java/io/siddhi/extension/io/mqtt/util/MqttConstants.java
@@ -42,7 +42,10 @@ private MqttConstants() {
     public static final String DEFAULT_USERNAME = null;
     public static final String CONNECTION_TIMEOUT_INTERVAL = "connection.timeout";
     public static final String DEFAULT_CONNECTION_TIMEOUT_INTERVAL = "30";
-
-
-
+    public static final String MAX_INFLIGHT = "max.inflight";
+    public static final String DEFAULT_MAX_INFLIGHT = "10";
+    public static final String AUTOMATIC_RECONNECT = "automatic.reconnect";
+    public static final String DEFAULT_AUTOMATIC_RECONNECT = "false";
+    public static final String MAX_RECONNECT_DELAY = "max.reconnect.delay";
+    public static final String DEFAULT_MAX_RECONNECT_DELAY = "128000";
 }
diff --git a/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttSinkAutomaticReconnectTest.java b/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttSinkAutomaticReconnectTest.java
new file mode 100644
index 0000000..c2092d0
--- /dev/null
+++ b/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttSinkAutomaticReconnectTest.java
@@ -0,0 +1,151 @@
+package io.siddhi.extension.io.mqtt.sink;
+
+import io.moquette.server.Server;
+import io.moquette.server.config.IConfig;
+import io.moquette.server.config.MemoryConfig;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.exception.ConnectionUnavailableException;
+import io.siddhi.core.stream.input.InputHandler;
+import org.apache.log4j.Logger;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.rmi.RemoteException;
+import java.util.Properties;
+
+public class MqttSinkAutomaticReconnectTest {
+  private volatile int count;
+  private volatile boolean eventArrived;
+  private static final Logger log = Logger.getLogger(MqttSinkAutomaticReconnectTest.class);
+  private static final Server mqttBroker = new Server();
+  private MqttTestClient mqttTestClient;
+  private static final Properties properties = new Properties();
+
+  @BeforeMethod
+  public void initBeforeMethod() {
+    count = 0;
+    eventArrived = false;
+  }
+
+  @BeforeClass
+  public static void init() throws Exception {
+    try {
+      properties.put("port", Integer.toString(1883));
+      properties.put("host", "0.0.0.0");
+      final IConfig config = new MemoryConfig(properties);
+      mqttBroker.startServer(config);
+      Thread.sleep(1000);
+    } catch (Exception e) {
+      throw new RemoteException("Exception caught when starting server", e);
+    }
+  }
+
+  @AfterClass
+  public static void stop() {
+    mqttBroker.stopServer();
+  }
+
+  @Test
+  public void mqttPublishEventWithAutomaticReconnect() {
+    log.info("Test for Mqtt publish events with automatic reconnect enabled");
+    SiddhiManager siddhiManager = new SiddhiManager();
+    ResultContainer resultContainer = new ResultContainer(3);
+    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(
+        "define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+            + "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+            + "topic='mqtt_publish_event_with_automatic_reconnect',username='mqtt-user', "
+            + "password='mqtt-password', clean.session='true', message.retain='false', "
+            + "automatic.reconnect='true', keep.alive= '60'," + "@map(type='xml'))"
+            + "Define stream BarStream (symbol string, price float, volume long);"
+            + "from FooStream select symbol, price, volume insert into BarStream;");
+    InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
+    try {
+      this.mqttTestClient = new MqttTestClient("tcp://localhost:1883",
+          "mqtt_publish_event_with_automatic_reconnect", 1,
+          resultContainer, true, false);
+    } catch (ConnectionUnavailableException e) {
+      AssertJUnit.fail("Could not connect to broker.");
+    }
+    siddhiAppRuntime.start();
+    try {
+      fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
+      fooStream.send(new Object[] { "IBM", 75.6f, 100L });
+      Thread.sleep(500);
+      mqttBroker.stopServer();
+      fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
+    } catch (InterruptedException e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+    try {
+      final IConfig config = new MemoryConfig(properties);
+      mqttBroker.startServer(config);
+      Thread.sleep(2000);
+      fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
+      Thread.sleep(500);
+    } catch (Exception e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+    count = mqttTestClient.getCount();
+    eventArrived = mqttTestClient.getEventArrived();
+    AssertJUnit.assertEquals(3, count);
+    AssertJUnit.assertTrue(eventArrived);
+    AssertJUnit.assertTrue(resultContainer.assertMessageContent("WSO2"));
+    AssertJUnit.assertTrue(resultContainer.assertMessageContent("IBM"));
+    AssertJUnit.assertTrue(resultContainer.assertMessageContent("JAMES"));
+    siddhiAppRuntime.shutdown();
+  }
+
+  @Test
+  public void mqttPublishEventWithoutAutomaticReconnect() {
+    log.info("Test for Mqtt publish events with automatic reconnect disabled");
+    SiddhiManager siddhiManager = new SiddhiManager();
+    ResultContainer resultContainer = new ResultContainer(2);
+    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(
+        "define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+            + "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+            + "topic='mqtt_publish_event_without_automatic_reconnect',username='mqtt-user', "
+            + "password='mqtt-password', clean.session='true', message.retain='false', "
+            + "automatic.reconnect='false', keep.alive= '60'," + "@map(type='xml'))"
+            + "Define stream BarStream (symbol string, price float, volume long);"
+            + "from FooStream select symbol, price, volume insert into BarStream;");
+    InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
+    try {
+      this.mqttTestClient = new MqttTestClient("tcp://localhost:1883",
+          "mqtt_publish_event_without_automatic_reconnect", 1, resultContainer,
+          true, false);
+    } catch (ConnectionUnavailableException e) {
+      AssertJUnit.fail("Could not connect to broker.");
+    }
+    siddhiAppRuntime.start();
+    try {
+      fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
+      fooStream.send(new Object[] { "IBM", 75.6f, 100L });
+      Thread.sleep(500);
+      mqttBroker.stopServer();
+      fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
+    } catch (InterruptedException e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+    try {
+      final IConfig config = new MemoryConfig(properties);
+      mqttBroker.startServer(config);
+      Thread.sleep(2000);
+      fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
+      Thread.sleep(500);
+    } catch (Exception e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+
+    count = mqttTestClient.getCount();
+    eventArrived = mqttTestClient.getEventArrived();
+    AssertJUnit.assertEquals(2, count);
+    AssertJUnit.assertTrue(eventArrived);
+    AssertJUnit.assertTrue(resultContainer.assertMessageContent("WSO2"));
+    AssertJUnit.assertTrue(resultContainer.assertMessageContent("IBM"));
+    siddhiAppRuntime.shutdown();
+  }
+}
diff --git a/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttTestClient.java b/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttTestClient.java
index 3b9cc70..61e7d01 100644
--- a/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttTestClient.java
+++ b/component/src/test/java/io/siddhi/extension/io/mqtt/sink/MqttTestClient.java
@@ -38,7 +38,6 @@ public class MqttTestClient {
     private String clientId;
     private String userName = null;
     private String userPassword = "";
-    private boolean cleanSession = true;
     private boolean eventArrived;
     private int count;
     private int keepAlive = 60;
@@ -74,6 +73,12 @@ public boolean getEventArrived() {
 
     public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer resultContainer)
             throws ConnectionUnavailableException {
+        this(brokerURL, topic, qos, resultContainer, false, true);
+    }
+
+    public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer resultContainer,
+        boolean automaticReconnect, boolean cleanSession)
+        throws ConnectionUnavailableException {
         this.resultContainer = resultContainer;
         try {
             persistence = new MemoryPersistence();
@@ -85,6 +90,7 @@ public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer r
             connectionOptions.setCleanSession(cleanSession);
             connectionOptions.setKeepAliveInterval(keepAlive);
             connectionOptions.setConnectionTimeout(connectionTimeout);
+            connectionOptions.setAutomaticReconnect(automaticReconnect);
             client.connect(connectionOptions);
         } catch (MqttException e) {
             throw new ConnectionUnavailableException(
diff --git a/component/src/test/java/io/siddhi/extension/io/mqtt/source/MqttSourceAutomaticReconnectTest.java b/component/src/test/java/io/siddhi/extension/io/mqtt/source/MqttSourceAutomaticReconnectTest.java
new file mode 100644
index 0000000..0080ca7
--- /dev/null
+++ b/component/src/test/java/io/siddhi/extension/io/mqtt/source/MqttSourceAutomaticReconnectTest.java
@@ -0,0 +1,175 @@
+package io.siddhi.extension.io.mqtt.source;
+
+import io.moquette.server.Server;
+import io.moquette.server.config.IConfig;
+import io.moquette.server.config.MemoryConfig;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.core.util.SiddhiTestHelper;
+import org.apache.log4j.Logger;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.rmi.RemoteException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MqttSourceAutomaticReconnectTest {
+  static final Logger LOG = Logger.getLogger(MqttSourceTestCase.class);
+  private AtomicInteger count = new AtomicInteger(0);
+  private int waitTime = 50;
+  private int timeout = 60000;
+  private volatile boolean eventArrived;
+  private static final Server mqttBroker = new Server();
+  private static final Properties properties = new Properties();
+
+  @BeforeMethod
+  public void initBeforeMethod() {
+    count.set(0);
+    eventArrived = false;
+  }
+
+  @BeforeClass
+  public static void init() throws Exception {
+    try {
+      properties.put("port", Integer.toString(1883));
+      properties.put("host", "0.0.0.0");
+      final IConfig config = new MemoryConfig(properties);
+      mqttBroker.startServer(config);
+      Thread.sleep(1000);
+    } catch (Exception e) {
+      throw new RemoteException("Exception caught when starting server", e);
+    }
+  }
+
+  @AfterClass
+  public static void stop() {
+    mqttBroker.stopServer();
+  }
+
+  @Test
+  public void mqttReceiveEventsWithAutomaticReconnect() {
+    LOG.info("Test for receiving events with automatic reconnect enabled");
+    SiddhiManager siddhiManager = new SiddhiManager();
+    SiddhiAppRuntime siddhiAppRuntimeSource = siddhiManager.createSiddhiAppRuntime(
+        "@App:name('TestExecutionPlan2') "
+            + "define stream BarStream2 (symbol string, price float, volume long); "
+            + "@info(name = 'query1') "
+            + "@source(type='mqtt',url= 'tcp://localhost:1883',topic = 'mqtt_receive_event_with_automatic_reconnect',"
+            + "automatic.reconnect='true', clean.session='false', keep.alive= '60',@map(type='xml'))"
+            + "Define stream FooStream2 (symbol string, price float, volume long);"
+            + "from FooStream2 select symbol, price, volume insert into BarStream2;");
+    siddhiAppRuntimeSource.addCallback("BarStream2", new StreamCallback() {
+      @Override
+      public void receive(Event[] events) {
+        for (Event event : events) {
+          LOG.info(event);
+          eventArrived = true;
+          count.incrementAndGet();
+        }
+      }
+    });
+    siddhiAppRuntimeSource.start();
+    try {
+      Thread.sleep(4000);
+    } catch (InterruptedException e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime("@App:name('TestExecutionPlan') "
+        + "define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+        + "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+        + "topic='mqtt_receive_event_with_automatic_reconnect', clean.session='true', message.retain='false', "
+        + "automatic.reconnect='true', quality.of.service= '1',keep.alive= '60'," + "@map(type='xml'))"
+        + "Define stream BarStream (symbol string, price float, volume long);"
+        + "from FooStream select symbol, price, volume insert into BarStream;");
+    InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
+    siddhiAppRuntime.start();
+    try {
+      fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
+      fooStream.send(new Object[] { "IBM", 75.6f, 100L });
+      Thread.sleep(500);
+      mqttBroker.stopServer();
+      fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
+    } catch (InterruptedException e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+    try {
+      final IConfig config = new MemoryConfig(properties);
+      mqttBroker.startServer(config);
+      Thread.sleep(2000);
+      fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
+      SiddhiTestHelper.waitForEvents(waitTime, 3, count, timeout);
+    } catch (Exception e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+    siddhiAppRuntimeSource.shutdown();
+    siddhiAppRuntime.shutdown();
+
+  }
+
+  @Test
+  public void mqttReceiveEventsWithoutAutomaticReconnect() {
+    LOG.info("Test for receiving events with automatic reconnect disabled");
+    SiddhiManager siddhiManager = new SiddhiManager();
+    SiddhiAppRuntime siddhiAppRuntimeSource = siddhiManager.createSiddhiAppRuntime(
+        "@App:name('TestExecutionPlan2') "
+            + "define stream BarStream2 (symbol string, price float, volume long); "
+            + "@info(name = 'query1') "
+            + "@source(type='mqtt',url= 'tcp://localhost:1883',topic='mqtt_receive_event_without_automatic_reconnect',"
+            + "automatic.reconnect='false', clean.session='true', keep.alive= '60',@map(type='xml'))"
+            + "Define stream FooStream2 (symbol string, price float, volume long);"
+            + "from FooStream2 select symbol, price, volume insert into BarStream2;");
+    siddhiAppRuntimeSource.addCallback("BarStream2", new StreamCallback() {
+      @Override
+      public void receive(Event[] events) {
+        for (Event event : events) {
+          LOG.info(event);
+          eventArrived = true;
+          count.incrementAndGet();
+        }
+      }
+    });
+    siddhiAppRuntimeSource.start();
+    try {
+      Thread.sleep(4000);
+    } catch (InterruptedException e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime("@App:name('TestExecutionPlan') "
+        + "define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+        + "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+        + "topic='mqtt_receive_event_without_automatic_reconnect', clean.session='true', message.retain='false', "
+        + "automatic.reconnect='true', quality.of.service= '1',keep.alive= '60'," + "@map(type='xml'))"
+        + "Define stream BarStream (symbol string, price float, volume long);"
+        + "from FooStream select symbol, price, volume insert into BarStream;");
+    InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
+    siddhiAppRuntime.start();
+    try {
+      fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
+      fooStream.send(new Object[] { "IBM", 75.6f, 100L });
+      Thread.sleep(500);
+      mqttBroker.stopServer();
+      fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
+    } catch (InterruptedException e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+    try {
+      final IConfig config = new MemoryConfig(properties);
+      mqttBroker.startServer(config);
+      Thread.sleep(2000);
+      fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
+      SiddhiTestHelper.waitForEvents(waitTime, 2, count, timeout);
+    } catch (Exception e) {
+      AssertJUnit.fail("Thread sleep was interrupted");
+    }
+    siddhiAppRuntimeSource.shutdown();
+    siddhiAppRuntime.shutdown();
+
+  }
+}