Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 2, 2023
1 parent da8675f commit c9a2bd0
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 11 deletions.
18 changes: 15 additions & 3 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use super::{client::Client, ClientError};
type Fut = BoxFuture<'static, Result<IoBoxed, connect::ConnectError>>;
type Connector = Box<dyn Fn() -> BoxFuture<'static, Result<IoBoxed, connect::ConnectError>>>;

#[derive(Clone)]
/// Manages http client network connectivity.
pub struct Pool {
inner: Rc<Inner>,
Expand All @@ -23,6 +24,7 @@ pub struct Pool {

/// Notify one active waiter
fn notify(waiters: &mut VecDeque<oneshot::Sender<()>>) {
log::debug!("Notify waiter, total {:?}", waiters.len());
while let Some(waiter) = waiters.pop_front() {
if waiter.send(()).is_ok() {
break;
Expand Down Expand Up @@ -70,7 +72,7 @@ impl Pool {
// check existing connections
let available = connections.iter().filter(|item| item.is_ready()).count();
let client = if available > 0 {
let idx = WyRand::new().generate_range(0_usize..=available);
let idx = WyRand::new().generate_range(0_usize..available);
connections
.iter()
.filter(|item| item.is_ready())
Expand Down Expand Up @@ -121,6 +123,7 @@ impl Pool {
Ok(Err(err)) => Err(ClientError::from(err)),
Err(_) => Err(ClientError::HandshakeTimeout),
};
inner.connecting.set(false);
for waiter in waiters.borrow_mut().drain(..) {
let _ = waiter.send(());
}
Expand All @@ -132,6 +135,10 @@ impl Pool {
.await
.map_err(From::from);
} else {
log::debug!(
"New connection is being established {:?} or number of existing cons {} greater than allowed {}",
self.inner.connecting.get(), num, self.inner.maxconn);

// wait for available connection
let (tx, rx) = oneshot::channel();
self.waiters.borrow_mut().push_back(tx);
Expand All @@ -146,13 +153,13 @@ impl Pool {
/// Readiness depends on number of opened streams and max concurrency setting
pub fn is_ready(&self) -> bool {
let connections = self.inner.connections.borrow();

for client in &*connections {
if client.is_ready() {
return true;
}
}
false

!self.inner.connecting.get() && connections.len() < self.inner.maxconn
}

#[inline]
Expand All @@ -166,6 +173,11 @@ impl Pool {
let (tx, rx) = oneshot::channel();
self.waiters.borrow_mut().push_back(tx);
let _ = rx.await;
'inner: while let Some(tx) = self.waiters.borrow_mut().pop_front() {
if tx.send(()).is_ok() {
break 'inner;
}
}
} else {
break;
}
Expand Down
18 changes: 12 additions & 6 deletions src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,7 @@ impl RecvStream {
_ => false,
};
if to_remove {
let id = self.0.id();
self.1.notify(id);
inner.remove(&id);
log::debug!("Stream {:?} is closed, notify", id);
inner.remove(&self.0.id());
}
Poll::Ready(Some(msg))
} else {
Expand All @@ -154,7 +151,6 @@ impl Drop for RecvStream {
if !self.0.recv_state().is_closed() {
self.0.reset(Reason::CANCEL);
}
self.1.notify(self.0.id());
self.1 .0.inflight.borrow_mut().remove(&self.0.id());
}
}
Expand All @@ -180,9 +176,19 @@ impl Service<Message> for HandleService {
type Error = ();
type Future<'f> = Ready<(), ()>;

fn call<'a>(&'a self, msg: Message, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
fn call<'a>(&'a self, mut msg: Message, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
let id = msg.id();
if let Some(inflight) = self.0 .0.inflight.borrow_mut().get_mut(&id) {
let eof = match msg.kind() {
MessageKind::Headers { eof, .. } => *eof,
MessageKind::Eof(..) | MessageKind::Empty | MessageKind::Disconnect(..) => true,
_ => false,
};
if eof {
self.0.notify(id);
log::debug!("Stream {:?} is closed, notify", id);
}

inflight.push(msg);
}
Ready::Ok(())
Expand Down
89 changes: 87 additions & 2 deletions tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use ::openssl::ssl::{AlpnError, SslAcceptor, SslConnector, SslFiletype, SslMetho
use ntex::http::{
test::server as test_server, uri::Scheme, HeaderMap, HttpService, Method, Response,
};
use ntex::service::ServiceFactory;
use ntex::service::{fn_service, ServiceFactory};
use ntex::time::{sleep, Millis};
use ntex::{channel::oneshot, connect::openssl, io::IoBoxed, util::Bytes};
use ntex_h2::{client::Client, frame::Reason};
use ntex_h2::{client::Client, client::Pool, frame::Reason};

fn ssl_acceptor() -> SslAcceptor {
// load ssl keys
Expand Down Expand Up @@ -103,6 +103,91 @@ async fn test_max_concurrent_streams() {
assert!(opened.get());
}

#[ntex::test]
async fn test_max_concurrent_streams_pool() {
env_logger::init();
let srv = start_server();
let addr = srv.addr();
let client = Pool::build(
"localhost",
fn_service(move |_| {
let addr = addr;
async move { Ok(connect(addr).await) }
}),
)
.maxconn(1)
.finish();
assert!(client.is_ready());

let (stream, _recv_stream) = client
.send(Method::GET, "/".into(), HeaderMap::default(), false)
.await
.unwrap();
sleep(Millis(150)).await;
assert!(!client.is_ready());

let client2 = client.clone();
let opened = Rc::new(Cell::new(false));
let opened2 = opened.clone();
ntex::rt::spawn(async move {
let _stream = client2
.send(Method::GET, "/".into(), HeaderMap::default(), false)
.await
.unwrap();
opened2.set(true);
});

stream.send_payload(Bytes::new(), true).await.unwrap();
client.ready().await;
sleep(Millis(150)).await;
assert!(client.is_ready());
assert!(opened.get());
}

#[ntex::test]
async fn test_max_concurrent_streams_pool2() {
let srv = start_server();
let addr = srv.addr();

let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let client = Pool::build(
"localhost",
fn_service(move |_| {
let addr = addr;
cnt2.set(cnt2.get() + 1);
async move { Ok(connect(addr).await) }
}),
)
.maxconn(2)
.finish();
assert!(client.is_ready());

let (stream, _recv_stream) = client
.send(Method::GET, "/".into(), HeaderMap::default(), false)
.await
.unwrap();
sleep(Millis(150)).await;
assert!(client.is_ready());

let client2 = client.clone();
let opened = Rc::new(Cell::new(false));
let opened2 = opened.clone();
ntex::rt::spawn(async move {
let _stream = client2
.send(Method::GET, "/".into(), HeaderMap::default(), false)
.await
.unwrap();
opened2.set(true);
});

stream.send_payload(Bytes::new(), true).await.unwrap();
sleep(Millis(250)).await;
assert!(client.is_ready());
assert!(opened.get());
assert!(cnt.get() == 2);
}

#[ntex::test]
async fn test_max_concurrent_streams_reset() {
let srv = start_server();
Expand Down

0 comments on commit c9a2bd0

Please sign in to comment.