Skip to content

Commit

Permalink
feat: add keep-alive capability (#93)
Browse files Browse the repository at this point in the history
If no messages are received within the keep-alive duration, close the
connection.
  • Loading branch information
rhishikeshj authored May 22, 2024
1 parent a9a98cc commit 1c99823
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 17 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ graphql_client = { version = "0.13.0", optional = true }
ws_stream_wasm = { version = "0.7", optional = true }
pin-project-lite = { version = "0.2.7", optional = true }
pharos = { version = "0.5.2", optional = true }
futures-timer = "3.0.3"

[dev-dependencies]
assert_matches = "1.5"
Expand Down
62 changes: 47 additions & 15 deletions src/next/actor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use std::{
collections::{hash_map::Entry, HashMap},
future::IntoFuture,
time::Duration,
};

use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
use serde_json::{json, Value};

use crate::{logging::trace, protocol::Event, Error};
use crate::{
logging::{trace, warning},
protocol::Event,
Error,
};

use super::{
connection::{Connection, Message},
Expand All @@ -23,17 +28,20 @@ pub struct ConnectionActor {
client: Option<mpsc::Receiver<ConnectionCommand>>,
connection: Box<dyn Connection + Send>,
operations: HashMap<usize, mpsc::Sender<Value>>,
keep_alive_duration: Option<Duration>,
}

impl ConnectionActor {
pub(super) fn new(
connection: Box<dyn Connection + Send>,
client: mpsc::Receiver<ConnectionCommand>,
keep_alive_duration: Option<Duration>,
) -> Self {
ConnectionActor {
client: Some(client),
connection,
operations: HashMap::new(),
keep_alive_duration,
}
}

Expand Down Expand Up @@ -150,18 +158,44 @@ impl ConnectionActor {
if let Some(client) = &mut self.client {
let mut next_command = client.next().fuse();
let mut next_message = self.connection.receive().fuse();
futures::select! {
command = next_command => {
let Some(command) = command else {
self.client.take();
continue;
};

return Some(Next::Command(command));
},
message = next_message => {
return Some(Next::Message(message?));
},
if let Some(keep_alive_duration) = self.keep_alive_duration {
let mut keep_alive = futures_timer::Delay::new(keep_alive_duration).fuse();
futures::select! {
_ = keep_alive => {
warning!(
"No messages received within keep-alive ({:?}s) from server. Closing the connection",
keep_alive_duration);
return Some(Next::Command(ConnectionCommand::Close(
4503,
"Service unavailable. keep-alive failure".to_string(),
)));
},
command = next_command => {
let Some(command) = command else {
self.client.take();
continue;
};

return Some(Next::Command(command));
},
message = next_message => {
return Some(Next::Message(message?));
},
}
} else {
futures::select! {
command = next_command => {
let Some(command) = command else {
self.client.take();
continue;
};

return Some(Next::Command(command));
},
message = next_message => {
return Some(Next::Message(message?));
},
}
}
}

Expand All @@ -170,8 +204,6 @@ impl ConnectionActor {
// then we should shut down
return None;
}

return Some(Next::Message(self.connection.receive().await?));
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions src/next/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::future::IntoFuture;
use std::{future::IntoFuture, time::Duration};

use futures::{channel::mpsc, future::BoxFuture, FutureExt};
use serde::Serialize;
Expand Down Expand Up @@ -30,6 +30,7 @@ pub struct ClientBuilder {
payload: Option<serde_json::Value>,
subscription_buffer_size: Option<usize>,
connection: Box<dyn Connection + Send>,
keep_alive_duration: Option<Duration>,
}

impl super::Client {
Expand All @@ -52,6 +53,7 @@ impl super::Client {
payload: None,
subscription_buffer_size: None,
connection: Box::new(connection),
keep_alive_duration: None,
}
}
}
Expand Down Expand Up @@ -80,6 +82,15 @@ impl ClientBuilder {
}
}

/// Sets the duration within which if Client does not receive Ping messages
/// the connection will be closed
pub fn keep_alive_duration(self, new: Duration) -> Self {
ClientBuilder {
keep_alive_duration: Some(new),
..self
}
}

/// Initialise a Client and use it to run a single subscription
///
/// ```rust
Expand Down Expand Up @@ -146,6 +157,7 @@ impl ClientBuilder {
let Self {
payload,
subscription_buffer_size,
keep_alive_duration,
mut connection,
} = self;

Expand Down Expand Up @@ -195,7 +207,7 @@ impl ClientBuilder {

let (command_sender, command_receiver) = mpsc::channel(5);

let actor = ConnectionActor::new(connection, command_receiver);
let actor = ConnectionActor::new(connection, command_receiver, keep_alive_duration);

let client = Client::new_internal(command_sender, subscription_buffer_size.unwrap_or(5));

Expand Down

0 comments on commit 1c99823

Please sign in to comment.