Skip to content

Commit

Permalink
Simplify websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
veloce committed Oct 23, 2023
1 parent dfe2451 commit 4809238
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 54 deletions.
98 changes: 47 additions & 51 deletions lib/src/model/auth/auth_socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const _kDisconnectOnBackgroundTimeout = Duration(minutes: 20);
typedef CurrentConnection = ({
Uri route,
IOWebSocketChannel channel,
StreamController<SocketEvent> streamController,
});

/// Lichess websocket client.
Expand All @@ -70,27 +69,31 @@ typedef CurrentConnection = ({
/// The socket will close itself after a short delay when there are no more
/// subscriptions.
class AuthSocket {
AuthSocket(this._ref, this._log) {
_appLifecycleListener = AppLifecycleListener(
onHide: () {
_closeInBackgroundTimer?.cancel();
_closeInBackgroundTimer = Timer(
_kDisconnectOnBackgroundTimeout,
() {
_log.info(
'App is in background for ${_kDisconnectOnBackgroundTimeout.inMinutes}m, closing socket.',
);
_close();
},
);
},
onShow: () {
_closeInBackgroundTimer?.cancel();
},
);
}

late final AppLifecycleListener _appLifecycleListener;
AuthSocket(this._ref, this._log);

late final AppLifecycleListener _appLifecycleListener = AppLifecycleListener(
onHide: () {
_closeInBackgroundTimer?.cancel();
_closeInBackgroundTimer = Timer(
_kDisconnectOnBackgroundTimeout,
() {
_log.info(
'App is in background for ${_kDisconnectOnBackgroundTimeout.inMinutes}m, closing socket.',
);
_close();
},
);
},
onShow: () {
_closeInBackgroundTimer?.cancel();
},
);

late final StreamController<SocketEvent> _streamController =
StreamController<SocketEvent>.broadcast(
onListen: _onStreamListen,
onCancel: _onStreamCancel,
);

final Logger _log;
final AuthSocketRef _ref;
Expand Down Expand Up @@ -120,10 +123,8 @@ class AuthSocket {
/// The current socket route if connected.
Uri? get route => _connection?.route;

/// Returns the socket event broadcast stream filtered on the given route if connected.
Stream<SocketEvent>? getStreamOnRoute(Uri route) =>
_connection?.streamController.stream
.where((_) => route == _connection?.route);
/// The socket broadcast stream.
Stream<SocketEvent> get stream => _streamController.stream;

/// The Socket Random Identifier.
String get sri => _ref.read(sriProvider);
Expand All @@ -135,19 +136,29 @@ class AuthSocket {
/// An optional `forceReconnect` boolean can be provided to force a reconnection.
///
/// Returns a tuple of:
/// - the socket event broadcast [Stream]
/// - the socket event broadcast [Stream] filtered by the route,
/// - a [Future] that completes when the socket is ready. The future might never
/// complete if the socket fails to connect because it tries to reconnect automatically.
(Stream<SocketEvent>, Future<void>) connect(
Uri route, {
bool? forceReconnect = false,
}) {
final filteredStream = _streamController.stream.where((_) {
if (route != _connection?.route) {
_log.warning(
'Received event for route $route on active route ${_connection?.route}. Have you forgotten to cancel a subscription?',
);
return false;
}
return true;
});

if (forceReconnect == false &&
_connection != null &&
_connection!.channel.closeCode == null &&
route == _connection!.route) {
return (
_connection!.streamController.stream,
filteredStream,
_connection!.channel.ready,
);
}
Expand All @@ -159,15 +170,7 @@ class AuthSocket {
final connection = _doConnect(route);

return (
connection.streamController.stream.where((_) {
if (route != _connection?.route) {
_log.warning(
'Received event for route $route on active route ${_connection?.route}. Have you forgotten to cancel a subscription?',
);
return false;
}
return true;
}),
filteredStream,
connection.channel.ready,
);
}
Expand Down Expand Up @@ -212,6 +215,7 @@ class AuthSocket {

void dispose() {
_close();
_streamController.close();
_appLifecycleListener.dispose();
}

Expand All @@ -226,16 +230,14 @@ class AuthSocket {
_closeTimer?.cancel();
_closeTimer = Timer(
delay ?? Duration.zero,
() => _closeCurrent(() {
() {
_closeCurrent();
if (_connection == null) {
return;
}
_connection?.streamController.close().then((_) {
_log.fine('WebSocket stream controller properly closed.');
});
_log.info('WebSocket connection closed.');
_connection = null;
}),
},
);
}

Expand Down Expand Up @@ -282,11 +284,6 @@ class AuthSocket {
_connection = (
route: route,
channel: channel,
streamController: _connection?.streamController ??
StreamController<SocketEvent>.broadcast(
onListen: _onStreamListen,
onCancel: _onStreamCancel,
),
);

channel.ready.then(
Expand Down Expand Up @@ -331,7 +328,7 @@ class AuthSocket {
}

if (event != SocketEvent.pong) {
_connection?.streamController.add(event);
_streamController.add(event);
}
}

Expand Down Expand Up @@ -401,14 +398,13 @@ class AuthSocket {
}
}

/// Closes current connection, but keep the streamController open to allow for reconnections
void _closeCurrent([void Function()? afterClose]) {
/// Closes websocket connection but keeps the reference to the current connection.
void _closeCurrent() {
_sink?.close();
_socketStreamSubscription?.cancel();
_pingTimer?.cancel();
_reconnectTimer?.cancel();
_ackResendTimer?.cancel();
afterClose?.call();
}
}

Expand Down
4 changes: 1 addition & 3 deletions lib/src/model/lobby/lobby_game.dart
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ class LobbyNumbers extends _$LobbyNumbers {

@override
({int nbPlayers, int nbGames})? build() {
final socket = ref.watch(authSocketProvider);
final stream = socket.getStreamOnRoute(Uri(path: '/lobby/socket/v5')) ??
const Stream.empty();
final stream = ref.watch(authSocketProvider).stream;

ref.onDispose(() {
_socketSubscription?.cancel();
Expand Down

0 comments on commit 4809238

Please sign in to comment.