From cfdd151e73ddc75cc20c3ca65dcc870c09a04c87 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 9 Oct 2023 15:11:53 +0600 Subject: [PATCH] Refactor client pool limits (#20) --- CHANGELOG.md | 4 ++- src/client/pool.rs | 71 +++++++++++++++++++++++++++++++-------------- tests/connection.rs | 31 ++++++++++++++------ 3 files changed, 75 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4aa15a6..b09bb81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,11 @@ # Changes -## [0.4.1] - 2023-10-06 +## [0.4.1] - 2023-10-09 * Refactor Message type, remove MessageKind::Empty +* Refactor client pool limits + ## [0.4.0] - 2023-10-03 * Refactor client api diff --git a/src/client/pool.rs b/src/client/pool.rs index adcd059..c6f9945 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -78,21 +78,35 @@ impl Client { idx += 1; } } - - // check existing connections - let available = connections.iter().filter(|item| item.is_ready()).count(); - let client = if available > 0 { - let idx = WyRand::new().generate_range(0_usize..available); - connections - .iter() - .filter(|item| item.is_ready()) - .nth(idx) - .cloned() + let num = connections.len(); + if self.inner.minconn > 0 && num < self.inner.minconn { + // create new connection + (None, num) } else { - None - }; - - (client, connections.len()) + // first search for connections with less than 50% capacity usage + let client = connections.iter().find(|item| { + let cap = item.max_streams().unwrap_or(self.inner.max_streams) >> 1; + item.active_streams() <= cap + }); + if let Some(client) = client { + (Some(client.clone()), num) + } else { + // check existing connections + let available = connections.iter().filter(|item| item.is_ready()).count(); + let client = if available > 0 { + let idx = WyRand::new().generate_range(0_usize..available); + connections + .iter() + .filter(|item| item.is_ready()) + .nth(idx) + .cloned() + } else { + None + }; + + (client, num) + } + } }; if let Some(client) = client { @@ -103,7 +117,10 @@ impl Client { } // can create new connection - if !self.inner.connecting.get() && num < self.inner.maxconn { + if !self.inner.connecting.get() + && (num < self.inner.maxconn + || (self.inner.minconn > 0 && num < self.inner.minconn)) + { // create new connection self.inner.connecting.set(true); let (tx, rx) = oneshot::channel(); @@ -202,11 +219,12 @@ impl Client { pub struct ClientBuilder(Inner); struct Inner { + minconn: usize, maxconn: usize, conn_timeout: Millis, conn_lifetime: Duration, disconnect_timeout: Millis, - limit: usize, + max_streams: u32, scheme: Scheme, config: crate::Config, authority: ByteString, @@ -243,7 +261,8 @@ impl ClientBuilder { conn_timeout: Millis(1_000), conn_lifetime: Duration::from_secs(0), disconnect_timeout: Millis(3_000), - limit: 100, + max_streams: 100, + minconn: 1, maxconn: 16, scheme: Scheme::HTTP, config: crate::Config::client(), @@ -293,8 +312,8 @@ impl ClientBuilder { /// If limit is 0, the connector uses "MAX_CONCURRENT_STREAMS" config from connection /// settings. /// The default limit size is 100. - pub fn limit(mut self, limit: usize) -> Self { - self.0.limit = limit; + pub fn max_streams(mut self, limit: u32) -> Self { + self.0.max_streams = limit; self } @@ -309,6 +328,14 @@ impl ClientBuilder { self } + /// Sets the minimum concurrent connections. + /// + /// By default min connections is set to a 1. + pub fn minconn(mut self, num: usize) -> Self { + self.0.minconn = num; + self + } + /// Sets the maximum concurrent connections. /// /// By default max connections is set to a 16. @@ -386,8 +413,9 @@ impl fmt::Debug for Client { .field("conn_timeout", &self.inner.conn_timeout) .field("conn_lifetime", &self.inner.conn_lifetime) .field("disconnect_timeout", &self.inner.disconnect_timeout) + .field("minconn", &self.inner.minconn) .field("maxconn", &self.inner.maxconn) - .field("limit", &self.inner.limit) + .field("max-streams", &self.inner.max_streams) .field("pool", &self.inner.pool) .field("config", &self.inner.config) .finish() @@ -402,8 +430,9 @@ impl fmt::Debug for ClientBuilder { .field("conn_timeout", &self.0.conn_timeout) .field("conn_lifetime", &self.0.conn_lifetime) .field("disconnect_timeout", &self.0.disconnect_timeout) + .field("minconn", &self.0.minconn) .field("maxconn", &self.0.maxconn) - .field("limit", &self.0.limit) + .field("max-streams", &self.0.max_streams) .field("pool", &self.0.pool) .field("config", &self.0.config) .finish() diff --git a/tests/connection.rs b/tests/connection.rs index 58f052c..01b34ad 100644 --- a/tests/connection.rs +++ b/tests/connection.rs @@ -7,7 +7,7 @@ use ntex::http::{ use ntex::service::{fn_service, ServiceFactory}; use ntex::time::{sleep, Millis}; use ntex::{channel::oneshot, connect::openssl, io::IoBoxed, util::Bytes}; -use ntex_h2::{client::Client, client::SimpleClient, frame::Reason}; +use ntex_h2::{client, client::Client, client::SimpleClient, frame::Reason}; fn ssl_acceptor() -> SslAcceptor { // load ssl keys @@ -71,20 +71,33 @@ async fn connect(addr: net::SocketAddr) -> IoBoxed { #[ntex::test] async fn test_max_concurrent_streams() { let srv = start_server(); - let io = connect(srv.addr()).await; - let client = SimpleClient::new( - io, - ntex_h2::Config::client(), - Scheme::HTTP, - "localhost".into(), - ); - sleep(Millis(50)).await; // we need to get settings frame from server + let addr = srv.addr(); + let client = client::Connector::new(fn_service(move |_| { + let addr = addr; + async move { Ok(connect(addr).await) } + })) + .scheme(Scheme::HTTP) + .connector(fn_service(move |_| { + let addr = addr; + async move { Ok(connect(addr).await) } + })) + .connect("localhost") + .await + .unwrap(); + + loop { + sleep(Millis(150)).await; // we need to get settings frame from server + if client.max_streams() == Some(1) { + break; + } + } let (stream, _recv_stream) = client .send(Method::GET, "/".into(), HeaderMap::default(), false) .await .unwrap(); assert!(!client.is_ready()); + assert!(client.active_streams() == 1); let client2 = client.clone(); let opened = Rc::new(Cell::new(false));