Skip to content

Commit

Permalink
Merge branch 'feature/improve-dart-client' into fix/socket-reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
santitigaga committed Feb 7, 2025
2 parents 2113286 + 7c9f1d0 commit ead8468
Show file tree
Hide file tree
Showing 34 changed files with 765 additions and 405 deletions.
4 changes: 4 additions & 0 deletions clients/client-dart/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# CHANGELOG
## [2.0.3]
- Fix reconnection with heartbeats issue
- Improve logs
- Refactor
## [2.0.2]
- Fix reconnection
- Refactor
Expand Down
119 changes: 58 additions & 61 deletions clients/client-dart/lib/src/async_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import 'package:web_socket_channel/io.dart';
import 'async_config.dart';
import 'channel_message.dart';
import 'retry_timer.dart';
import 'status_codes.dart';
import 'transport.dart';

/// Async Data Flow Low Level Client
Expand Down Expand Up @@ -51,13 +50,16 @@ class AsyncClient {
// creates localstream
_localStream = StreamController(onListen: _onListen);

_connectRetryTimer = RetryTimer(() async {
_openChannel();
_buildTransport();
_onListen();
_connectRetryTimer = RetryTimer(
() async {
_openChannel();
_buildTransport();
_onListen();

return 1;
});
return 1;
},
maxRetries: _config.maxRetries,
);
}

