fix: pass pool_timer to hyper_util to enable the idle cleanup task (#2434)

* fix: pass pool_timer to hyper_util to enable the idle cleanup task

* tests: integration test for pool idle timeout
RobMor authored Sep 30, 2024
1 parent d85f44b commit baf9712
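
Note: hyper_util's connection pool only runs its idle-connection cleanup task when a pool timer is configured, so before this commit reqwest's pool_idle_timeout setting was never enforced for pooled connections. A minimal sketch of the user-facing configuration this fix makes effective (the URL and the 30-second timeout are illustrative):

use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // With the pool timer in place, connections idle longer than
    // pool_idle_timeout are closed by hyper_util's cleanup task.
    let client = reqwest::Client::builder()
        .pool_idle_timeout(Duration::from_secs(30))
        .build()?;

    let _res = client.get("http://example.com").send().await?;
    Ok(())
}
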
Showing 3 changed files with 45 additions and 1 deletion.
src/async_impl/client.rs (2 changes: 1 addition & 1 deletion)

@@ -726,8 +726,8 @@ impl ClientBuilder {
             }
         }

         #[cfg(not(target_arch = "wasm32"))]
-        builder.timer(hyper_util::rt::TokioTimer::new());
+        builder.pool_timer(hyper_util::rt::TokioTimer::new());
         builder.pool_idle_timeout(config.pool_idle_timeout);
         builder.pool_max_idle_per_host(config.pool_max_idle_per_host);
         connector.set_keepalive(config.tcp_keepalive);
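
For context, the hyper_util legacy client builder exposes both timer() and pool_timer(); it is pool_timer() that arms the pool's idle cleanup task. A minimal sketch of that builder API, assuming hyper-util 0.1 with the client-legacy and tokio features plus http-body-util and bytes (the body type and exact call sequence are illustrative):

use std::time::Duration;

use bytes::Bytes;
use http_body_util::Full;
use hyper_util::client::legacy::Client;
use hyper_util::rt::{TokioExecutor, TokioTimer};

#[tokio::main]
async fn main() {
    let mut builder = Client::builder(TokioExecutor::new());
    // pool_timer supplies the timer that drives idle-connection cleanup;
    // without it, pool_idle_timeout is never enforced.
    builder.pool_timer(TokioTimer::new());
    builder.pool_idle_timeout(Duration::from_secs(90));
    let _client = builder.build_http::<Full<Bytes>>();
}
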
tests/client.rs (21 changes: 21 additions & 0 deletions)

@@ -572,3 +572,24 @@ async fn highly_concurrent_requests_to_slow_http2_server_with_low_max_concurrent

     server.shutdown().await;
 }
+
+#[tokio::test]
+async fn close_connection_after_idle_timeout() {
+    let mut server = server::http(move |_| async move { http::Response::default() });
+
+    let client = reqwest::Client::builder()
+        .pool_idle_timeout(std::time::Duration::from_secs(1))
+        .build()
+        .unwrap();
+
+    let url = format!("http://{}", server.addr());
+
+    client.get(&url).send().await.unwrap();
+
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+
+    assert!(server
+        .events()
+        .iter()
+        .any(|e| matches!(e, server::Event::ConnectionClosed)));
+}
tests/support/server.rs (23 changes: 23 additions & 0 deletions)

@@ -12,13 +12,27 @@ use tokio::sync::oneshot;
 pub struct Server {
     addr: net::SocketAddr,
     panic_rx: std_mpsc::Receiver<()>,
+    events_rx: std_mpsc::Receiver<Event>,
     shutdown_tx: Option<oneshot::Sender<()>>,
 }
 
+#[non_exhaustive]
+pub enum Event {
+    ConnectionClosed,
+}
+
 impl Server {
     pub fn addr(&self) -> net::SocketAddr {
         self.addr
     }
+
+    pub fn events(&mut self) -> Vec<Event> {
+        let mut events = Vec::new();
+        while let Ok(event) = self.events_rx.try_recv() {
+            events.push(event);
+        }
+        events
+    }
 }
 
 impl Drop for Server {
@@ -67,6 +81,7 @@ where

     let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
     let (panic_tx, panic_rx) = std_mpsc::channel();
+    let (events_tx, events_rx) = std_mpsc::channel();
     let tname = format!(
         "test({})-support-server",
         test_name,
@@ -92,8 +107,10 @@ where
                 async move { Ok::<_, Infallible>(fut.await) }
             });
             let builder = builder.clone();
+            let events_tx = events_tx.clone();
             tokio::spawn(async move {
                 let _ = builder.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(io), svc).await;
+                let _ = events_tx.send(Event::ConnectionClosed);
             });
         }
     }
@@ -105,6 +122,7 @@ where
         Server {
             addr,
             panic_rx,
+            events_rx,
             shutdown_tx: Some(shutdown_tx),
         }
     })
@@ -152,6 +170,7 @@ where

     let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
     let (panic_tx, panic_rx) = std_mpsc::channel();
+    let (events_tx, events_rx) = std_mpsc::channel();
     let tname = format!(
         "test({})-support-server",
         test_name,
@@ -169,9 +188,11 @@ where
                 Some(accepted) = endpoint.accept() => {
                     let conn = accepted.await.expect("accepted");
                     let mut h3_conn = h3::server::Connection::new(h3_quinn::Connection::new(conn)).await.unwrap();
+                    let events_tx = events_tx.clone();
                     let func = func.clone();
                     tokio::spawn(async move {
                         while let Ok(Some((req, stream))) = h3_conn.accept().await {
+                            let events_tx = events_tx.clone();
                             let func = func.clone();
                             tokio::spawn(async move {
                                 let (mut tx, rx) = stream.split();
@@ -198,6 +219,7 @@ where
                                 }
                             }
                             tx.finish().await.unwrap();
+                            events_tx.send(Event::ConnectionClosed).unwrap();
                         });
                     }
                 });
@@ -211,6 +233,7 @@ where
         Server {
             addr,
             panic_rx,
+            events_rx,
             shutdown_tx: Some(shutdown_tx),
         }
     })
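
The support-server change follows a simple pattern: every connection task owns a clone of a std::sync::mpsc::Sender and reports ConnectionClosed when it finishes, while Server::events() drains the receiver without blocking via try_recv. A self-contained sketch of that pattern (the thread count and names are illustrative):

use std::sync::mpsc;
use std::thread;

enum Event {
    ConnectionClosed,
}

fn main() {
    let (events_tx, events_rx) = mpsc::channel();

    // Each worker holds its own Sender clone, mirroring how the test
    // server clones events_tx into every spawned connection task.
    let handles: Vec<_> = (0..3)
        .map(|_| {
            let events_tx = events_tx.clone();
            thread::spawn(move || {
                let _ = events_tx.send(Event::ConnectionClosed);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // try_recv never blocks: it returns Err once the queue is empty,
    // so this loop collects whatever events have arrived so far.
    let mut events = Vec::new();
    while let Ok(event) = events_rx.try_recv() {
        events.push(event);
    }
    assert!(events.iter().any(|e| matches!(e, Event::ConnectionClosed)));
}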
