Skip to content

Commit

Permalink
Refactor client pool limits (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Oct 9, 2023
1 parent 0feb305 commit cfdd151
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 31 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Changes

## [0.4.1] - 2023-10-06
## [0.4.1] - 2023-10-09

* Refactor Message type, remove MessageKind::Empty

* Refactor client pool limits

## [0.4.0] - 2023-10-03

* Refactor client api
Expand Down
71 changes: 50 additions & 21 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,35 @@ impl Client {
idx += 1;
}
}

// check existing connections
let available = connections.iter().filter(|item| item.is_ready()).count();
let client = if available > 0 {
let idx = WyRand::new().generate_range(0_usize..available);
connections
.iter()
.filter(|item| item.is_ready())
.nth(idx)
.cloned()
let num = connections.len();
if self.inner.minconn > 0 && num < self.inner.minconn {
// create new connection
(None, num)
} else {
None
};

(client, connections.len())
// first search for connections with less than 50% capacity usage
let client = connections.iter().find(|item| {
let cap = item.max_streams().unwrap_or(self.inner.max_streams) >> 1;
item.active_streams() <= cap
});
if let Some(client) = client {
(Some(client.clone()), num)
} else {
// check existing connections
let available = connections.iter().filter(|item| item.is_ready()).count();
let client = if available > 0 {
let idx = WyRand::new().generate_range(0_usize..available);
connections
.iter()
.filter(|item| item.is_ready())
.nth(idx)
.cloned()
} else {
None
};

(client, num)
}
}
};

if let Some(client) = client {
Expand All @@ -103,7 +117,10 @@ impl Client {
}

// can create new connection
if !self.inner.connecting.get() && num < self.inner.maxconn {
if !self.inner.connecting.get()
&& (num < self.inner.maxconn
|| (self.inner.minconn > 0 && num < self.inner.minconn))
{
// create new connection
self.inner.connecting.set(true);
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -202,11 +219,12 @@ impl Client {
pub struct ClientBuilder(Inner);

struct Inner {
minconn: usize,
maxconn: usize,
conn_timeout: Millis,
conn_lifetime: Duration,
disconnect_timeout: Millis,
limit: usize,
max_streams: u32,
scheme: Scheme,
config: crate::Config,
authority: ByteString,
Expand Down Expand Up @@ -243,7 +261,8 @@ impl ClientBuilder {
conn_timeout: Millis(1_000),
conn_lifetime: Duration::from_secs(0),
disconnect_timeout: Millis(3_000),
limit: 100,
max_streams: 100,
minconn: 1,
maxconn: 16,
scheme: Scheme::HTTP,
config: crate::Config::client(),
Expand Down Expand Up @@ -293,8 +312,8 @@ impl ClientBuilder {
/// If limit is 0, the connector uses "MAX_CONCURRENT_STREAMS" config from connection
/// settings.
/// The default limit size is 100.
pub fn limit(mut self, limit: usize) -> Self {
self.0.limit = limit;
pub fn max_streams(mut self, limit: u32) -> Self {
self.0.max_streams = limit;
self
}

Expand All @@ -309,6 +328,14 @@ impl ClientBuilder {
self
}

/// Sets the minimum concurrent connections.
///
/// By default min connections is set to a 1.
pub fn minconn(mut self, num: usize) -> Self {
self.0.minconn = num;
self
}

/// Sets the maximum concurrent connections.
///
/// By default max connections is set to a 16.
Expand Down Expand Up @@ -386,8 +413,9 @@ impl fmt::Debug for Client {
.field("conn_timeout", &self.inner.conn_timeout)
.field("conn_lifetime", &self.inner.conn_lifetime)
.field("disconnect_timeout", &self.inner.disconnect_timeout)
.field("minconn", &self.inner.minconn)
.field("maxconn", &self.inner.maxconn)
.field("limit", &self.inner.limit)
.field("max-streams", &self.inner.max_streams)
.field("pool", &self.inner.pool)
.field("config", &self.inner.config)
.finish()
Expand All @@ -402,8 +430,9 @@ impl fmt::Debug for ClientBuilder {
.field("conn_timeout", &self.0.conn_timeout)
.field("conn_lifetime", &self.0.conn_lifetime)
.field("disconnect_timeout", &self.0.disconnect_timeout)
.field("minconn", &self.0.minconn)
.field("maxconn", &self.0.maxconn)
.field("limit", &self.0.limit)
.field("max-streams", &self.0.max_streams)
.field("pool", &self.0.pool)
.field("config", &self.0.config)
.finish()
Expand Down
31 changes: 22 additions & 9 deletions tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use ntex::http::{
use ntex::service::{fn_service, ServiceFactory};
use ntex::time::{sleep, Millis};
use ntex::{channel::oneshot, connect::openssl, io::IoBoxed, util::Bytes};
use ntex_h2::{client::Client, client::SimpleClient, frame::Reason};
use ntex_h2::{client, client::Client, client::SimpleClient, frame::Reason};

fn ssl_acceptor() -> SslAcceptor {
// load ssl keys
Expand Down Expand Up @@ -71,20 +71,33 @@ async fn connect(addr: net::SocketAddr) -> IoBoxed {
#[ntex::test]
async fn test_max_concurrent_streams() {
let srv = start_server();
let io = connect(srv.addr()).await;
let client = SimpleClient::new(
io,
ntex_h2::Config::client(),
Scheme::HTTP,
"localhost".into(),
);
sleep(Millis(50)).await; // we need to get settings frame from server
let addr = srv.addr();
let client = client::Connector::new(fn_service(move |_| {
let addr = addr;
async move { Ok(connect(addr).await) }
}))
.scheme(Scheme::HTTP)
.connector(fn_service(move |_| {
let addr = addr;
async move { Ok(connect(addr).await) }
}))
.connect("localhost")
.await
.unwrap();

loop {
sleep(Millis(150)).await; // we need to get settings frame from server
if client.max_streams() == Some(1) {
break;
}
}

let (stream, _recv_stream) = client
.send(Method::GET, "/".into(), HeaderMap::default(), false)
.await
.unwrap();
assert!(!client.is_ready());
assert!(client.active_streams() == 1);

let client2 = client.clone();
let opened = Rc::new(Cell::new(false));
Expand Down

0 comments on commit cfdd151

Please sign in to comment.