Skip to content

Commit

Permalink
[dart client] fix: retry reconnect web socket (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
santitigaga authored Jan 15, 2025
1 parent b72ca09 commit 6e2e2db
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 20 deletions.
3 changes: 3 additions & 0 deletions clients/client-dart/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# CHANGELOG
## [2.0.2]
- Fix reconnection
- Refactor
## [2.0.1]
- Null safety
- Refactor
Expand Down
38 changes: 24 additions & 14 deletions clients/client-dart/lib/src/async_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class AsyncClient {

late List<String> _subProtocols;
Transport? _transport;
late String _actualToken;
late String _currentToken;
late RetryTimer _connectRetryTimer;
late IOWebSocketChannel _channel;
// ----
Expand All @@ -41,7 +41,7 @@ class AsyncClient {
// ----

AsyncClient(this._config) {
_actualToken = _config.channelSecret;
_currentToken = _config.channelSecret;

_subProtocols = [JSON_FLOW];
if (_config.enableBinaryTransport) {
Expand Down Expand Up @@ -94,11 +94,11 @@ class AsyncClient {
return [message, kind];
})
.where((data) =>
data[1] ==
data.last ==
EVENT_KIND_USER) // only allows passing user events from this point
.map((data) {
// performs an ack of the user message received
final message = data[0] as ChannelMessage;
final message = data.first as ChannelMessage;
_ackMessage(message);

return message;
Expand Down Expand Up @@ -223,7 +223,7 @@ class AsyncClient {

// Function to handle the refreshed channel secret sent by the server
void _handleNewToken(ChannelMessage message) {
_actualToken = message.payload;
_currentToken = message.payload;
_ackMessage(message);
}

Expand All @@ -237,10 +237,17 @@ class AsyncClient {
}

void _onTransportClose(int code, String reason) {
_socketStreamSub?.cancel();
_socketStreamSub = null;
_transport = null;
_log.fine('close code: $code');
bool wasClosedClean = _transport?.isClosedCleanly() ?? true;
cleanConnection();
if (!wasClosedClean) {
_log.severe(
'Transport not closed cleanly, Scheduling reconnect... code: $code',
);

_connectRetryTimer.schedule();
return;
}
switch (code) {
case StatusCodes.ok:
{
Expand All @@ -257,9 +264,8 @@ class AsyncClient {
default:
{
_log.severe(
'Transport not closed cleanly, Scheduling reconnect... code: $code',
'-- Transport closed cleanly, not reconnecting! code: $code',
);
_connectRetryTimer.schedule();
}
}
}
Expand All @@ -271,16 +277,20 @@ class AsyncClient {
'Transport error and channel is not open, Scheduling reconnect...',
);

_socketStreamSub?.cancel();
_socketStreamSub = null;
_transport = null;
cleanConnection();

_connectRetryTimer.schedule();
}
}

void cleanConnection() {
_socketStreamSub?.cancel();
_socketStreamSub = null;
_transport = null;
}

void _onListen() {
_socketStreamSub = _transport?.subscribe(cancelOnErrorFlag: true);
_transport?.send('Auth::$_actualToken');
_transport?.send('Auth::$_currentToken');
}
}
1 change: 1 addition & 0 deletions clients/client-dart/lib/src/status_codes.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
class StatusCodes {
static const int ok = 1000;
static const int credentials_error = 1008;
static const int error = 1006;
}
15 changes: 10 additions & 5 deletions clients/client-dart/lib/src/transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import 'binary_decoder.dart';
import 'channel_message.dart';
import 'json_decoder.dart';
import 'message_decoder.dart';
import 'status_codes.dart';

class Transport {
final _log = Logger('Transport');
final IOWebSocketChannel _webSocketCh;
final StreamController<ChannelMessage> _localStream;
final int _heartbeatIntervalMs;
final Function _signalSocketClose;
final Function _signalSocketError;
final Function(int, String) _signalSocketClose;
final Function(Object) _signalSocketError;

String? pendingHeartbeatRef;
int _ref = 0;
Expand Down Expand Up @@ -50,11 +51,14 @@ class Transport {
return _webSocketCh.protocol;
}

bool isClosedCleanly() => _closeWasClean;

Future<dynamic> close(int code, String reason) async {
_closeWasClean = true;
if (_heartbeatTimer != null) {
_heartbeatTimer?.cancel();
}

return await _webSocketCh.sink.close(code, reason);
}

Expand Down Expand Up @@ -107,7 +111,8 @@ class Transport {
_heartbeatTimer?.cancel();
}

_signalSocketClose(_webSocketCh.closeCode, _webSocketCh.closeReason);
_signalSocketClose(_webSocketCh.closeCode ?? StatusCodes.error,
_webSocketCh.closeReason ?? '');
}

void resetHeartbeat() {
Expand Down Expand Up @@ -138,9 +143,9 @@ class Transport {
}

void _abnormalClose(reason) {
_log.fine('Abnormal Close');
_closeWasClean = false;
_log.fine('Abnormal Close: Modify clean to: $_closeWasClean');
_webSocketCh.sink.close(1000, reason);
_webSocketCh.sink.close(StatusCodes.error, reason);
}

String _makeRef() {
Expand Down
2 changes: 1 addition & 1 deletion clients/client-dart/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: channel_sender_client
description: Client for Async DataFlow Channel Sender
version: 2.0.1
version: 2.0.2
homepage: https://github.com/bancolombia/async-dataflow
repository: https://github.com/bancolombia/async-dataflow.git

Expand Down

0 comments on commit 6e2e2db

Please sign in to comment.