From bd09502bd5c20732023a49ea3cf71731e4ae7ccf Mon Sep 17 00:00:00 2001 From: Collin Styles Date: Sat, 21 Oct 2023 14:10:31 -0700 Subject: [PATCH] Replace `All` and `Any`'s `accum` field with `done` It looks like `All` was originally implemented by copying from `TryFold` from which it inherited its `accum` field. However, `accum` can only ever be one of two values: `None` (if `All` has already completed) or `Some(true)` (if it's still processing values from the inner `Stream`). It doesn't need to keep track of an accumulator because the very fact that it hasn't short-circuited yet means that the accumulated value can't be `Some(false)`. Therefore, we only need two values here and we can represent them with a `bool` indicating whether or not `All` has already completed. The same principle applies for `Any` but substituting `Some(false)` for `Some(true)`. --- futures-util/src/stream/stream/all.rs | 22 +++++++++++----------- futures-util/src/stream/stream/any.rs | 22 +++++++++++----------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/futures-util/src/stream/stream/all.rs b/futures-util/src/stream/stream/all.rs index b2aaa6b78..1435c798f 100644 --- a/futures-util/src/stream/stream/all.rs +++ b/futures-util/src/stream/stream/all.rs @@ -13,7 +13,7 @@ pin_project! { #[pin] stream: St, f: F, - accum: Option, + done: bool, #[pin] future: Option, } @@ -27,7 +27,7 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("All") .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done) .field("future", &self.future) .finish() } @@ -40,7 +40,7 @@ where Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(true), future: None } + Self { stream, f, done: false, future: None } } } @@ -51,7 +51,7 @@ where Fut: Future, { fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none() } } @@ -67,22 +67,22 @@ where let mut this = self.project(); Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() && ready!(fut.poll(cx)); + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); this.future.set(None); - if !acc { - this.accum.take().unwrap(); + if !res { + *this.done = true; break false; } // early exit - *this.accum = Some(acc); - } else if this.accum.is_some() { + } else if !*this.done { // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { Some(item) => { this.future.set(Some((this.f)(item))); } None => { - break this.accum.take().unwrap(); + *this.done = true; + break true; } } } else { diff --git a/futures-util/src/stream/stream/any.rs b/futures-util/src/stream/stream/any.rs index f8b2a5829..cc3d695b9 100644 --- a/futures-util/src/stream/stream/any.rs +++ b/futures-util/src/stream/stream/any.rs @@ -13,7 +13,7 @@ pin_project! { #[pin] stream: St, f: F, - accum: Option, + done: bool, #[pin] future: Option, } @@ -27,7 +27,7 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Any") .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done) .field("future", &self.future) .finish() } @@ -40,7 +40,7 @@ where Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(false), future: None } + Self { stream, f, done: false, future: None } } } @@ -51,7 +51,7 @@ where Fut: Future, { fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none() } } @@ -67,22 +67,22 @@ where let mut this = self.project(); Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() || ready!(fut.poll(cx)); + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); this.future.set(None); - if acc { - this.accum.take().unwrap(); + if res { + *this.done = true; break true; } // early exit - *this.accum = Some(acc); - } else if this.accum.is_some() { + } else if !*this.done { // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { Some(item) => { this.future.set(Some((this.f)(item))); } None => { - break this.accum.take().unwrap(); + *this.done = true; + break false; } } } else {