From f539c0ce237784946fe58ec78c5a2c09edbd56ba Mon Sep 17 00:00:00 2001 From: Graeme Coupar Date: Wed, 22 May 2024 22:17:24 +0100 Subject: [PATCH] chore: simplify keep alive implementation --- src/next/actor.rs | 82 ++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/src/next/actor.rs b/src/next/actor.rs index 60ecfd9..7bc95da 100644 --- a/src/next/actor.rs +++ b/src/next/actor.rs @@ -1,10 +1,14 @@ use std::{ collections::{hash_map::Entry, HashMap}, - future::IntoFuture, + future::{pending, IntoFuture}, time::Duration, }; -use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt}; +use futures::{ + channel::mpsc, + future::{BoxFuture, FusedFuture}, + pin_mut, FutureExt, SinkExt, StreamExt, +}; use serde_json::{json, Value}; use crate::{ @@ -158,44 +162,32 @@ impl ConnectionActor { if let Some(client) = &mut self.client { let mut next_command = client.next().fuse(); let mut next_message = self.connection.receive().fuse(); - if let Some(keep_alive_duration) = self.keep_alive_duration { - let mut keep_alive = futures_timer::Delay::new(keep_alive_duration).fuse(); - futures::select! { - _ = keep_alive => { - warning!( - "No messages received within keep-alive ({:?}s) from server. Closing the connection", - keep_alive_duration); - return Some(Next::Command(ConnectionCommand::Close( - 4503, - "Service unavailable. keep-alive failure".to_string(), - ))); - }, - command = next_command => { - let Some(command) = command else { - self.client.take(); - continue; - }; - - return Some(Next::Command(command)); - }, - message = next_message => { - return Some(Next::Message(message?)); - }, - } - } else { - futures::select! { - command = next_command => { - let Some(command) = command else { - self.client.take(); - continue; - }; - - return Some(Next::Command(command)); - }, - message = next_message => { - return Some(Next::Message(message?)); - }, - } + + let keep_alive = delay_or_pending(self.keep_alive_duration); + pin_mut!(keep_alive); + + futures::select! { + _ = keep_alive => { + warning!( + "No messages received within keep-alive ({:?}s) from server. Closing the connection", + keep_alive_duration + ); + return Some(Next::Command(ConnectionCommand::Close( + 4503, + "Service unavailable. keep-alive failure".to_string(), + ))); + }, + command = next_command => { + let Some(command) = command else { + self.client.take(); + continue; + }; + + return Some(Next::Command(command)); + }, + message = next_message => { + return Some(Next::Message(message?)); + }, } } @@ -268,3 +260,13 @@ impl Event { } } } + +fn delay_or_pending(duration: Option) -> impl FusedFuture { + async move { + if let Some(duration) = duration { + futures_timer::Delay::new(duration).await; + } + pending::<()>().await; + } + .fuse() +}