From ce5eaa3a9498043246b4ad2173587011ea1cd3ae Mon Sep 17 00:00:00 2001 From: Jacob Kiesel Date: Sun, 14 May 2023 12:01:04 -0600 Subject: [PATCH 1/6] Add fairness to futures_util::{select, try_select, select_ok}, fixes #2135 --- futures-util/Cargo.toml | 4 ++- futures-util/src/future/select.rs | 25 ++++++++++--- futures-util/src/future/select_ok.rs | 31 +++++++++++----- futures-util/src/future/try_select.rs | 36 +++++++++++++------ futures-util/src/lib.rs | 17 +++++++++ futures/tests/future_select.rs | 16 +++++++++ futures/tests/future_select_ok.rs | 51 +++++++++++++++++++++++---- futures/tests/future_try_select.rs | 14 ++++++++ 8 files changed, 163 insertions(+), 31 deletions(-) create mode 100644 futures/tests/future_select.rs create mode 100644 futures/tests/future_try_select.rs diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 68dbd86b7d..ca9db2d286 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -12,7 +12,8 @@ Common utilities and extension traits for the futures-rs library. [features] default = ["std", "async-await", "async-await-macro"] -std = ["alloc", "futures-core/std", "futures-task/std", "slab"] +std = ["alloc", "futures-core/std", "futures-task/std", "rand/std", "rand/std_rng", "slab"] +getrandom = ["rand/getrandom"] alloc = ["futures-core/alloc", "futures-task/alloc"] async-await = [] async-await-macro = ["async-await", "futures-macro"] @@ -43,6 +44,7 @@ futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" pin-project-lite = "0.2.6" +rand = { version = "0.8.5", default-features = false, features = ["small_rng"] } [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index 7e33d195f7..6eeb4fd811 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -3,12 +3,15 @@ use crate::future::{Either, FutureExt}; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; +use rand::rngs::SmallRng; +use rand::Rng; /// Future for the [`select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Select { inner: Option<(A, B)>, + rng: SmallRng, } impl Unpin for Select {} @@ -22,6 +25,9 @@ impl Unpin for Select {} /// Note that this function consumes the receiving futures and returns a /// wrapped version of them. /// +/// If both futures are ready when this is polled, the winner will be pseudo-randomly +/// selected. +/// /// Also note that if both this and the second future have the same /// output type you can use the `Either::factor_first` method to /// conveniently extract out the value at the end. @@ -88,6 +94,7 @@ where { assert_future::, _>(Select { inner: Some((future1, future2)), + rng: crate::gen_rng(), }) } @@ -109,16 +116,24 @@ where } } + let a_polls_first = self.rng.gen::(); let (a, b) = self.inner.as_mut().expect("cannot poll Select twice"); - if let Poll::Ready(val) = a.poll_unpin(cx) { - return Poll::Ready(Either::Left((val, unwrap_option(self.inner.take()).1))); + macro_rules! poll_wrap { + ($to_poll:expr, $unpolled:expr, $wrap:expr) => { + if let Poll::Ready(val) = $to_poll.poll_unpin(cx) { + return Poll::Ready($wrap((val, $unpolled))); + } + }; } - if let Poll::Ready(val) = b.poll_unpin(cx) { - return Poll::Ready(Either::Right((val, unwrap_option(self.inner.take()).0))); + if a_polls_first { + poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left); + poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right); + } else { + poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right); + poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left); } - Poll::Pending } } diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 5d5579930b..3043f91e14 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -6,12 +6,15 @@ use core::mem; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; +use rand::prelude::SliceRandom; +use rand::rngs::SmallRng; /// Future for the [`select_ok`] function. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SelectOk { inner: Vec, + rng: SmallRng, } impl Unpin for SelectOk {} @@ -26,6 +29,17 @@ impl Unpin for SelectOk {} /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. /// +/// # Note for users migrating from 0.3 to 0.4 +/// +/// This function used to be biased in favor of futures that appeared earlier in the +/// iterator. This is no longer the case, the futures are now shuffled prior to being polled. +/// This prevents starvation problems. It also has the side effect that the returned `Vec` +/// of remaining futures may be longer than it was in version 0.3, because of this shuffling. +/// Some futures that would have been polled and had errors get dropped, may now instead +/// remain in the collection without being polled. +/// +/// If you were relying on this biased behavior, consider switching to the [`select_biased!`](crate::select_biased) macro. +/// /// # Panics /// /// This function will panic if the iterator specified contains no items. @@ -34,7 +48,7 @@ where I: IntoIterator, I::Item: TryFuture + Unpin, { - let ret = SelectOk { inner: iter.into_iter().collect() }; + let ret = SelectOk { inner: iter.into_iter().collect(), rng: crate::gen_rng() }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); assert_future::< Result<(::Ok, Vec), ::Error>, @@ -46,24 +60,25 @@ impl Future for SelectOk { type Output = Result<(Fut::Ok, Vec), Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { inner, rng } = &mut *self; + inner.shuffle(rng); // loop until we've either exhausted all errors, a success was hit, or nothing is ready loop { - let item = - self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { - Poll::Pending => None, - Poll::Ready(e) => Some((i, e)), - }); + let item = inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { + Poll::Pending => None, + Poll::Ready(e) => Some((i, e)), + }); match item { Some((idx, res)) => { // always remove Ok or Err, if it's not the last Err continue looping - drop(self.inner.remove(idx)); + drop(inner.remove(idx)); match res { Ok(e) => { let rest = mem::take(&mut self.inner); return Poll::Ready(Ok((e, rest))); } Err(e) => { - if self.inner.is_empty() { + if inner.is_empty() { return Poll::Ready(Err(e)); } } diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index bc282f7db1..cbef1290d6 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -2,12 +2,15 @@ use crate::future::{Either, TryFutureExt}; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; +use rand::rngs::SmallRng; +use rand::Rng; /// Future for the [`try_select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct TrySelect { inner: Option<(A, B)>, + rng: SmallRng, } impl Unpin for TrySelect {} @@ -24,6 +27,9 @@ type EitherErr = Either<(::Error, B), (::E /// Note that this function consumes the receiving futures and returns a /// wrapped version of them. /// +/// If both futures are ready when this is polled, the winner will be pseudo-randomly +/// selected. +/// /// Also note that if both this and the second future have the same /// success/error type you can use the `Either::factor_first` method to /// conveniently extract out the value at the end. @@ -57,6 +63,7 @@ where { super::assert_future::, EitherErr>, _>(TrySelect { inner: Some((future1, future2)), + rng: crate::gen_rng(), }) } @@ -69,17 +76,26 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.try_poll_unpin(cx) { - Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))), - Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))), - Poll::Pending => match b.try_poll_unpin(cx) { - Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))), - Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))), - Poll::Pending => { - self.inner = Some((a, b)); - Poll::Pending + macro_rules! poll_wrap { + ($poll_first:expr, $poll_second:expr, $wrap_first:expr, $wrap_second:expr) => { + match $poll_first.try_poll_unpin(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err($wrap_first((x, $poll_second)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok($wrap_first((x, $poll_second)))), + Poll::Pending => match $poll_second.try_poll_unpin(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err($wrap_second((x, $poll_first)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok($wrap_second((x, $poll_first)))), + Poll::Pending => { + self.inner = Some((a, b)); + Poll::Pending + } + }, } - }, + }; + } + if self.rng.gen::() { + poll_wrap!(a, b, Either::Left, Either::Right) + } else { + poll_wrap!(b, a, Either::Right, Either::Left) } } } diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index abe35e68f7..cdc8ae2ab6 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -26,6 +26,8 @@ extern crate std; // Macro re-exports pub use futures_core::ready; pub use pin_utils::pin_mut; +use rand::rngs::SmallRng; +use rand::SeedableRng; #[cfg(feature = "async-await")] #[macro_use] @@ -331,3 +333,18 @@ mod abortable; mod fns; mod unfold_state; + +fn gen_rng() -> SmallRng { + #[cfg(feature = "std")] + { + SmallRng::from_rng(rand::thread_rng()).expect("generating SmallRng via thread_rng failed") + } + #[cfg(all(feature = "getrandom", not(feature = "std")))] + { + SmallRng::from_entropy() + } + #[cfg(not(any(feature = "getrandom", feature = "std")))] + { + SmallRng::seed_from_u64(0) + } +} diff --git a/futures/tests/future_select.rs b/futures/tests/future_select.rs new file mode 100644 index 0000000000..da30a49271 --- /dev/null +++ b/futures/tests/future_select.rs @@ -0,0 +1,16 @@ +use std::future::ready; + +use futures::future::select; +use futures_executor::block_on; + +#[test] +fn is_fair() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let (i, _) = block_on(select(ready(0), ready(1))).factor_first(); + results.push(i); + } + const THRESHOLD: usize = 30; + assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD); + assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD) +} diff --git a/futures/tests/future_select_ok.rs b/futures/tests/future_select_ok.rs index 8aec00362d..a031f05fc5 100644 --- a/futures/tests/future_select_ok.rs +++ b/futures/tests/future_select_ok.rs @@ -1,30 +1,67 @@ +use std::fmt::Debug; +use std::time::Duration; + use futures::executor::block_on; -use futures::future::{err, ok, select_ok}; +use futures::future::{err, ok, select_ok, Future}; +use futures_channel::oneshot; +use std::thread; #[test] fn ignore_err() { let v = vec![err(1), err(2), ok(3), ok(4)]; let (i, v) = block_on(select_ok(v)).ok().unwrap(); - assert_eq!(i, 3); + assert!(i == 3 || i == 4); - assert_eq!(v.len(), 1); + assert!(v.len() < 4); - let (i, v) = block_on(select_ok(v)).ok().unwrap(); - assert_eq!(i, 4); + let (j, v) = block_on(select_ok(v)).ok().unwrap(); + assert!(j == 3 || j == 4); + assert_ne!(j, i); - assert!(v.is_empty()); + assert!(v.len() < 3); } #[test] fn last_err() { - let v = vec![ok(1), err(2), err(3)]; + let (ok_sender, ok_receiver) = oneshot::channel(); + let (first_err_sender, first_err_receiver) = oneshot::channel(); + let (second_err_sender, second_err_receiver) = oneshot::channel(); + async fn await_unwrap(o: impl Future>) -> T { + o.await.unwrap() + } + let v = vec![ + Box::pin(await_unwrap(ok_receiver)), + Box::pin(await_unwrap(first_err_receiver)), + Box::pin(await_unwrap(second_err_receiver)), + ]; + ok_sender.send(Ok(1)).unwrap(); let (i, v) = block_on(select_ok(v)).ok().unwrap(); assert_eq!(i, 1); assert_eq!(v.len(), 2); + first_err_sender.send(Err(2)).unwrap(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + second_err_sender.send(Err(3)).unwrap(); + }); let i = block_on(select_ok(v)).err().unwrap(); assert_eq!(i, 3); } + +#[test] +fn is_fair() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let v = vec![err(1), err(2), ok(3), ok(4)]; + + let (i, _v) = block_on(select_ok(v)).ok().unwrap(); + results.push(i); + } + const THRESHOLD: usize = 30; + assert_eq!(results.iter().filter(|i| **i == 3).take(THRESHOLD).count(), THRESHOLD); + assert_eq!(results.iter().filter(|i| **i == 4).take(THRESHOLD).count(), THRESHOLD); +} diff --git a/futures/tests/future_try_select.rs b/futures/tests/future_try_select.rs new file mode 100644 index 0000000000..382466fc7c --- /dev/null +++ b/futures/tests/future_try_select.rs @@ -0,0 +1,14 @@ +use futures::future::{ok, try_select}; +use futures_executor::block_on; + +#[test] +fn is_fair() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let (i, _) = block_on(try_select(ok::<_, ()>(0), ok::<_, ()>(1))).unwrap().factor_first(); + results.push(i); + } + const THRESHOLD: usize = 30; + assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD); + assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD) +} From 328b4dc16aa35f05e16ac12ad2c65c0b67f3c92b Mon Sep 17 00:00:00 2001 From: Jacob Kiesel Date: Tue, 13 Sep 2022 14:29:14 -0600 Subject: [PATCH 2/6] remove rand dependency, use internal prng --- futures-util/Cargo.toml | 4 +--- futures-util/src/async_await/mod.rs | 2 -- futures-util/src/async_await/random.rs | 2 +- futures-util/src/future/select.rs | 14 ++++++++------ futures-util/src/future/select_ok.rs | 12 +++++------- futures-util/src/future/try_select.rs | 13 ++++++++----- futures-util/src/lib.rs | 17 ----------------- 7 files changed, 23 insertions(+), 41 deletions(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index ca9db2d286..68dbd86b7d 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -12,8 +12,7 @@ Common utilities and extension traits for the futures-rs library. [features] default = ["std", "async-await", "async-await-macro"] -std = ["alloc", "futures-core/std", "futures-task/std", "rand/std", "rand/std_rng", "slab"] -getrandom = ["rand/getrandom"] +std = ["alloc", "futures-core/std", "futures-task/std", "slab"] alloc = ["futures-core/alloc", "futures-task/alloc"] async-await = [] async-await-macro = ["async-await", "futures-macro"] @@ -44,7 +43,6 @@ futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" pin-project-lite = "0.2.6" -rand = { version = "0.8.5", default-features = false, features = ["small_rng"] } [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } diff --git a/futures-util/src/async_await/mod.rs b/futures-util/src/async_await/mod.rs index 09152f94c5..f2b5eed31a 100644 --- a/futures-util/src/async_await/mod.rs +++ b/futures-util/src/async_await/mod.rs @@ -35,10 +35,8 @@ mod stream_select_mod; pub use self::stream_select_mod::*; #[cfg(feature = "std")] -#[cfg(feature = "async-await-macro")] mod random; #[cfg(feature = "std")] -#[cfg(feature = "async-await-macro")] pub use self::random::*; #[doc(hidden)] diff --git a/futures-util/src/async_await/random.rs b/futures-util/src/async_await/random.rs index 2ac2f78a87..fcb168f4c6 100644 --- a/futures-util/src/async_await/random.rs +++ b/futures-util/src/async_await/random.rs @@ -17,7 +17,7 @@ pub fn shuffle(slice: &mut [T]) { } /// Return a value from `0..n`. -fn gen_index(n: usize) -> usize { +pub(crate) fn gen_index(n: usize) -> usize { (random() % n as u64) as usize } diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index 6eeb4fd811..6bb9b80fc1 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -3,15 +3,12 @@ use crate::future::{Either, FutureExt}; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use rand::rngs::SmallRng; -use rand::Rng; /// Future for the [`select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Select { inner: Option<(A, B)>, - rng: SmallRng, } impl Unpin for Select {} @@ -94,7 +91,6 @@ where { assert_future::, _>(Select { inner: Some((future1, future2)), - rng: crate::gen_rng(), }) } @@ -116,7 +112,6 @@ where } } - let a_polls_first = self.rng.gen::(); let (a, b) = self.inner.as_mut().expect("cannot poll Select twice"); macro_rules! poll_wrap { @@ -127,13 +122,20 @@ where }; } - if a_polls_first { + #[cfg(feature = "std")] + if crate::gen_index(2) == 0 { poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left); poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right); } else { poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right); poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left); } + + #[cfg(not(feature = "std"))] + { + poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left); + poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right); + } Poll::Pending } } diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 3043f91e14..00349614f2 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -6,15 +6,12 @@ use core::mem; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; -use rand::prelude::SliceRandom; -use rand::rngs::SmallRng; /// Future for the [`select_ok`] function. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SelectOk { inner: Vec, - rng: SmallRng, } impl Unpin for SelectOk {} @@ -48,7 +45,7 @@ where I: IntoIterator, I::Item: TryFuture + Unpin, { - let ret = SelectOk { inner: iter.into_iter().collect(), rng: crate::gen_rng() }; + let ret = SelectOk { inner: iter.into_iter().collect() }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); assert_future::< Result<(::Ok, Vec), ::Error>, @@ -60,8 +57,9 @@ impl Future for SelectOk { type Output = Result<(Fut::Ok, Vec), Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { inner, rng } = &mut *self; - inner.shuffle(rng); + let Self { inner } = &mut *self; + #[cfg(feature = "std")] + crate::shuffle(inner); // loop until we've either exhausted all errors, a success was hit, or nothing is ready loop { let item = inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { @@ -74,7 +72,7 @@ impl Future for SelectOk { drop(inner.remove(idx)); match res { Ok(e) => { - let rest = mem::take(&mut self.inner); + let rest = mem::take(inner); return Poll::Ready(Ok((e, rest))); } Err(e) => { diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index cbef1290d6..f2811bf364 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -2,15 +2,12 @@ use crate::future::{Either, TryFutureExt}; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; -use rand::rngs::SmallRng; -use rand::Rng; /// Future for the [`try_select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct TrySelect { inner: Option<(A, B)>, - rng: SmallRng, } impl Unpin for TrySelect {} @@ -63,7 +60,6 @@ where { super::assert_future::, EitherErr>, _>(TrySelect { inner: Some((future1, future2)), - rng: crate::gen_rng(), }) } @@ -92,10 +88,17 @@ where } }; } - if self.rng.gen::() { + + #[cfg(feature = "std")] + if crate::gen_index(2) == 0 { poll_wrap!(a, b, Either::Left, Either::Right) } else { poll_wrap!(b, a, Either::Right, Either::Left) } + + #[cfg(not(feature = "std"))] + { + poll_wrap!(a, b, Either::Left, Either::Right) + } } } diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index cdc8ae2ab6..abe35e68f7 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -26,8 +26,6 @@ extern crate std; // Macro re-exports pub use futures_core::ready; pub use pin_utils::pin_mut; -use rand::rngs::SmallRng; -use rand::SeedableRng; #[cfg(feature = "async-await")] #[macro_use] @@ -333,18 +331,3 @@ mod abortable; mod fns; mod unfold_state; - -fn gen_rng() -> SmallRng { - #[cfg(feature = "std")] - { - SmallRng::from_rng(rand::thread_rng()).expect("generating SmallRng via thread_rng failed") - } - #[cfg(all(feature = "getrandom", not(feature = "std")))] - { - SmallRng::from_entropy() - } - #[cfg(not(any(feature = "getrandom", feature = "std")))] - { - SmallRng::seed_from_u64(0) - } -} From b1152d1cb34ac877f931e0973fe38ae8a542959c Mon Sep 17 00:00:00 2001 From: Jacob Kiesel Date: Thu, 15 Sep 2022 17:05:31 -0600 Subject: [PATCH 3/6] fix randomization problems by rearranging modules --- futures-macro/src/select.rs | 2 +- futures-macro/src/stream_select.rs | 2 +- futures-util/src/async_await/mod.rs | 5 ----- futures-util/src/future/select_ok.rs | 4 +++- futures-util/src/future/try_select.rs | 10 ++++++---- futures-util/src/lib.rs | 5 +++++ futures-util/src/{async_await => }/random.rs | 0 7 files changed, 16 insertions(+), 12 deletions(-) rename futures-util/src/{async_await => }/random.rs (100%) diff --git a/futures-macro/src/select.rs b/futures-macro/src/select.rs index 57bb74a842..529b9a5dbe 100644 --- a/futures-macro/src/select.rs +++ b/futures-macro/src/select.rs @@ -279,7 +279,7 @@ fn select_inner(input: TokenStream, random: bool) -> TokenStream { let shuffle = if random { quote! { - __futures_crate::async_await::shuffle(&mut __select_arr); + __futures_crate::shuffle(&mut __select_arr); } } else { quote!() diff --git a/futures-macro/src/stream_select.rs b/futures-macro/src/stream_select.rs index 9927b53073..7571a565b2 100644 --- a/futures-macro/src/stream_select.rs +++ b/futures-macro/src/stream_select.rs @@ -56,7 +56,7 @@ pub(crate) fn stream_select(input: TokenStream) -> Result(_: &T) {} diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 00349614f2..3d6fa67cc1 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -59,7 +59,9 @@ impl Future for SelectOk { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let Self { inner } = &mut *self; #[cfg(feature = "std")] - crate::shuffle(inner); + { + crate::shuffle(inner); + } // loop until we've either exhausted all errors, a success was hit, or nothing is ready loop { let item = inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index f2811bf364..a24525bbdc 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -90,10 +90,12 @@ where } #[cfg(feature = "std")] - if crate::gen_index(2) == 0 { - poll_wrap!(a, b, Either::Left, Either::Right) - } else { - poll_wrap!(b, a, Either::Right, Either::Left) + { + if crate::gen_index(2) == 0 { + poll_wrap!(a, b, Either::Left, Either::Right) + } else { + poll_wrap!(b, a, Either::Right, Either::Left) + } } #[cfg(not(feature = "std"))] diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index abe35e68f7..b77b2f8122 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -34,6 +34,11 @@ mod async_await; #[doc(hidden)] pub use self::async_await::*; +#[cfg(feature = "std")] +mod random; +#[cfg(feature = "std")] +pub use self::random::*; + // Not public API. #[cfg(feature = "async-await")] #[doc(hidden)] diff --git a/futures-util/src/async_await/random.rs b/futures-util/src/random.rs similarity index 100% rename from futures-util/src/async_await/random.rs rename to futures-util/src/random.rs From 00b449c8a17db62e8801ec7c0225d6c13b9f7c23 Mon Sep 17 00:00:00 2001 From: Jacob Kiesel Date: Mon, 14 Aug 2023 04:22:38 -0500 Subject: [PATCH 4/6] Add biased variants for newly fair select functions --- futures-util/src/future/mod.rs | 6 +- futures-util/src/future/select.rs | 89 ++++++++++++++++++++++++++- futures-util/src/future/select_ok.rs | 40 ++++++++++-- futures-util/src/future/try_select.rs | 55 ++++++++++++++++- futures/tests/future_select.rs | 12 +++- futures/tests/future_select_ok.rs | 14 ++++- futures/tests/future_try_select.rs | 13 +++- 7 files changed, 215 insertions(+), 14 deletions(-) diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 1280ce9864..78ccd13174 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -86,7 +86,7 @@ mod join_all; pub use self::join_all::{join_all, JoinAll}; mod select; -pub use self::select::{select, Select}; +pub use self::select::{select, select_biased, Select}; #[cfg(feature = "alloc")] mod select_all; @@ -102,12 +102,12 @@ mod try_join_all; pub use self::try_join_all::{try_join_all, TryJoinAll}; mod try_select; -pub use self::try_select::{try_select, TrySelect}; +pub use self::try_select::{try_select, try_select_biased, TrySelect}; #[cfg(feature = "alloc")] mod select_ok; #[cfg(feature = "alloc")] -pub use self::select_ok::{select_ok, SelectOk}; +pub use self::select_ok::{select_ok, select_ok_biased, SelectOk}; mod either; pub use self::either::Either; diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index 6bb9b80fc1..7d5eba436d 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -9,6 +9,7 @@ use futures_core::task::{Context, Poll}; #[derive(Debug)] pub struct Select { inner: Option<(A, B)>, + _biased: bool, } impl Unpin for Select {} @@ -23,7 +24,8 @@ impl Unpin for Select {} /// wrapped version of them. /// /// If both futures are ready when this is polled, the winner will be pseudo-randomly -/// selected. +/// selected, unless the std feature is not enabled. If std is enabled, the first +/// argument will always win. /// /// Also note that if both this and the second future have the same /// output type you can use the `Either::factor_first` method to @@ -91,6 +93,88 @@ where { assert_future::, _>(Select { inner: Some((future1, future2)), + _biased: false, + }) +} + +/// Waits for either one of two differently-typed futures to complete, giving preferential treatment to the first one. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// If both futures are ready when this is polled, the winner will always be the first argument. +/// +/// Also note that if both this and the second future have the same +/// output type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// A simple example +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::{ +/// pin_mut, +/// future::Either, +/// future::self, +/// }; +/// +/// // These two futures have different types even though their outputs have the same type. +/// let future1 = async { +/// future::pending::<()>().await; // will never finish +/// 1 +/// }; +/// let future2 = async { +/// future::ready(2).await +/// }; +/// +/// // 'select_biased' requires Future + Unpin bounds +/// pin_mut!(future1); +/// pin_mut!(future2); +/// +/// let value = match future::select_biased(future1, future2).await { +/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1` +/// // `_` represents `future2` +/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2` +/// // `_` represents `future1` +/// }; +/// +/// assert!(value == 2); +/// # }); +/// ``` +/// +/// A more complex example +/// +/// ``` +/// use futures::future::{self, Either, Future, FutureExt}; +/// +/// // A poor-man's join implemented on top of select +/// +/// fn join(a: A, b: B) -> impl Future +/// where A: Future + Unpin, +/// B: Future + Unpin, +/// { +/// future::select_biased(a, b).then(|either| { +/// match either { +/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(), +/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(), +/// } +/// }) +/// } +/// ``` +pub fn select_biased(future1: A, future2: B) -> Select +where + A: Future + Unpin, + B: Future + Unpin, +{ + assert_future::, _>(Select { + inner: Some((future1, future2)), + _biased: true, }) } @@ -111,6 +195,7 @@ where Some(value) => value, } } + let _biased = self._biased; let (a, b) = self.inner.as_mut().expect("cannot poll Select twice"); @@ -123,7 +208,7 @@ where } #[cfg(feature = "std")] - if crate::gen_index(2) == 0 { + if _biased || crate::gen_index(2) == 0 { poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left); poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right); } else { diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 3d6fa67cc1..1aadd9c5e2 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -12,6 +12,7 @@ use futures_core::task::{Context, Poll}; #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SelectOk { inner: Vec, + _biased: bool, } impl Unpin for SelectOk {} @@ -35,7 +36,7 @@ impl Unpin for SelectOk {} /// Some futures that would have been polled and had errors get dropped, may now instead /// remain in the collection without being polled. /// -/// If you were relying on this biased behavior, consider switching to the [`select_biased!`](crate::select_biased) macro. +/// If you were relying on this biased behavior, consider switching to the [`select_ok_biased`] function. /// /// # Panics /// @@ -45,7 +46,36 @@ where I: IntoIterator, I::Item: TryFuture + Unpin, { - let ret = SelectOk { inner: iter.into_iter().collect() }; + let ret = SelectOk { inner: iter.into_iter().collect(), _biased: false }; + assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); + assert_future::< + Result<(::Ok, Vec), ::Error>, + _, + >(ret) +} + +/// Creates a new future which will select the first successful future over a list of futures. +/// +/// The returned future will wait for any future within `iter` to be ready and Ok. Unlike +/// `select_all`, this will only return the first successful completion, or the last +/// failure. This is useful in contexts where any success is desired and failures +/// are ignored, unless all the futures fail. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// If multiple futures are ready at the same time this function is biased towards +/// entries that are earlier in the list. +/// +/// # Panics +/// +/// This function will panic if the iterator specified contains no items. +pub fn select_ok_biased(iter: I) -> SelectOk +where + I: IntoIterator, + I::Item: TryFuture + Unpin, +{ + let ret = SelectOk { inner: iter.into_iter().collect(), _biased: true }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); assert_future::< Result<(::Ok, Vec), ::Error>, @@ -57,10 +87,12 @@ impl Future for SelectOk { type Output = Result<(Fut::Ok, Vec), Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { inner } = &mut *self; + let Self { inner, _biased } = &mut *self; #[cfg(feature = "std")] { - crate::shuffle(inner); + if !*_biased { + crate::shuffle(inner); + } } // loop until we've either exhausted all errors, a success was hit, or nothing is ready loop { diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index a24525bbdc..8927865d2f 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -8,6 +8,7 @@ use futures_core::task::{Context, Poll}; #[derive(Debug)] pub struct TrySelect { inner: Option<(A, B)>, + _biased: bool, } impl Unpin for TrySelect {} @@ -25,7 +26,8 @@ type EitherErr = Either<(::Error, B), (::E /// wrapped version of them. /// /// If both futures are ready when this is polled, the winner will be pseudo-randomly -/// selected. +/// selected, unless the `std` feature is disabled. If the std feature is disabled, +/// the first argument will always win. /// /// Also note that if both this and the second future have the same /// success/error type you can use the `Either::factor_first` method to @@ -60,6 +62,55 @@ where { super::assert_future::, EitherErr>, _>(TrySelect { inner: Some((future1, future2)), + _biased: false, + }) +} + +/// Waits for either one of two differently-typed futures to complete, giving preferential treatment to the first one. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// If both futures are ready when this is polled, the winner will always be the first one. +/// +/// Also note that if both this and the second future have the same +/// success/error type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// ``` +/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt}; +/// +/// // A poor-man's try_join implemented on top of select +/// +/// fn try_join(a: A, b: B) -> impl TryFuture +/// where A: TryFuture + Unpin + 'static, +/// B: TryFuture + Unpin + 'static, +/// E: 'static, +/// { +/// future::try_select_biased(a, b).then(|res| -> Box> + Unpin> { +/// match res { +/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))), +/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))), +/// Err(Either::Left((e, _))) => Box::new(future::err(e)), +/// Err(Either::Right((e, _))) => Box::new(future::err(e)), +/// } +/// }) +/// } +/// ``` +pub fn try_select_biased(future1: A, future2: B) -> TrySelect +where + A: TryFuture + Unpin, + B: TryFuture + Unpin, +{ + super::assert_future::, EitherErr>, _>(TrySelect { + inner: Some((future1, future2)), + _biased: true, }) } @@ -91,7 +142,7 @@ where #[cfg(feature = "std")] { - if crate::gen_index(2) == 0 { + if self._biased || crate::gen_index(2) == 0 { poll_wrap!(a, b, Either::Left, Either::Right) } else { poll_wrap!(b, a, Either::Right, Either::Left) diff --git a/futures/tests/future_select.rs b/futures/tests/future_select.rs index da30a49271..5e2dffa94d 100644 --- a/futures/tests/future_select.rs +++ b/futures/tests/future_select.rs @@ -1,6 +1,6 @@ use std::future::ready; -use futures::future::select; +use futures::future::{select, select_biased}; use futures_executor::block_on; #[test] @@ -14,3 +14,13 @@ fn is_fair() { assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD); assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD) } + +#[test] +fn is_biased() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let (i, _) = block_on(select_biased(ready(0), ready(1))).factor_first(); + results.push(i); + } + assert!(results.iter().all(|i| *i == 0)); +} diff --git a/futures/tests/future_select_ok.rs b/futures/tests/future_select_ok.rs index a031f05fc5..fa4c48720b 100644 --- a/futures/tests/future_select_ok.rs +++ b/futures/tests/future_select_ok.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use std::time::Duration; use futures::executor::block_on; -use futures::future::{err, ok, select_ok, Future}; +use futures::future::{err, ok, select_ok, select_ok_biased, Future}; use futures_channel::oneshot; use std::thread; @@ -65,3 +65,15 @@ fn is_fair() { assert_eq!(results.iter().filter(|i| **i == 3).take(THRESHOLD).count(), THRESHOLD); assert_eq!(results.iter().filter(|i| **i == 4).take(THRESHOLD).count(), THRESHOLD); } + +#[test] +fn is_biased() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let v = vec![err(1), err(2), ok(3), ok(4)]; + + let (i, _v) = block_on(select_ok_biased(v)).ok().unwrap(); + results.push(i); + } + assert!(results.iter().all(|i| *i == 3)); +} diff --git a/futures/tests/future_try_select.rs b/futures/tests/future_try_select.rs index 382466fc7c..40dcd6da7f 100644 --- a/futures/tests/future_try_select.rs +++ b/futures/tests/future_try_select.rs @@ -1,4 +1,4 @@ -use futures::future::{ok, try_select}; +use futures::future::{ok, try_select, try_select_biased}; use futures_executor::block_on; #[test] @@ -12,3 +12,14 @@ fn is_fair() { assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD); assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD) } + +#[test] +fn is_biased() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let (i, _) = + block_on(try_select_biased(ok::<_, ()>(0), ok::<_, ()>(1))).unwrap().factor_first(); + results.push(i); + } + assert!(results.iter().all(|i| *i == 0)); +} From bc53113d071c5ae998ba9e64f91febfc794ec1d4 Mon Sep 17 00:00:00 2001 From: Jacob Kiesel Date: Wed, 16 Aug 2023 22:25:04 -0600 Subject: [PATCH 5/6] Use types to represent a select strategy instead of a boolean --- futures-util/src/future/mod.rs | 3 +++ futures-util/src/future/select.rs | 26 ++++++++++++---------- futures-util/src/future/select_ok.rs | 22 +++++++++--------- futures-util/src/future/select_strategy.rs | 25 +++++++++++++++++++++ futures-util/src/future/try_select.rs | 21 +++++++++-------- 5 files changed, 66 insertions(+), 31 deletions(-) create mode 100644 futures-util/src/future/select_strategy.rs diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 78ccd13174..be352fab40 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -85,6 +85,9 @@ mod join_all; #[cfg(feature = "alloc")] pub use self::join_all::{join_all, JoinAll}; +mod select_strategy; +pub use select_strategy::{Biased, Fair, IsBiased}; + mod select; pub use self::select::{select, select_biased, Select}; diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index 7d5eba436d..cbce54c3a0 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -1,5 +1,6 @@ -use super::assert_future; +use super::{assert_future, Biased, Fair, IsBiased}; use crate::future::{Either, FutureExt}; +use core::marker::PhantomData; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; @@ -7,12 +8,12 @@ use futures_core::task::{Context, Poll}; /// Future for the [`select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] -pub struct Select { +pub struct Select { inner: Option<(A, B)>, - _biased: bool, + _phantom: PhantomData, } -impl Unpin for Select {} +impl Unpin for Select {} /// Waits for either one of two differently-typed futures to complete. /// @@ -86,14 +87,14 @@ impl Unpin for Select {} /// }) /// } /// ``` -pub fn select(future1: A, future2: B) -> Select +pub fn select(future1: A, future2: B) -> Select where A: Future + Unpin, B: Future + Unpin, { assert_future::, _>(Select { inner: Some((future1, future2)), - _biased: false, + _phantom: PhantomData, }) } @@ -167,21 +168,22 @@ where /// }) /// } /// ``` -pub fn select_biased(future1: A, future2: B) -> Select +pub fn select_biased(future1: A, future2: B) -> Select where A: Future + Unpin, B: Future + Unpin, { assert_future::, _>(Select { inner: Some((future1, future2)), - _biased: true, + _phantom: PhantomData, }) } -impl Future for Select +impl Future for Select where A: Future + Unpin, B: Future + Unpin, + BIASED: IsBiased, { type Output = Either<(A::Output, B), (B::Output, A)>; @@ -195,7 +197,6 @@ where Some(value) => value, } } - let _biased = self._biased; let (a, b) = self.inner.as_mut().expect("cannot poll Select twice"); @@ -208,7 +209,7 @@ where } #[cfg(feature = "std")] - if _biased || crate::gen_index(2) == 0 { + if BIASED::IS_BIASED || crate::gen_index(2) == 0 { poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left); poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right); } else { @@ -225,10 +226,11 @@ where } } -impl FusedFuture for Select +impl FusedFuture for Select where A: Future + Unpin, B: Future + Unpin, + BIASED: IsBiased, { fn is_terminated(&self) -> bool { self.inner.is_none() diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 1aadd9c5e2..848cd52b89 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -1,7 +1,9 @@ use super::assert_future; +use super::{Biased, Fair, IsBiased}; use crate::future::TryFutureExt; use alloc::vec::Vec; use core::iter::FromIterator; +use core::marker::PhantomData; use core::mem; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; @@ -10,12 +12,12 @@ use futures_core::task::{Context, Poll}; /// Future for the [`select_ok`] function. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct SelectOk { +pub struct SelectOk { inner: Vec, - _biased: bool, + _phantom: PhantomData, } -impl Unpin for SelectOk {} +impl Unpin for SelectOk {} /// Creates a new future which will select the first successful future over a list of futures. /// @@ -41,12 +43,12 @@ impl Unpin for SelectOk {} /// # Panics /// /// This function will panic if the iterator specified contains no items. -pub fn select_ok(iter: I) -> SelectOk +pub fn select_ok(iter: I) -> SelectOk where I: IntoIterator, I::Item: TryFuture + Unpin, { - let ret = SelectOk { inner: iter.into_iter().collect(), _biased: false }; + let ret = SelectOk { inner: iter.into_iter().collect(), _phantom: PhantomData }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); assert_future::< Result<(::Ok, Vec), ::Error>, @@ -70,12 +72,12 @@ where /// # Panics /// /// This function will panic if the iterator specified contains no items. -pub fn select_ok_biased(iter: I) -> SelectOk +pub fn select_ok_biased(iter: I) -> SelectOk where I: IntoIterator, I::Item: TryFuture + Unpin, { - let ret = SelectOk { inner: iter.into_iter().collect(), _biased: true }; + let ret = SelectOk { inner: iter.into_iter().collect(), _phantom: PhantomData }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); assert_future::< Result<(::Ok, Vec), ::Error>, @@ -83,14 +85,14 @@ where >(ret) } -impl Future for SelectOk { +impl Future for SelectOk { type Output = Result<(Fut::Ok, Vec), Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { inner, _biased } = &mut *self; + let Self { inner, _phantom } = &mut *self; #[cfg(feature = "std")] { - if !*_biased { + if !BIASED::IS_BIASED { crate::shuffle(inner); } } diff --git a/futures-util/src/future/select_strategy.rs b/futures-util/src/future/select_strategy.rs new file mode 100644 index 0000000000..4d82e1328d --- /dev/null +++ b/futures-util/src/future/select_strategy.rs @@ -0,0 +1,25 @@ +/// When used with a select future, this will make the future biased. +/// When multiple futures are ready, the winner will be the first one +/// specified. +#[derive(Debug)] +pub struct Biased; + +/// When used with a select future, this will make the future fair. +/// When multiple futures are ready, the winner will be pseudo-randomly +/// selected. This is the default behavior. +#[derive(Debug)] +pub struct Fair; + +/// Reports whether the type is an instance of [`Biased`] or not. +pub trait IsBiased { + /// Contains the answer to our question: is this biased? + const IS_BIASED: bool; +} + +impl IsBiased for Biased { + const IS_BIASED: bool = true; +} + +impl IsBiased for Fair { + const IS_BIASED: bool = false; +} diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index 8927865d2f..fbc3e5c6a4 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -1,4 +1,6 @@ +use super::{Biased, Fair, IsBiased}; use crate::future::{Either, TryFutureExt}; +use core::marker::PhantomData; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; @@ -6,12 +8,12 @@ use futures_core::task::{Context, Poll}; /// Future for the [`try_select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] -pub struct TrySelect { +pub struct TrySelect { inner: Option<(A, B)>, - _biased: bool, + _phantom: PhantomData, } -impl Unpin for TrySelect {} +impl Unpin for TrySelect {} type EitherOk = Either<(::Ok, B), (::Ok, A)>; type EitherErr = Either<(::Error, B), (::Error, A)>; @@ -55,14 +57,14 @@ type EitherErr = Either<(::Error, B), (::E /// }) /// } /// ``` -pub fn try_select(future1: A, future2: B) -> TrySelect +pub fn try_select(future1: A, future2: B) -> TrySelect where A: TryFuture + Unpin, B: TryFuture + Unpin, { super::assert_future::, EitherErr>, _>(TrySelect { inner: Some((future1, future2)), - _biased: false, + _phantom: PhantomData, }) } @@ -103,21 +105,22 @@ where /// }) /// } /// ``` -pub fn try_select_biased(future1: A, future2: B) -> TrySelect +pub fn try_select_biased(future1: A, future2: B) -> TrySelect where A: TryFuture + Unpin, B: TryFuture + Unpin, { super::assert_future::, EitherErr>, _>(TrySelect { inner: Some((future1, future2)), - _biased: true, + _phantom: PhantomData, }) } -impl Future for TrySelect +impl Future for TrySelect where A: TryFuture, B: TryFuture, + BIASED: IsBiased, { type Output = Result, EitherErr>; @@ -142,7 +145,7 @@ where #[cfg(feature = "std")] { - if self._biased || crate::gen_index(2) == 0 { + if BIASED::IS_BIASED || crate::gen_index(2) == 0 { poll_wrap!(a, b, Either::Left, Either::Right) } else { poll_wrap!(b, a, Either::Right, Either::Left) From 91672a1d24b3595e1e5c7c392aac21c2722b58e3 Mon Sep 17 00:00:00 2001 From: Jacob Kiesel Date: Mon, 29 Apr 2024 17:13:13 -0600 Subject: [PATCH 6/6] fix documentation bug --- futures-util/src/future/select.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index cbce54c3a0..9a887173f4 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -25,7 +25,7 @@ impl Unpin for Select {} /// wrapped version of them. /// /// If both futures are ready when this is polled, the winner will be pseudo-randomly -/// selected, unless the std feature is not enabled. If std is enabled, the first +/// selected, unless the std feature is not enabled. If std is not enabled, the first /// argument will always win. /// /// Also note that if both this and the second future have the same