From 1c998238ea99d48cb771cb939cfa498ca0ffd502 Mon Sep 17 00:00:00 2001 From: Rhishikesh Date: Wed, 22 May 2024 23:14:29 +0200 Subject: [PATCH] feat: add keep-alive capability (#93) If no messages are received within the keep-alive duration, close the connection. --- Cargo.lock | 7 +++++ Cargo.toml | 1 + src/next/actor.rs | 62 ++++++++++++++++++++++++++++++++++----------- src/next/builder.rs | 16 ++++++++++-- 4 files changed, 69 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e45cd7d..400b098 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -997,6 +997,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.28" @@ -1086,6 +1092,7 @@ dependencies = [ "axum-macros", "cynic", "futures", + "futures-timer", "graphql-ws-client", "graphql_client", "insta", diff --git a/Cargo.toml b/Cargo.toml index f768fb2..c594b69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ graphql_client = { version = "0.13.0", optional = true } ws_stream_wasm = { version = "0.7", optional = true } pin-project-lite = { version = "0.2.7", optional = true } pharos = { version = "0.5.2", optional = true } +futures-timer = "3.0.3" [dev-dependencies] assert_matches = "1.5" diff --git a/src/next/actor.rs b/src/next/actor.rs index 8528390..60ecfd9 100644 --- a/src/next/actor.rs +++ b/src/next/actor.rs @@ -1,12 +1,17 @@ use std::{ collections::{hash_map::Entry, HashMap}, future::IntoFuture, + time::Duration, }; use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt}; use serde_json::{json, Value}; -use crate::{logging::trace, protocol::Event, Error}; +use crate::{ + logging::{trace, warning}, + protocol::Event, + Error, +}; use super::{ connection::{Connection, Message}, @@ -23,17 +28,20 @@ pub struct ConnectionActor { client: Option>, connection: Box, operations: HashMap>, + keep_alive_duration: Option, } impl ConnectionActor { pub(super) fn new( connection: Box, client: mpsc::Receiver, + keep_alive_duration: Option, ) -> Self { ConnectionActor { client: Some(client), connection, operations: HashMap::new(), + keep_alive_duration, } } @@ -150,18 +158,44 @@ impl ConnectionActor { if let Some(client) = &mut self.client { let mut next_command = client.next().fuse(); let mut next_message = self.connection.receive().fuse(); - futures::select! { - command = next_command => { - let Some(command) = command else { - self.client.take(); - continue; - }; - - return Some(Next::Command(command)); - }, - message = next_message => { - return Some(Next::Message(message?)); - }, + if let Some(keep_alive_duration) = self.keep_alive_duration { + let mut keep_alive = futures_timer::Delay::new(keep_alive_duration).fuse(); + futures::select! { + _ = keep_alive => { + warning!( + "No messages received within keep-alive ({:?}s) from server. Closing the connection", + keep_alive_duration); + return Some(Next::Command(ConnectionCommand::Close( + 4503, + "Service unavailable. keep-alive failure".to_string(), + ))); + }, + command = next_command => { + let Some(command) = command else { + self.client.take(); + continue; + }; + + return Some(Next::Command(command)); + }, + message = next_message => { + return Some(Next::Message(message?)); + }, + } + } else { + futures::select! { + command = next_command => { + let Some(command) = command else { + self.client.take(); + continue; + }; + + return Some(Next::Command(command)); + }, + message = next_message => { + return Some(Next::Message(message?)); + }, + } } } @@ -170,8 +204,6 @@ impl ConnectionActor { // then we should shut down return None; } - - return Some(Next::Message(self.connection.receive().await?)); } } } diff --git a/src/next/builder.rs b/src/next/builder.rs index 3486121..ce25ffb 100644 --- a/src/next/builder.rs +++ b/src/next/builder.rs @@ -1,4 +1,4 @@ -use std::future::IntoFuture; +use std::{future::IntoFuture, time::Duration}; use futures::{channel::mpsc, future::BoxFuture, FutureExt}; use serde::Serialize; @@ -30,6 +30,7 @@ pub struct ClientBuilder { payload: Option, subscription_buffer_size: Option, connection: Box, + keep_alive_duration: Option, } impl super::Client { @@ -52,6 +53,7 @@ impl super::Client { payload: None, subscription_buffer_size: None, connection: Box::new(connection), + keep_alive_duration: None, } } } @@ -80,6 +82,15 @@ impl ClientBuilder { } } + /// Sets the duration within which if Client does not receive Ping messages + /// the connection will be closed + pub fn keep_alive_duration(self, new: Duration) -> Self { + ClientBuilder { + keep_alive_duration: Some(new), + ..self + } + } + /// Initialise a Client and use it to run a single subscription /// /// ```rust @@ -146,6 +157,7 @@ impl ClientBuilder { let Self { payload, subscription_buffer_size, + keep_alive_duration, mut connection, } = self; @@ -195,7 +207,7 @@ impl ClientBuilder { let (command_sender, command_receiver) = mpsc::channel(5); - let actor = ConnectionActor::new(connection, command_receiver); + let actor = ConnectionActor::new(connection, command_receiver, keep_alive_duration); let client = Client::new_internal(command_sender, subscription_buffer_size.unwrap_or(5));