Skip to content

Commit

Permalink
Implement Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
0xIchigo committed Jan 8, 2025
1 parent 682f650 commit d972580
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 21 deletions.
20 changes: 8 additions & 12 deletions pubsub-client/src/nonblocking/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ use {
net::TcpStream,
sync::{mpsc, oneshot},
task::JoinHandle,
time::{interval, Interval, sleep, Duration},
time::{interval, Interval, Duration},
},
tokio_stream::wrappers::UnboundedReceiverStream,
tokio_tungstenite::{
Expand All @@ -210,14 +210,10 @@ use {
},
url::Url,
};
use crate::nonblocking::pubsub_client::{DEFAULT_PING_DURATION_SECONDS, DEFAULT_MAX_FAILED_PINGS};

pub type PubsubClientResult<T = ()> = Result<T, PubsubClientError>;

/// The interval between pings measured in seconds
pub const DEFAULT_PING_DURATION_SECONDS: u64 = 10;
/// The maximum number of consecutive failed pings before considering the connection stale
pub const DEFAULT_MAX_FAILED_PINGS: usize = 3;

#[derive(Debug, Error)]
pub enum PubsubClientError {
#[error("url parse error")]
Expand Down Expand Up @@ -516,7 +512,7 @@ impl PubsubClient {
let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel();

let mut ping_interval: Interval = interval(Duration::from_secs(DEFAULT_PING_DURATION_SECONDS));
let mut unmatched_pings: usize = 0usize;
let mut elapsed_pings: usize = 0usize;

loop {
tokio::select! {
Expand All @@ -530,9 +526,9 @@ impl PubsubClient {
// Send `Message::Ping` each 10s if no any other communication
_ = ping_interval.tick() => {
ws.send(Message::Ping(Vec::new())).await?;
unmatched_pings += 1;
elapsed_pings += 1;

if unmatched_pings > DEFAULT_MAX_FAILED_PINGS {
if elapsed_pings > DEFAULT_MAX_FAILED_PINGS {
info!("No pong received after {} pings. Closing connection...", DEFAULT_MAX_FAILED_PINGS);
ws.close(Some(CloseFrame { code: CloseCode::Normal, reason: "No pong received".into() })).await?;
break;
Expand Down Expand Up @@ -573,17 +569,17 @@ impl PubsubClient {
// Get text from the message
let text = match msg {
Message::Text(text) => {
unmatched_pings = 0;
elapsed_pings = 0;
text
},
Message::Binary(_data) => continue, // Ignore
Message::Ping(data) => {
ws.send(Message::Pong(data)).await?;
unmatched_pings = 0;
elapsed_pings = 0;
continue
},
Message::Pong(_data) => {
unmatched_pings = 0;
elapsed_pings = 0;
continue
},
Message::Close(_frame) => break,
Expand Down
12 changes: 3 additions & 9 deletions pubsub-client/src/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ use {
marker::PhantomData,
net::TcpStream,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock,
},
thread::{sleep, JoinHandle},
Expand Down Expand Up @@ -237,12 +237,6 @@ where
"msg={message_text}"
)))
}
Err(tungstenite::Error::Io(ref err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
// Read timed out
Err(PubsubClientError::WsError(tungstenite::Error::Io(
std::io::Error::from(std::io::ErrorKind::WouldBlock),
)))
}
Err(err) => Err(PubsubClientError::WsError(err)),
}
}
Expand Down Expand Up @@ -823,7 +817,7 @@ impl PubsubClient {

// Send ping if the interval has passed
if last_ping_time.elapsed() >= ping_interval {
if let Err(err) = socket.write().unwrap().write_message(Message::Ping(vec![])) {
if let Err(err) = socket.write().unwrap().send(Message::Ping(vec![])) {
info!("Error sending ping: {:?}", err);
break;
}
Expand All @@ -849,7 +843,7 @@ impl PubsubClient {
.unwrap()
.get_mut()
.get_mut()
.set_read_timeout(Some(Duration::from_secs(0.5)))
.set_read_timeout(Some(Duration::from_millis(500)))
.unwrap();

match PubsubClientSubscription::read_message(socket) {
Expand Down

0 comments on commit d972580

Please sign in to comment.