diff --git a/src/client/pool.rs b/src/client/pool.rs index adcd059..c6f9945 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -78,21 +78,35 @@ impl Client { idx += 1; } } - - // check existing connections - let available = connections.iter().filter(|item| item.is_ready()).count(); - let client = if available > 0 { - let idx = WyRand::new().generate_range(0_usize..available); - connections - .iter() - .filter(|item| item.is_ready()) - .nth(idx) - .cloned() + let num = connections.len(); + if self.inner.minconn > 0 && num < self.inner.minconn { + // create new connection + (None, num) } else { - None - }; - - (client, connections.len()) + // first search for connections with less than 50% capacity usage + let client = connections.iter().find(|item| { + let cap = item.max_streams().unwrap_or(self.inner.max_streams) >> 1; + item.active_streams() <= cap + }); + if let Some(client) = client { + (Some(client.clone()), num) + } else { + // check existing connections + let available = connections.iter().filter(|item| item.is_ready()).count(); + let client = if available > 0 { + let idx = WyRand::new().generate_range(0_usize..available); + connections + .iter() + .filter(|item| item.is_ready()) + .nth(idx) + .cloned() + } else { + None + }; + + (client, num) + } + } }; if let Some(client) = client { @@ -103,7 +117,10 @@ impl Client { } // can create new connection - if !self.inner.connecting.get() && num < self.inner.maxconn { + if !self.inner.connecting.get() + && (num < self.inner.maxconn + || (self.inner.minconn > 0 && num < self.inner.minconn)) + { // create new connection self.inner.connecting.set(true); let (tx, rx) = oneshot::channel(); @@ -202,11 +219,12 @@ impl Client { pub struct ClientBuilder(Inner); struct Inner { + minconn: usize, maxconn: usize, conn_timeout: Millis, conn_lifetime: Duration, disconnect_timeout: Millis, - limit: usize, + max_streams: u32, scheme: Scheme, config: crate::Config, authority: ByteString, @@ -243,7 +261,8 @@ impl ClientBuilder { conn_timeout: Millis(1_000), conn_lifetime: Duration::from_secs(0), disconnect_timeout: Millis(3_000), - limit: 100, + max_streams: 100, + minconn: 1, maxconn: 16, scheme: Scheme::HTTP, config: crate::Config::client(), @@ -293,8 +312,8 @@ impl ClientBuilder { /// If limit is 0, the connector uses "MAX_CONCURRENT_STREAMS" config from connection /// settings. /// The default limit size is 100. - pub fn limit(mut self, limit: usize) -> Self { - self.0.limit = limit; + pub fn max_streams(mut self, limit: u32) -> Self { + self.0.max_streams = limit; self } @@ -309,6 +328,14 @@ impl ClientBuilder { self } + /// Sets the minimum concurrent connections. + /// + /// By default min connections is set to a 1. + pub fn minconn(mut self, num: usize) -> Self { + self.0.minconn = num; + self + } + /// Sets the maximum concurrent connections. /// /// By default max connections is set to a 16. @@ -386,8 +413,9 @@ impl fmt::Debug for Client { .field("conn_timeout", &self.inner.conn_timeout) .field("conn_lifetime", &self.inner.conn_lifetime) .field("disconnect_timeout", &self.inner.disconnect_timeout) + .field("minconn", &self.inner.minconn) .field("maxconn", &self.inner.maxconn) - .field("limit", &self.inner.limit) + .field("max-streams", &self.inner.max_streams) .field("pool", &self.inner.pool) .field("config", &self.inner.config) .finish() @@ -402,8 +430,9 @@ impl fmt::Debug for ClientBuilder { .field("conn_timeout", &self.0.conn_timeout) .field("conn_lifetime", &self.0.conn_lifetime) .field("disconnect_timeout", &self.0.disconnect_timeout) + .field("minconn", &self.0.minconn) .field("maxconn", &self.0.maxconn) - .field("limit", &self.0.limit) + .field("max-streams", &self.0.max_streams) .field("pool", &self.0.pool) .field("config", &self.0.config) .finish()