diff --git a/futures-util/src/stream/stream/all.rs b/futures-util/src/stream/stream/all.rs index b2aaa6b78..1435c798f 100644 --- a/futures-util/src/stream/stream/all.rs +++ b/futures-util/src/stream/stream/all.rs @@ -13,7 +13,7 @@ pin_project! { #[pin] stream: St, f: F, - accum: Option, + done: bool, #[pin] future: Option, } @@ -27,7 +27,7 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("All") .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done) .field("future", &self.future) .finish() } @@ -40,7 +40,7 @@ where Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(true), future: None } + Self { stream, f, done: false, future: None } } } @@ -51,7 +51,7 @@ where Fut: Future, { fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none() } } @@ -67,22 +67,22 @@ where let mut this = self.project(); Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() && ready!(fut.poll(cx)); + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); this.future.set(None); - if !acc { - this.accum.take().unwrap(); + if !res { + *this.done = true; break false; } // early exit - *this.accum = Some(acc); - } else if this.accum.is_some() { + } else if !*this.done { // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { Some(item) => { this.future.set(Some((this.f)(item))); } None => { - break this.accum.take().unwrap(); + *this.done = true; + break true; } } } else { diff --git a/futures-util/src/stream/stream/any.rs b/futures-util/src/stream/stream/any.rs index f8b2a5829..cc3d695b9 100644 --- a/futures-util/src/stream/stream/any.rs +++ b/futures-util/src/stream/stream/any.rs @@ -13,7 +13,7 @@ pin_project! { #[pin] stream: St, f: F, - accum: Option, + done: bool, #[pin] future: Option, } @@ -27,7 +27,7 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Any") .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done) .field("future", &self.future) .finish() } @@ -40,7 +40,7 @@ where Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(false), future: None } + Self { stream, f, done: false, future: None } } } @@ -51,7 +51,7 @@ where Fut: Future, { fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none() } } @@ -67,22 +67,22 @@ where let mut this = self.project(); Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() || ready!(fut.poll(cx)); + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); this.future.set(None); - if acc { - this.accum.take().unwrap(); + if res { + *this.done = true; break true; } // early exit - *this.accum = Some(acc); - } else if this.accum.is_some() { + } else if !*this.done { // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { Some(item) => { this.future.set(Some((this.f)(item))); } None => { - break this.accum.take().unwrap(); + *this.done = true; + break false; } } } else {