Skip to content

Commit

Permalink
Fix inflight message tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 3, 2023
1 parent 302a145 commit 5e96648
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl ClientBuilder {
let connector = Pipeline::new(connector.into_service());

let connector = Box::new(move || {
log::trace!("Opening http/2 connection to {}", connect.host());
let connect = connect.clone();
let svc = connector.clone();
let f: Fut = Box::pin(async move { svc.call(connect).await.map(IoBoxed::from) });
Expand Down
17 changes: 10 additions & 7 deletions src/client/stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::task::{Context, Poll};
use std::{cell::RefCell, fmt, pin::Pin, rc::Rc};
use std::{cell::RefCell, collections::VecDeque, fmt, pin::Pin, rc::Rc};

use ntex_bytes::Bytes;
use ntex_http::HeaderMap;
Expand All @@ -24,7 +24,7 @@ struct InflightStorageInner {
#[derive(Debug)]
pub(super) struct Inflight {
_stream: Stream,
response: Option<Either<Message, Vec<Message>>>,
response: Option<Either<Message, VecDeque<Message>>>,
waker: LocalWaker,
}

Expand All @@ -34,8 +34,7 @@ impl Inflight {
None => None,
Some(Either::Left(msg)) => Some(msg),
Some(Either::Right(mut msgs)) => {
let msg = msgs.pop();

let msg = msgs.pop_front();
if !msgs.is_empty() {
self.response = Some(Either::Right(msgs));
}
Expand All @@ -46,9 +45,14 @@ impl Inflight {

fn push(&mut self, item: Message) {
match self.response.take() {
Some(Either::Left(msg)) => self.response = Some(Either::Right(vec![msg, item])),
Some(Either::Left(msg)) => {
let mut msgs = VecDeque::new();
msgs.push_back(msg);
msgs.push_back(item);
self.response = Some(Either::Right(msgs));
}
Some(Either::Right(mut messages)) => {
messages.push(item);
messages.push_back(item);
self.response = Some(Either::Right(messages));
}
None => self.response = Some(Either::Left(item)),
Expand Down Expand Up @@ -188,7 +192,6 @@ impl Service<Message> for HandleService {
self.0.notify(id);
log::debug!("Stream {:?} is closed, notify", id);
}

inflight.push(msg);
}
Ready::Ok(())
Expand Down

0 comments on commit 5e96648

Please sign in to comment.