Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue556 #557

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions example/mqtt_server_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import 'package:mqtt_client/mqtt_server_client.dart';
final client = MqttServerClient('test.mosquitto.org', '');

var pongCount = 0; // Pong counter
var pingCount = 0; // Ping counter

Future<int> main() async {
/// A websocket URL must start with ws:// or wss:// or Dart will throw an exception, consult your websocket MQTT broker
Expand All @@ -41,7 +42,7 @@ Future<int> main() async {
/// list so in most cases you can ignore this.

/// Set logging on if needed, defaults to off
client.logging(on: true);
client.logging(on: false);

/// Set the correct MQTT protocol for mosquito
client.setProtocolV311();
Expand All @@ -66,9 +67,13 @@ Future<int> main() async {
client.onSubscribed = onSubscribed;

/// Set a ping received callback if needed, called whenever a ping response(pong) is received
/// from the broker.
/// from the broker. Can be used for health monitoring.
client.pongCallback = pong;

/// Set a ping sent callback if needed, called whenever a ping request(ping) is sent
/// by the client. Can be used for latency calculations.
client.pingCallback = ping;

/// Create a connection message to use or use the default one. The default one sets the
/// client identifier, any supplied username/password and clean session,
/// an example of a specific one below.
Expand Down Expand Up @@ -160,6 +165,13 @@ Future<int> main() async {
print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(60);

/// Print the ping/pong cycle latency data before disconnecting.
print('EXAMPLE::Keep alive latencies');
print(
'The latency of the last ping/pong cycle is ${client.lastCycleLatency} milliseconds');
print(
'The average latency of all the ping/pong cycles is ${client.averageCycleLatency} milliseconds');

/// Finally, unsubscribe and exit gracefully
print('EXAMPLE::Unsubscribing');
client.unsubscribe(topic);
Expand Down Expand Up @@ -193,6 +205,11 @@ void onDisconnected() {
} else {
print('EXAMPLE:: Pong count is incorrect, expected 3. actual $pongCount');
}
if (pingCount == 3) {
print('EXAMPLE:: Ping count is correct');
} else {
print('EXAMPLE:: Ping count is incorrect, expected 3. actual $pingCount');
}
}

/// The successful connect callback
Expand All @@ -205,4 +222,12 @@ void onConnected() {
void pong() {
print('EXAMPLE::Ping response client callback invoked');
pongCount++;
print(
'EXAMPLE::Latency of this ping/pong cycle is ${client.lastCycleLatency} milliseconds');
}

/// Ping callback
void ping() {
print('EXAMPLE::Ping sent client callback invoked');
pingCount++;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ part of '../../mqtt_client.dart';
/// Ping response received callback
typedef PongCallback = void Function();

/// Ping request sent callback
typedef PingCallback = void Function();

/// Implements keep alive functionality on the Mqtt Connection,
/// ensuring that the connection remains active according to the
/// keep alive seconds setting.
Expand Down Expand Up @@ -64,9 +67,24 @@ class MqttConnectionKeepAlive {
/// Used to synchronise shutdown and ping operations.
bool _shutdownPadlock = false;

/// Ping response received callback
/// Ping response received callback.
PongCallback? pongCallback;

/// Ping request sent callback.
PingCallback? pingCallback;

/// Latency(time between sending a ping and receiving a pong) in ms
/// of the last ping/pong cycle. Reset on disconnect.
int lastCycleLatency = 0;

int _lastPingTime = 0;

/// Average latency(time between sending a ping and receiving a pong) in ms
/// of all the ping/pong cycles in a connection period. Reset on disconnect.
int averageCycleLatency = 0;

int _cycleCount = 0;

/// The event bus
events.EventBus? _clientEventBus;

Expand All @@ -88,6 +106,10 @@ class MqttConnectionKeepAlive {
try {
_connectionHandler.sendMessage(pingMsg);
pinged = true;
_lastPingTime = DateTime.now().millisecondsSinceEpoch;
if (pingCallback != null) {
pingCallback!();
}
} catch (e) {
MqttLogger.log(
'MqttConnectionKeepAlive::pingRequired - exception occurred');
Expand Down Expand Up @@ -151,10 +173,20 @@ class MqttConnectionKeepAlive {
/// Processed ping response messages received from a message broker.
bool pingResponseReceived(MqttMessage? pingMsg) {
MqttLogger.log('MqttConnectionKeepAlive::pingResponseReceived');

// Calculate latencies
lastCycleLatency = DateTime.now().millisecondsSinceEpoch - _lastPingTime;
_cycleCount++;
// Average latency calculation is
// new_avg = prev_avg + ((new_value − prev_avg) ~/ n + 1)
averageCycleLatency +=
(lastCycleLatency - averageCycleLatency) ~/ _cycleCount;

// Call the pong callback if not null
if (pongCallback != null) {
pongCallback!();
}

// Cancel the disconnect timer if needed.
disconnectTimer?.cancel();
return true;
Expand All @@ -168,6 +200,9 @@ class MqttConnectionKeepAlive {
MqttLogger.log('MqttConnectionKeepAlive::stop - stopping keep alive');
pingTimer!.cancel();
disconnectTimer?.cancel();
lastCycleLatency = 0;
averageCycleLatency = 0;
_cycleCount = 0;
}

/// Handle the disconnect timer timeout
Expand Down
27 changes: 26 additions & 1 deletion lib/src/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class MqttClient {
subscriptionsManager?.onUnsubscribed = cb;
}

/// Ping response received callback.
/// Ping response(pong) received callback.
/// If set when a ping response is received from the broker
/// this will be called.
/// Can be used for health monitoring outside of the client itself.
Expand All @@ -264,6 +264,28 @@ class MqttClient {
keepAlive?.pongCallback = cb;
}

/// Ping request(ping) sent callback.
/// If set when a ping request is sent from the client
/// this will be called.
/// Can be used in tandem with the [pongCallback] for latency calculations.
PingCallback? _pingCallback;

/// The ping sent callback
PingCallback? get pingCallback => _pingCallback;

set pingCallback(PingCallback? cb) {
_pingCallback = cb;
keepAlive?.pingCallback = cb;
}

/// The latency of the last ping/pong cycle in milliseconds.
/// Cleared on disconnect.
int? get lastCycleLatency => keepAlive?.lastCycleLatency;

/// The average latency of all ping/pong cycles in a connection period in
/// milliseconds. Cleared on disconnect.
int? get averageCycleLatency => keepAlive?.averageCycleLatency;

/// The event bus
@protected
events.EventBus? clientEventBus;
Expand Down Expand Up @@ -320,6 +342,9 @@ class MqttClient {
if (pongCallback != null) {
keepAlive!.pongCallback = pongCallback;
}
if (pingCallback != null) {
keepAlive!.pingCallback = pingCallback;
}
} else {
MqttLogger.log('MqttClient::connect - keep alive is disabled');
}
Expand Down
93 changes: 93 additions & 0 deletions test/mqtt_client_keep_alive_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,97 @@ void main() {
expect(ka.disconnectTimer, isNull);
});
});
group('Latency', () {
test('Ping callback', () async {
final clientEventBus = events.EventBus();
var disconnect = false;
void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) {
disconnect = true;
}

var pingCalled = false;
void pingCallback() {
pingCalled = true;
}

clientEventBus
.on<DisconnectOnNoPingResponse>()
.listen(disconnectOnNoPingResponse);
final ch = MockCH(
clientEventBus,
maxConnectionAttempts: 3,
);
ch.connectionStatus.state = MqttConnectionState.connected;
final ka = MqttConnectionKeepAlive(ch, clientEventBus, 2);
ka.pingCallback = pingCallback;
verify(() => ch.registerForMessage(MqttMessageType.pingRequest, any()))
.called(1);
verify(() => ch.registerForMessage(MqttMessageType.pingResponse, any()))
.called(1);
verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1);
expect(ka.pingTimer?.isActive, isTrue);
expect(ka.disconnectTimer, isNull);
await MqttUtilities.asyncSleep(3);
verify(() => ch.sendMessage(any())).called(1);
expect(pingCalled, isTrue);
final pingMessageRx = MqttPingResponseMessage();
ka.pingResponseReceived(pingMessageRx);
expect(disconnect, isFalse);
ka.stop();
expect(ka.pingTimer?.isActive, isFalse);
expect(ka.disconnectTimer, isNull);
});
test('Latency counts', () async {
final latencies = <int>[0, 0, 0];
final clientEventBus = events.EventBus();
var disconnect = false;
void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) {
disconnect = true;
}

clientEventBus
.on<DisconnectOnNoPingResponse>()
.listen(disconnectOnNoPingResponse);
final ch = MockCH(
clientEventBus,
maxConnectionAttempts: 3,
);
ch.connectionStatus.state = MqttConnectionState.connected;
final ka = MqttConnectionKeepAlive(ch, clientEventBus, 3);
verify(() => ch.registerForMessage(MqttMessageType.pingRequest, any()))
.called(1);
verify(() => ch.registerForMessage(MqttMessageType.pingResponse, any()))
.called(1);
verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1);
expect(ka.pingTimer?.isActive, isTrue);
expect(ka.disconnectTimer, isNull);
await MqttUtilities.asyncSleep(4);
verify(() => ch.sendMessage(any())).called(1);
final pingMessageRx = MqttPingResponseMessage();
ka.pingResponseReceived(pingMessageRx);
latencies[0] = ka.lastCycleLatency;
expect(ka.lastCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency > 1000, isTrue);
await MqttUtilities.asyncSleep(3);
verify(() => ch.sendMessage(any())).called(1);
ka.pingResponseReceived(pingMessageRx);
latencies[1] = ka.lastCycleLatency;
expect(ka.lastCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency > 1000, isTrue);
await MqttUtilities.asyncSleep(3);
verify(() => ch.sendMessage(any())).called(1);
ka.pingResponseReceived(pingMessageRx);
latencies[2] = ka.lastCycleLatency;
expect(ka.lastCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency,
(latencies[0] + latencies[1] + latencies[2]) ~/ 3);
expect(disconnect, isFalse);
ka.stop();
expect(ka.averageCycleLatency, 0);
expect(ka.lastCycleLatency, 0);
expect(ka.pingTimer?.isActive, isFalse);
expect(ka.disconnectTimer, isNull);
});
});
}
Loading