Skip to content

Commit

Permalink
chore: simplify keep alive implementation (#94)
Browse files Browse the repository at this point in the history
A small tweak to #93 to avoid duplicating logic
  • Loading branch information
obmarg authored May 22, 2024
1 parent 1c99823 commit c621e5e
Showing 1 changed file with 42 additions and 40 deletions.
82 changes: 42 additions & 40 deletions src/next/actor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::{
collections::{hash_map::Entry, HashMap},
future::IntoFuture,
future::{pending, IntoFuture},
time::Duration,
};

use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
use futures::{
channel::mpsc,
future::{BoxFuture, FusedFuture},
pin_mut, FutureExt, SinkExt, StreamExt,
};
use serde_json::{json, Value};

use crate::{
Expand Down Expand Up @@ -158,44 +162,32 @@ impl ConnectionActor {
if let Some(client) = &mut self.client {
let mut next_command = client.next().fuse();
let mut next_message = self.connection.receive().fuse();
if let Some(keep_alive_duration) = self.keep_alive_duration {
let mut keep_alive = futures_timer::Delay::new(keep_alive_duration).fuse();
futures::select! {
_ = keep_alive => {
warning!(
"No messages received within keep-alive ({:?}s) from server. Closing the connection",
keep_alive_duration);
return Some(Next::Command(ConnectionCommand::Close(
4503,
"Service unavailable. keep-alive failure".to_string(),
)));
},
command = next_command => {
let Some(command) = command else {
self.client.take();
continue;
};

return Some(Next::Command(command));
},
message = next_message => {
return Some(Next::Message(message?));
},
}
} else {
futures::select! {
command = next_command => {
let Some(command) = command else {
self.client.take();
continue;
};

return Some(Next::Command(command));
},
message = next_message => {
return Some(Next::Message(message?));
},
}

let keep_alive = delay_or_pending(self.keep_alive_duration);
pin_mut!(keep_alive);

futures::select! {
_ = keep_alive => {
warning!(
"No messages received within keep-alive ({:?}s) from server. Closing the connection",
keep_alive_duration
);
return Some(Next::Command(ConnectionCommand::Close(
4503,
"Service unavailable. keep-alive failure".to_string(),
)));
},
command = next_command => {
let Some(command) = command else {
self.client.take();
continue;
};

return Some(Next::Command(command));
},
message = next_message => {
return Some(Next::Message(message?));
},
}
}

Expand Down Expand Up @@ -268,3 +260,13 @@ impl Event {
}
}
}

fn delay_or_pending(duration: Option<Duration>) -> impl FusedFuture {
async move {
match duration {
Some(duration) => futures_timer::Delay::new(duration).await,
None => pending::<()>().await,
}
}
.fuse()
}

0 comments on commit c621e5e

Please sign in to comment.