Skip to content

Commit

Permalink
Keep track of absolute IDLE time for timeout
Browse files Browse the repository at this point in the history
The code used to apply the set timeout value to the TcpStream before
entering the IDLE loop. This effectively resets the timeout after
receiving and handling incoming messages, which nullifies the purpose of
the timeout when messages are received.

This change remembers when the IDLE command is sent initially and uses
that value to set the remaining time for the TcpStream timeout. This
allows the IDLE loop to reconnect the IDLE connection at the appropriate
time.

Fixes jonhoo#300.
  • Loading branch information
z33ky committed Feb 16, 2025
1 parent 6fe22ed commit 3df945a
Showing 1 changed file with 75 additions and 63 deletions.
138 changes: 75 additions & 63 deletions src/extensions/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use rustls_connector::TlsStream as RustlsStream;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::ops::DerefMut;
use std::time::Duration;
use std::time::{Duration, Instant};

/// `Handle` allows a client to block waiting for changes to the remote mailbox.
///
Expand Down Expand Up @@ -58,7 +58,7 @@ pub struct Handle<'a, T: Read + Write> {
session: &'a mut Session<T>,
timeout: Duration,
keepalive: bool,
done: bool,
last_idle: Option<Instant>,
}

/// The result of a wait on a [`Handle`]
Expand Down Expand Up @@ -91,11 +91,14 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
session,
timeout: Duration::from_secs(29 * 60),
keepalive: true,
done: false,
last_idle: None,
}
}

fn init(&mut self) -> Result<()> {
let last_idle = Instant::now();
self.last_idle = Some(last_idle);

// https://tools.ietf.org/html/rfc2177
//
// The IDLE command takes no arguments.
Expand All @@ -108,39 +111,97 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
let mut v = Vec::new();
self.session.readline(&mut v)?;
if v.starts_with(b"+") {
self.done = false;
return Ok(());
}

self.last_idle = None;
self.session.read_response_onto(&mut v)?;
// We should *only* get a continuation on an error (i.e., it gives BAD or NO).
unreachable!();
}

fn terminate(&mut self) -> Result<()> {
if !self.done {
self.done = true;
if let Some(_) = self.last_idle.take() {
self.session.write_line(b"DONE")?;
self.session.read_response().map(|_| ())
} else {
Ok(())
}
}
}

/// Internal helper that doesn't consume self.
impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
/// Set the timeout duration on the connection. This will also set the frequency
/// at which the connection is refreshed.
///
/// This is necessary so that we can keep using the inner `Session` in `wait_while`.
fn wait_inner<F>(&mut self, reconnect: bool, mut callback: F) -> Result<WaitOutcome>
/// The interval defaults to 29 minutes as given in RFC 2177.
pub fn timeout(&mut self, interval: Duration) -> &mut Self {
self.timeout = interval;
self
}

/// Do not continuously refresh the IDLE connection in the background.
///
/// By default, connections will periodically be refreshed in the background using the
/// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
/// this function and the connection will simply IDLE until `wait_while` returns or
/// the timeout expires.
pub fn keepalive(&mut self, keepalive: bool) -> &mut Self {
self.keepalive = keepalive;
self
}

/// Block until the given callback returns `false`, or until a response
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
pub fn wait_while<F>(&mut self, mut callback: F) -> Result<WaitOutcome>
where
F: FnMut(UnsolicitedResponse) -> bool,
{
let mut v = Vec::new();
let result = loop {
match self.session.readline(&mut v) {
match {
// The server MAY consider a client inactive if it has an IDLE command
// running, and if such a server has an inactivity timeout it MAY log
// the client off implicitly at the end of its timeout period. Because
// of that, clients using IDLE are advised to terminate the IDLE and
// re-issue it at least every 29 minutes to avoid being logged off.
// This still allows a client to receive immediate mailbox updates even
// though it need only "poll" at half hour intervals.
self.last_idle
// Check if the time since last_idle has exceeded the timeout.
.map(|last_idle| {
let time_since_idle = last_idle.elapsed();
if time_since_idle >= self.timeout {
Err(Error::Io(io::ErrorKind::TimedOut.into()))
} else {
Ok(time_since_idle)
}
})
// If there's no self.last_idle, initialize the connection (and return a 0 time since idle).
.unwrap_or_else(|| self.init().map(|()| Duration::ZERO))
// Finally, if no error occurred, read from the stream.
.map(|time_since_idle| {
self.session
.stream
.get_mut()
.set_read_timeout(Some(self.timeout - time_since_idle))
.expect("cannot be Some(0) since time is monotonically increasing");
self.session.readline(&mut v)
})
} {
Err(Error::Io(ref e))
if e.kind() == io::ErrorKind::TimedOut
|| e.kind() == io::ErrorKind::WouldBlock =>
{
if self.keepalive {
match self.terminate() {
Ok(()) => {
// The connection gets initialized again on the next iteration.
continue,
}
Err(e) => break Err(e),
}
}
break Ok(WaitOutcome::TimedOut);
}
Ok(_len) => {
Expand Down Expand Up @@ -183,60 +244,11 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
};
};

// Reconnect on timeout if needed
match (reconnect, result) {
(true, Ok(WaitOutcome::TimedOut)) => {
self.terminate()?;
self.init()?;
self.wait_inner(reconnect, callback)
}
(_, result) => result,
}
}
}

impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
/// Set the timeout duration on the connection. This will also set the frequency
/// at which the connection is refreshed.
///
/// The interval defaults to 29 minutes as given in RFC 2177.
pub fn timeout(&mut self, interval: Duration) -> &mut Self {
self.timeout = interval;
self
}

/// Do not continuously refresh the IDLE connection in the background.
///
/// By default, connections will periodically be refreshed in the background using the
/// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call
/// this function and the connection will simply IDLE until `wait_while` returns or
/// the timeout expires.
pub fn keepalive(&mut self, keepalive: bool) -> &mut Self {
self.keepalive = keepalive;
self
}
// set_read_timeout() can fail if the argument is Some(0), which can never be the
// case here.
self.session.stream.get_mut().set_read_timeout(None).unwrap();

/// Block until the given callback returns `false`, or until a response
/// arrives that is not explicitly handled by [`UnsolicitedResponse`].
pub fn wait_while<F>(&mut self, callback: F) -> Result<WaitOutcome>
where
F: FnMut(UnsolicitedResponse) -> bool,
{
self.init()?;
// The server MAY consider a client inactive if it has an IDLE command
// running, and if such a server has an inactivity timeout it MAY log
// the client off implicitly at the end of its timeout period. Because
// of that, clients using IDLE are advised to terminate the IDLE and
// re-issue it at least every 29 minutes to avoid being logged off.
// This still allows a client to receive immediate mailbox updates even
// though it need only "poll" at half hour intervals.
self.session
.stream
.get_mut()
.set_read_timeout(Some(self.timeout))?;
let res = self.wait_inner(self.keepalive, callback);
let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok();
res
result
}
}

Expand Down

0 comments on commit 3df945a

Please sign in to comment.