void dispose() {
Expand All @@ -67,8 +69,9 @@ class AsyncClient {
// Opens up the connection and performs auth flow.

AsyncClient connect() {
if (_transport != null && _transport!.isOpen()) {
_log.warning('Connect: Transport is already open');
var transport = _transport;
if (transport != null && transport.isOpen()) {
_log.info('async-client. socket already created');

return this;
}
Expand Down Expand Up @@ -107,7 +110,7 @@ class AsyncClient {
// build transport object
_buildTransport();

_log.finest('ADF connection');
_log.info('async-client. ADF connection');
_broadCastStream = _broadCastStream.asBroadcastStream();

return this;
Expand Down Expand Up @@ -141,29 +144,36 @@ class AsyncClient {
throw ArgumentError('Invalid onData function');
}

return _broadCastStream.listen((message) {
if (eventFilters.contains(message.event)) {
onData(message);
}
}, onError: (error, stacktrace) {
if (onError != null) {
onError(error);
}
}, onDone: () {
_log.warning('Subscription for "$eventFilters" terminated.');
});
return _broadCastStream.listen(
(message) {
if (eventFilters.contains(message.event)) {
onData(message);
}
},
onError: (error, stacktrace) {
if (onError != null) {
onError(error);
}
},
onDone: () {
_log.warning(
'async-client. Subscription for "$eventFilters" terminated.',
);
},
);
}

void _openChannel() {
try {
var url = '${_config.socketUrl}?channel=${_config.channelRef}';
_channel = IOWebSocketChannel.connect(
'${_config.socketUrl}?channel=${_config.channelRef}',
url,
protocols: _subProtocols,
headers: _buildHeaders(),
);
_log.finest('New websocket connection');
_log.info('async-client. New websocket connection ${_config.channelRef}');
} catch (e) {
_log.severe('Error creating websocket connection: $e');
_log.severe('async-client. Error creating websocket connection: $e');
}
}

Expand All @@ -174,11 +184,11 @@ class AsyncClient {
_localStream,
_onTransportClose,
_onTransportError,
_config.heartbeatInterval,
_config.hbInterval,
);
_log.finest('Transport configured');
_log.info('async-client. Transport configured');
} catch (e) {
_log.severe('Error configuring transport: $e');
_log.severe('async-client. Error configuring transport: $e');
}
}

Expand All @@ -203,11 +213,12 @@ class AsyncClient {
return base64Url.encode(values);
}

// Disconnects client with ADF channel sender
Future<bool> disconnect() async {
await _transport?.close(StatusCodes.ok, 'Client disconnect');
_log.finer('async-client. disconnect() called');

await _transport?.close(1000, 'Client disconnect');
_connectRetryTimer.reset();
_log.finer('transport closed');
_log.finer('async-client. async-client. disconnect() called end');

return true;
}
Expand Down Expand Up @@ -237,44 +248,30 @@ class AsyncClient {
}

void _onTransportClose(int code, String reason) {
_log.fine('close code: $code');
bool wasClosedClean = _transport?.isClosedCleanly() ?? true;
cleanConnection();
if (!wasClosedClean) {
_log.severe(
'Transport not closed cleanly, Scheduling reconnect... code: $code',
);

_log.fine('async-client. channel close: $code $reason');
bool closeWasClean = _transport?.isClosedCleanly() ?? true;
int reasonCode = extractCode(reason);
bool shouldRetry = code > 1001 || (code == 1001 && reasonCode >= 3050);

if (!closeWasClean &&
shouldRetry &&
reason != 'Invalid token for channel') {
_log.info('async-client. Scheduling reconnect, clean: $closeWasClean');
_connectRetryTimer.schedule();
return;
}
switch (code) {
case StatusCodes.ok:
{
_log.info('Transport closed by client, not reconnecting');
}
break;
case StatusCodes.credentials_error:
{
_log.severe(
'Transport closed due invalid credentials, not reconnecting!',
);
}
break;
default:
{
_log.severe(
'-- Transport closed cleanly, not reconnecting! code: $code',
);
}
} else {
cleanConnection();
}
}

int extractCode(String stringCode) {
return int.tryParse(stringCode) ?? 0;
}

void _onTransportError(Object error) {
_log.severe('Transport error: $error');
_log.severe('async-client. Transport error: $error');
if (!isOpen()) {
_log.severe(
'Transport error and channel is not open, Scheduling reconnect...',
'async-client. Transport error and channel is not open, Scheduling reconnect...',
);

cleanConnection();
Expand Down
10 changes: 7 additions & 3 deletions clients/client-dart/lib/src/async_config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ class AsyncConfig {
final String channelRef;
final String channelSecret;
final bool enableBinaryTransport;
final int heartbeatInterval;
int hbInterval = 5000;
int? maxRetries;

AsyncConfig({
required this.socketUrl,
required this.channelRef,
required this.channelSecret,
this.enableBinaryTransport = false,
this.heartbeatInterval = 1000,
});
int? heartbeatInterval,
this.maxRetries,
}) {
hbInterval = heartbeatInterval ?? hbInterval;
}
}
10 changes: 5 additions & 5 deletions clients/client-dart/lib/src/channel_message.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
class ChannelMessage {
final String? messageId;
final String? correlationId;
final String? event;
final dynamic payload;

ChannelMessage(this.messageId, this.correlationId, this.event, this.payload);

ChannelMessage.fromMap(Map<String, dynamic> map)
Expand All @@ -7,11 +12,6 @@ class ChannelMessage {
event = map['event'],
payload = map['payload'];

final String? messageId;
final String? correlationId;
final String? event;
dynamic payload;

@override
String toString() {
return '{messageId: $messageId, correlationId: $correlationId, event: $event, payload: $payload }';
Expand Down
19 changes: 12 additions & 7 deletions clients/client-dart/lib/src/retry_timer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import 'utils.dart';
class RetryTimer {
final _log = Logger('RetryTimer');

int _initialWait = 100;
late final Function _jitterFn;
int _initialWait = 50;
int _maxWait = 6000;
late Function _jitterFn;
int _maxRetries = 10;
int _defaultJitterFn(int num) {
var randomFactor = 0.25;

Expand All @@ -18,18 +19,19 @@ class RetryTimer {

int _tries = 0;
late Future Function() _function;

Timer? _timer;

RetryTimer(
Future Function() function, {
int? initialWait,
int? maxWait,
Function? jitterFn,
int? maxRetries,
}) {
_initialWait = initialWait ?? _initialWait;

_maxWait = maxWait ?? _maxWait;

_maxRetries = maxRetries ?? _maxRetries;
_jitterFn = jitterFn ?? _defaultJitterFn;
_function = function;
}
Expand All @@ -38,20 +40,23 @@ class RetryTimer {
_tries = 0;
_timer?.cancel();
_timer = null;
_log.finest('Retry timer reset');
_log.finest('async-client. Retry timer reset');
}

void schedule() {
var delay = _delay();
_log.info('async-client. scheduling retry in $delay ms');
_timer = Timer(Duration(milliseconds: delay), () async {
try {
await _function();
if (_tries <= _maxRetries) {
_log.info('async-client. retrying $_tries of $_maxRetries');
await _function();
}
} catch (e) {
_log.severe('Captured error calling delayed function: $e');
}
});
_tries = _tries + 1;
_log.fine('Retry scheduled. Due in $delay ms. Retry #$_tries');
}

int _delay() {
Expand Down
5 changes: 0 additions & 5 deletions clients/client-dart/lib/src/status_codes.dart

This file was deleted.

Loading

0 comments on commit ead8468

Please sign in to comment.