From dfbde9bb6fedba6d7d3da6ca87b1f9d67b4df538 Mon Sep 17 00:00:00 2001 From: Steve Hamblett Date: Mon, 5 Aug 2024 10:08:50 +0100 Subject: [PATCH 1/6] Issue 556 - ping callback --- .../mqtt_client_mqtt_connection_keep_alive.dart | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart index 4f7b3fc..8b75008 100644 --- a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart +++ b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart @@ -10,6 +10,9 @@ part of '../../mqtt_client.dart'; /// Ping response received callback typedef PongCallback = void Function(); +/// Ping request sent callback +typedef PingCallback = void Function(); + /// Implements keep alive functionality on the Mqtt Connection, /// ensuring that the connection remains active according to the /// keep alive seconds setting. @@ -64,9 +67,12 @@ class MqttConnectionKeepAlive { /// Used to synchronise shutdown and ping operations. bool _shutdownPadlock = false; - /// Ping response received callback + /// Ping response received callback. PongCallback? pongCallback; + /// Ping request sent callback. + PingCallback? pingCallback; + /// The event bus events.EventBus? _clientEventBus; @@ -88,6 +94,9 @@ class MqttConnectionKeepAlive { try { _connectionHandler.sendMessage(pingMsg); pinged = true; + if (pingCallback != null) { + pingCallback!(); + } } catch (e) { MqttLogger.log( 'MqttConnectionKeepAlive::pingRequired - exception occurred'); From 30133a8d75bc3953016de8a4cda51aff1d11ad95 Mon Sep 17 00:00:00 2001 From: Steve Hamblett Date: Mon, 5 Aug 2024 10:47:07 +0100 Subject: [PATCH 2/6] Issue 556 - latency --- ...qtt_client_mqtt_connection_keep_alive.dart | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart index 8b75008..6099803 100644 --- a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart +++ b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart @@ -73,6 +73,18 @@ class MqttConnectionKeepAlive { /// Ping request sent callback. PingCallback? pingCallback; + /// Latency(time between sending a ping and receiving a pong) in ms + /// of the last ping/pong cycle. Reset on disconnect. + int lastCycleLatency = 0; + + DateTime _lastPingTime = DateTime.now(); + + /// Average latency(time between sending a ping and receiving a pong) in ms + /// of all the ping/pong cycles in a connection period. Reset on disconnect. + int averageCycleLatency = 0; + + int _cycleCount = 0; + /// The event bus events.EventBus? _clientEventBus; @@ -94,6 +106,7 @@ class MqttConnectionKeepAlive { try { _connectionHandler.sendMessage(pingMsg); pinged = true; + _lastPingTime = DateTime.now(); if (pingCallback != null) { pingCallback!(); } @@ -160,10 +173,17 @@ class MqttConnectionKeepAlive { /// Processed ping response messages received from a message broker. bool pingResponseReceived(MqttMessage? pingMsg) { MqttLogger.log('MqttConnectionKeepAlive::pingResponseReceived'); + + // Calculate latency + lastCycleLatency = DateTime.now().millisecond - _lastPingTime.millisecond; + _cycleCount++; + averageCycleLatency += averageCycleLatency ~/ _cycleCount; + // Call the pong callback if not null if (pongCallback != null) { pongCallback!(); } + // Cancel the disconnect timer if needed. disconnectTimer?.cancel(); return true; @@ -177,6 +197,9 @@ class MqttConnectionKeepAlive { MqttLogger.log('MqttConnectionKeepAlive::stop - stopping keep alive'); pingTimer!.cancel(); disconnectTimer?.cancel(); + lastCycleLatency = 0; + averageCycleLatency = 0; + _cycleCount = 0; } /// Handle the disconnect timer timeout From c557560cf1440ac08c5b6d469f4d2b9530cd84ae Mon Sep 17 00:00:00 2001 From: Steve Hamblett Date: Mon, 5 Aug 2024 10:55:38 +0100 Subject: [PATCH 3/6] Issue 556 --- test/mqtt_client_keep_alive_test.dart | 41 +++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/test/mqtt_client_keep_alive_test.dart b/test/mqtt_client_keep_alive_test.dart index e57e7b6..37d1e93 100644 --- a/test/mqtt_client_keep_alive_test.dart +++ b/test/mqtt_client_keep_alive_test.dart @@ -196,4 +196,45 @@ void main() { expect(ka.disconnectTimer, isNull); }); }); + group('Latency', () { + test('Ping callback', () async { + final clientEventBus = events.EventBus(); + var disconnect = false; + void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) { + disconnect = true; + } + + var pingCalled = false; + void pingCallback() { + pingCalled = true; + } + + clientEventBus + .on() + .listen(disconnectOnNoPingResponse); + final ch = MockCH( + clientEventBus, + maxConnectionAttempts: 3, + ); + ch.connectionStatus.state = MqttConnectionState.connected; + final ka = MqttConnectionKeepAlive(ch, clientEventBus, 2); + ka.pingCallback = pingCallback; + verify(() => ch.registerForMessage(MqttMessageType.pingRequest, any())) + .called(1); + verify(() => ch.registerForMessage(MqttMessageType.pingResponse, any())) + .called(1); + verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1); + expect(ka.pingTimer?.isActive, isTrue); + expect(ka.disconnectTimer, isNull); + await MqttUtilities.asyncSleep(3); + verify(() => ch.sendMessage(any())).called(1); + expect(pingCalled, isTrue); + final pingMessageRx = MqttPingResponseMessage(); + ka.pingResponseReceived(pingMessageRx); + expect(disconnect, isFalse); + ka.stop(); + expect(ka.pingTimer?.isActive, isFalse); + expect(ka.disconnectTimer, isNull); + }); + }); } From 8c8c433e141289a8ee0a3a48be8912111056e43f Mon Sep 17 00:00:00 2001 From: Steve Hamblett Date: Mon, 5 Aug 2024 15:19:19 +0100 Subject: [PATCH 4/6] Issue 556 --- ...qtt_client_mqtt_connection_keep_alive.dart | 11 +++-- lib/src/mqtt_client.dart | 19 +++++++- test/mqtt_client_keep_alive_test.dart | 47 +++++++++++++++++++ 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart index 6099803..2a604b7 100644 --- a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart +++ b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart @@ -77,7 +77,7 @@ class MqttConnectionKeepAlive { /// of the last ping/pong cycle. Reset on disconnect. int lastCycleLatency = 0; - DateTime _lastPingTime = DateTime.now(); + int _lastPingTime = 0; /// Average latency(time between sending a ping and receiving a pong) in ms /// of all the ping/pong cycles in a connection period. Reset on disconnect. @@ -106,7 +106,7 @@ class MqttConnectionKeepAlive { try { _connectionHandler.sendMessage(pingMsg); pinged = true; - _lastPingTime = DateTime.now(); + _lastPingTime = DateTime.now().millisecondsSinceEpoch; if (pingCallback != null) { pingCallback!(); } @@ -174,10 +174,11 @@ class MqttConnectionKeepAlive { bool pingResponseReceived(MqttMessage? pingMsg) { MqttLogger.log('MqttConnectionKeepAlive::pingResponseReceived'); - // Calculate latency - lastCycleLatency = DateTime.now().millisecond - _lastPingTime.millisecond; + // Calculate latencies + lastCycleLatency = DateTime.now().millisecondsSinceEpoch - _lastPingTime; _cycleCount++; - averageCycleLatency += averageCycleLatency ~/ _cycleCount; + averageCycleLatency = + (averageCycleLatency + lastCycleLatency) ~/ _cycleCount; // Call the pong callback if not null if (pongCallback != null) { diff --git a/lib/src/mqtt_client.dart b/lib/src/mqtt_client.dart index c2bfb7b..2a2b139 100755 --- a/lib/src/mqtt_client.dart +++ b/lib/src/mqtt_client.dart @@ -250,7 +250,7 @@ class MqttClient { subscriptionsManager?.onUnsubscribed = cb; } - /// Ping response received callback. + /// Ping response(pong) received callback. /// If set when a ping response is received from the broker /// this will be called. /// Can be used for health monitoring outside of the client itself. @@ -264,6 +264,20 @@ class MqttClient { keepAlive?.pongCallback = cb; } + /// Ping request(ping) sent callback. + /// If set when a ping request is sent from the client + /// this will be called. + /// Can be used in tandem with the [pongCallback] for latency calculations. + PingCallback? _pingCallback; + + /// The ping sent callback + PingCallback? get pingCallback => _pingCallback; + + set pingCallback(PingCallback? cb) { + _pingCallback = cb; + keepAlive?.pingCallback = cb; + } + /// The event bus @protected events.EventBus? clientEventBus; @@ -320,6 +334,9 @@ class MqttClient { if (pongCallback != null) { keepAlive!.pongCallback = pongCallback; } + if (pingCallback != null) { + keepAlive!.pingCallback = pingCallback; + } } else { MqttLogger.log('MqttClient::connect - keep alive is disabled'); } diff --git a/test/mqtt_client_keep_alive_test.dart b/test/mqtt_client_keep_alive_test.dart index 37d1e93..8463a91 100644 --- a/test/mqtt_client_keep_alive_test.dart +++ b/test/mqtt_client_keep_alive_test.dart @@ -236,5 +236,52 @@ void main() { expect(ka.pingTimer?.isActive, isFalse); expect(ka.disconnectTimer, isNull); }); + test('Latency counts', () async { + final latencies = [0, 0]; + final clientEventBus = events.EventBus(); + var disconnect = false; + void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) { + disconnect = true; + } + + clientEventBus + .on() + .listen(disconnectOnNoPingResponse); + final ch = MockCH( + clientEventBus, + maxConnectionAttempts: 3, + ); + ch.connectionStatus.state = MqttConnectionState.connected; + final ka = MqttConnectionKeepAlive(ch, clientEventBus, 3); + verify(() => ch.registerForMessage(MqttMessageType.pingRequest, any())) + .called(1); + verify(() => ch.registerForMessage(MqttMessageType.pingResponse, any())) + .called(1); + verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1); + expect(ka.pingTimer?.isActive, isTrue); + expect(ka.disconnectTimer, isNull); + await MqttUtilities.asyncSleep(3); + verify(() => ch.sendMessage(any())).called(1); + await MqttUtilities.asyncSleep(1); + final pingMessageRx = MqttPingResponseMessage(); + ka.pingResponseReceived(pingMessageRx); + latencies[0] = ka.lastCycleLatency; + expect(ka.lastCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency > 1000, isTrue); + await MqttUtilities.asyncSleep(2); + verify(() => ch.sendMessage(any())).called(1); + await MqttUtilities.asyncSleep(1); + ka.pingResponseReceived(pingMessageRx); + latencies[1] = ka.lastCycleLatency; + expect(ka.lastCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency, (latencies[0] + latencies[1]) ~/ 2); + expect(disconnect, isFalse); + ka.stop(); + expect(ka.averageCycleLatency, 0); + expect(ka.lastCycleLatency, 0); + expect(ka.pingTimer?.isActive, isFalse); + expect(ka.disconnectTimer, isNull); + }); }); } From 8ebb127ebb0c95688579da621cc6218b9f28254f Mon Sep 17 00:00:00 2001 From: Steve Hamblett Date: Mon, 5 Aug 2024 15:46:40 +0100 Subject: [PATCH 5/6] Issue 556 --- example/mqtt_server_client.dart | 29 +++++++++++++++++++++++++++-- lib/src/mqtt_client.dart | 8 ++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/example/mqtt_server_client.dart b/example/mqtt_server_client.dart index 87ec0f2..8033f79 100644 --- a/example/mqtt_server_client.dart +++ b/example/mqtt_server_client.dart @@ -27,6 +27,7 @@ import 'package:mqtt_client/mqtt_server_client.dart'; final client = MqttServerClient('test.mosquitto.org', ''); var pongCount = 0; // Pong counter +var pingCount = 0; // Ping counter Future main() async { /// A websocket URL must start with ws:// or wss:// or Dart will throw an exception, consult your websocket MQTT broker @@ -41,7 +42,7 @@ Future main() async { /// list so in most cases you can ignore this. /// Set logging on if needed, defaults to off - client.logging(on: true); + client.logging(on: false); /// Set the correct MQTT protocol for mosquito client.setProtocolV311(); @@ -66,9 +67,13 @@ Future main() async { client.onSubscribed = onSubscribed; /// Set a ping received callback if needed, called whenever a ping response(pong) is received - /// from the broker. + /// from the broker. Can be used for health monitoring. client.pongCallback = pong; + /// Set a ping sent callback if needed, called whenever a ping request(ping) is sent + /// by the client. Can be used for latency calculations. + client.pingCallback = ping; + /// Create a connection message to use or use the default one. The default one sets the /// client identifier, any supplied username/password and clean session, /// an example of a specific one below. @@ -160,6 +165,13 @@ Future main() async { print('EXAMPLE::Sleeping....'); await MqttUtilities.asyncSleep(60); + /// Print the ping/pong cycle latency data before disconnecting. + print('EXAMPLE::Keep alive latencies'); + print( + 'The latency of the last ping/pong cycle is ${client.lastCycleLatency} milliseconds'); + print( + 'The average latency of all the ping/pong cycles is ${client.averageCycleLatency} milliseconds'); + /// Finally, unsubscribe and exit gracefully print('EXAMPLE::Unsubscribing'); client.unsubscribe(topic); @@ -193,6 +205,11 @@ void onDisconnected() { } else { print('EXAMPLE:: Pong count is incorrect, expected 3. actual $pongCount'); } + if (pingCount == 3) { + print('EXAMPLE:: Ping count is correct'); + } else { + print('EXAMPLE:: Ping count is incorrect, expected 3. actual $pingCount'); + } } /// The successful connect callback @@ -205,4 +222,12 @@ void onConnected() { void pong() { print('EXAMPLE::Ping response client callback invoked'); pongCount++; + print( + 'EXAMPLE::Latency of this ping/pong cycle is ${client.lastCycleLatency} milliseconds'); +} + +/// Ping callback +void ping() { + print('EXAMPLE::Ping sent client callback invoked'); + pingCount++; } diff --git a/lib/src/mqtt_client.dart b/lib/src/mqtt_client.dart index 2a2b139..1986e9c 100755 --- a/lib/src/mqtt_client.dart +++ b/lib/src/mqtt_client.dart @@ -278,6 +278,14 @@ class MqttClient { keepAlive?.pingCallback = cb; } + /// The latency of the last ping/pong cycle in milliseconds. + /// Cleared on disconnect. + int? get lastCycleLatency => keepAlive?.lastCycleLatency; + + /// The average latency of all ping/pong cycles in a connection period in + /// milliseconds. Cleared on disconnect. + int? get averageCycleLatency => keepAlive?.averageCycleLatency; + /// The event bus @protected events.EventBus? clientEventBus; From a62ad411785be95dd415f205a617751505e158bc Mon Sep 17 00:00:00 2001 From: Steve Hamblett Date: Tue, 6 Aug 2024 10:19:49 +0100 Subject: [PATCH 6/6] Issue 556 --- .../mqtt_client_mqtt_connection_keep_alive.dart | 6 ++++-- test/mqtt_client_keep_alive_test.dart | 17 +++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart index 2a604b7..5b7e51e 100644 --- a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart +++ b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart @@ -177,8 +177,10 @@ class MqttConnectionKeepAlive { // Calculate latencies lastCycleLatency = DateTime.now().millisecondsSinceEpoch - _lastPingTime; _cycleCount++; - averageCycleLatency = - (averageCycleLatency + lastCycleLatency) ~/ _cycleCount; + // Average latency calculation is + // new_avg = prev_avg + ((new_value − prev_avg) ~/ n + 1) + averageCycleLatency += + (lastCycleLatency - averageCycleLatency) ~/ _cycleCount; // Call the pong callback if not null if (pongCallback != null) { diff --git a/test/mqtt_client_keep_alive_test.dart b/test/mqtt_client_keep_alive_test.dart index 8463a91..e23ce32 100644 --- a/test/mqtt_client_keep_alive_test.dart +++ b/test/mqtt_client_keep_alive_test.dart @@ -237,7 +237,7 @@ void main() { expect(ka.disconnectTimer, isNull); }); test('Latency counts', () async { - final latencies = [0, 0]; + final latencies = [0, 0, 0]; final clientEventBus = events.EventBus(); var disconnect = false; void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) { @@ -260,22 +260,27 @@ void main() { verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1); expect(ka.pingTimer?.isActive, isTrue); expect(ka.disconnectTimer, isNull); - await MqttUtilities.asyncSleep(3); + await MqttUtilities.asyncSleep(4); verify(() => ch.sendMessage(any())).called(1); - await MqttUtilities.asyncSleep(1); final pingMessageRx = MqttPingResponseMessage(); ka.pingResponseReceived(pingMessageRx); latencies[0] = ka.lastCycleLatency; expect(ka.lastCycleLatency > 1000, isTrue); expect(ka.averageCycleLatency > 1000, isTrue); - await MqttUtilities.asyncSleep(2); + await MqttUtilities.asyncSleep(3); verify(() => ch.sendMessage(any())).called(1); - await MqttUtilities.asyncSleep(1); ka.pingResponseReceived(pingMessageRx); latencies[1] = ka.lastCycleLatency; expect(ka.lastCycleLatency > 1000, isTrue); expect(ka.averageCycleLatency > 1000, isTrue); - expect(ka.averageCycleLatency, (latencies[0] + latencies[1]) ~/ 2); + await MqttUtilities.asyncSleep(3); + verify(() => ch.sendMessage(any())).called(1); + ka.pingResponseReceived(pingMessageRx); + latencies[2] = ka.lastCycleLatency; + expect(ka.lastCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency, + (latencies[0] + latencies[1] + latencies[2]) ~/ 3); expect(disconnect, isFalse); ka.stop(); expect(ka.averageCycleLatency, 0);