From 6e2e2db6c06f9cb60204b92539c578915eb92c22 Mon Sep 17 00:00:00 2001 From: Santiago Garcia Gil Date: Wed, 15 Jan 2025 09:02:33 -0500 Subject: [PATCH] [dart client] fix: retry reconnect web socket (#97) --- clients/client-dart/CHANGELOG.md | 3 ++ clients/client-dart/lib/src/async_client.dart | 38 ++++++++++++------- clients/client-dart/lib/src/status_codes.dart | 1 + clients/client-dart/lib/src/transport.dart | 15 +++++--- clients/client-dart/pubspec.yaml | 2 +- 5 files changed, 39 insertions(+), 20 deletions(-) diff --git a/clients/client-dart/CHANGELOG.md b/clients/client-dart/CHANGELOG.md index f479ca9..62abd10 100644 --- a/clients/client-dart/CHANGELOG.md +++ b/clients/client-dart/CHANGELOG.md @@ -1,4 +1,7 @@ # CHANGELOG +## [2.0.2] +- Fix reconnection +- Refactor ## [2.0.1] - Null safety - Refactor diff --git a/clients/client-dart/lib/src/async_client.dart b/clients/client-dart/lib/src/async_client.dart index 8f7b0f0..f53f8c3 100644 --- a/clients/client-dart/lib/src/async_client.dart +++ b/clients/client-dart/lib/src/async_client.dart @@ -31,7 +31,7 @@ class AsyncClient { late List _subProtocols; Transport? _transport; - late String _actualToken; + late String _currentToken; late RetryTimer _connectRetryTimer; late IOWebSocketChannel _channel; // ---- @@ -41,7 +41,7 @@ class AsyncClient { // ---- AsyncClient(this._config) { - _actualToken = _config.channelSecret; + _currentToken = _config.channelSecret; _subProtocols = [JSON_FLOW]; if (_config.enableBinaryTransport) { @@ -94,11 +94,11 @@ class AsyncClient { return [message, kind]; }) .where((data) => - data[1] == + data.last == EVENT_KIND_USER) // only allows passing user events from this point .map((data) { // performs an ack of the user message received - final message = data[0] as ChannelMessage; + final message = data.first as ChannelMessage; _ackMessage(message); return message; @@ -223,7 +223,7 @@ class AsyncClient { // Function to handle the refreshed channel secret sent by the server void _handleNewToken(ChannelMessage message) { - _actualToken = message.payload; + _currentToken = message.payload; _ackMessage(message); } @@ -237,10 +237,17 @@ class AsyncClient { } void _onTransportClose(int code, String reason) { - _socketStreamSub?.cancel(); - _socketStreamSub = null; - _transport = null; + _log.fine('close code: $code'); + bool wasClosedClean = _transport?.isClosedCleanly() ?? true; + cleanConnection(); + if (!wasClosedClean) { + _log.severe( + 'Transport not closed cleanly, Scheduling reconnect... code: $code', + ); + _connectRetryTimer.schedule(); + return; + } switch (code) { case StatusCodes.ok: { @@ -257,9 +264,8 @@ class AsyncClient { default: { _log.severe( - 'Transport not closed cleanly, Scheduling reconnect... code: $code', + '-- Transport closed cleanly, not reconnecting! code: $code', ); - _connectRetryTimer.schedule(); } } } @@ -271,16 +277,20 @@ class AsyncClient { 'Transport error and channel is not open, Scheduling reconnect...', ); - _socketStreamSub?.cancel(); - _socketStreamSub = null; - _transport = null; + cleanConnection(); _connectRetryTimer.schedule(); } } + void cleanConnection() { + _socketStreamSub?.cancel(); + _socketStreamSub = null; + _transport = null; + } + void _onListen() { _socketStreamSub = _transport?.subscribe(cancelOnErrorFlag: true); - _transport?.send('Auth::$_actualToken'); + _transport?.send('Auth::$_currentToken'); } } diff --git a/clients/client-dart/lib/src/status_codes.dart b/clients/client-dart/lib/src/status_codes.dart index 50c965a..8acad82 100644 --- a/clients/client-dart/lib/src/status_codes.dart +++ b/clients/client-dart/lib/src/status_codes.dart @@ -1,4 +1,5 @@ class StatusCodes { static const int ok = 1000; static const int credentials_error = 1008; + static const int error = 1006; } diff --git a/clients/client-dart/lib/src/transport.dart b/clients/client-dart/lib/src/transport.dart index 186d8e8..a211e25 100644 --- a/clients/client-dart/lib/src/transport.dart +++ b/clients/client-dart/lib/src/transport.dart @@ -8,14 +8,15 @@ import 'binary_decoder.dart'; import 'channel_message.dart'; import 'json_decoder.dart'; import 'message_decoder.dart'; +import 'status_codes.dart'; class Transport { final _log = Logger('Transport'); final IOWebSocketChannel _webSocketCh; final StreamController _localStream; final int _heartbeatIntervalMs; - final Function _signalSocketClose; - final Function _signalSocketError; + final Function(int, String) _signalSocketClose; + final Function(Object) _signalSocketError; String? pendingHeartbeatRef; int _ref = 0; @@ -50,11 +51,14 @@ class Transport { return _webSocketCh.protocol; } + bool isClosedCleanly() => _closeWasClean; + Future close(int code, String reason) async { _closeWasClean = true; if (_heartbeatTimer != null) { _heartbeatTimer?.cancel(); } + return await _webSocketCh.sink.close(code, reason); } @@ -107,7 +111,8 @@ class Transport { _heartbeatTimer?.cancel(); } - _signalSocketClose(_webSocketCh.closeCode, _webSocketCh.closeReason); + _signalSocketClose(_webSocketCh.closeCode ?? StatusCodes.error, + _webSocketCh.closeReason ?? ''); } void resetHeartbeat() { @@ -138,9 +143,9 @@ class Transport { } void _abnormalClose(reason) { + _log.fine('Abnormal Close'); _closeWasClean = false; - _log.fine('Abnormal Close: Modify clean to: $_closeWasClean'); - _webSocketCh.sink.close(1000, reason); + _webSocketCh.sink.close(StatusCodes.error, reason); } String _makeRef() { diff --git a/clients/client-dart/pubspec.yaml b/clients/client-dart/pubspec.yaml index 3e7d9bf..9f3f800 100644 --- a/clients/client-dart/pubspec.yaml +++ b/clients/client-dart/pubspec.yaml @@ -1,6 +1,6 @@ name: channel_sender_client description: Client for Async DataFlow Channel Sender -version: 2.0.1 +version: 2.0.2 homepage: https://github.com/bancolombia/async-dataflow repository: https://github.com/bancolombia/async-dataflow.git