Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 16, 2023
1 parent 81b5b6c commit fa7c944
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 34 deletions.
62 changes: 31 additions & 31 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ impl StreamState {
self.review_state();
}

fn check_error(&self) -> Result<(), OperationError> {
if let Some(err) = self.error.take() {
self.error.set(Some(err.clone()));
Err(err)
} else {
Ok(())
}
}

fn review_state(&self) {
if self.recv.get().is_closed() {
self.send_reset.wake();
Expand Down Expand Up @@ -558,11 +567,8 @@ impl StreamRef {
match self.0.send.get() {
HalfState::Payload => {
// check if stream is disconnected
if let Some(e) = self.0.error.take() {
let res = e.clone();
self.0.error.set(Some(e));
return Err(res);
}
self.0.check_error()?;

log::debug!(
"{:?} sending {} bytes, eof: {}, send: {:?}",
self.0.id,
Expand Down Expand Up @@ -650,30 +656,24 @@ impl StreamRef {

/// Check for available send capacity
pub fn poll_send_capacity(&self, cx: &Context<'_>) -> Poll<Result<WindowSize, OperationError>> {
if let Some(err) = self.0.error.take() {
self.0.error.set(Some(err.clone()));
Poll::Ready(Err(err))
} else {
self.0.con.check_error()?;
self.0.check_error()?;
self.0.con.check_error()?;

let win = self.0.send_window.get().window_size();
if win > 0 {
Poll::Ready(Ok(win))
} else {
self.0.send_cap.register(cx.waker());
Poll::Pending
}
let win = self.0.send_window.get().window_size();
if win > 0 {
Poll::Ready(Ok(win))
} else {
self.0.send_cap.register(cx.waker());
Poll::Pending
}
}

/// Check if send part of stream get reset
pub fn poll_send_reset(&self, cx: &Context<'_>) -> Poll<Result<(), OperationError>> {
if self.0.send.get().is_closed() {
Poll::Ready(Ok(()))
} else if let Some(err) = self.0.error.take() {
self.0.error.set(Some(err.clone()));
Poll::Ready(Err(err))
} else {
self.0.check_error()?;
self.0.con.check_error()?;
self.0.send_reset.register(cx.waker());
Poll::Pending
Expand Down Expand Up @@ -723,26 +723,26 @@ impl fmt::Debug for StreamState {
.field("recv_size", &self.recv_size.get())
.field("send", &self.send.get())
.field("send_window", &self.send_window.get())
.field("flags", &self.flags.get())
.finish()
}
}

pub fn parse_u64(src: &[u8]) -> Option<u64> {
if src.len() > 19 {
// At danger for overflow...
return None;
}

let mut ret = 0;
None
} else {
let mut ret = 0;
for &d in src {
if !d.is_ascii_digit() {
return None;
}

for &d in src {
if !d.is_ascii_digit() {
return None;
ret *= 10;
ret += (d - b'0') as u64;
}

ret *= 10;
ret += (d - b'0') as u64;
Some(ret)
}

Some(ret)
}
19 changes: 16 additions & 3 deletions tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ async fn test_max_concurrent_streams() {
assert!(client.active_streams() == 1);
assert_eq!(stream.id(), recv_stream.id());
assert_eq!(stream.stream(), recv_stream.stream());
assert!(format!("{:?}", stream).contains("SendStream"));
assert!(format!("{:?}", recv_stream).contains("RecvStream"));

let client2 = client.clone();
let opened = Rc::new(Cell::new(false));
Expand Down Expand Up @@ -130,9 +132,20 @@ async fn test_max_concurrent_streams_pool() {
let addr = addr;
async move { Ok(connect(addr).await) }
}),
)
.maxconn(1)
.finish();
);
assert!(format!("{:?}", client).contains("ClientBuilder"));
let client = client
.maxconn(1)
.scheme(Scheme::HTTPS)
.connector(
"localhost",
fn_service(move |_| {
let addr = addr;
async move { Ok(connect(addr).await) }
}),
)
.finish();
assert!(format!("{:?}", client).contains("Client"));
assert!(client.is_ready());

let (stream, _recv_stream) = client
Expand Down

0 comments on commit fa7c944

Please sign in to comment.