From 984689e2f15b3a1591641064136d4e8aec929214 Mon Sep 17 00:00:00 2001 From: Jacob Kiesel Date: Tue, 6 Sep 2022 21:26:18 -0600 Subject: [PATCH] Add fairness to futures_util::{select, try_select, select_ok}, fixes #2135 --- futures-util/Cargo.toml | 4 ++- futures-util/src/future/select.rs | 24 ++++++++++--- futures-util/src/future/select_ok.rs | 31 +++++++++++----- futures-util/src/future/try_select.rs | 37 +++++++++++++------ futures-util/src/lib.rs | 17 +++++++++ futures/tests/future_select.rs | 16 +++++++++ futures/tests/future_select_ok.rs | 51 +++++++++++++++++++++++---- futures/tests/future_try_select.rs | 14 ++++++++ 8 files changed, 163 insertions(+), 31 deletions(-) create mode 100644 futures/tests/future_select.rs create mode 100644 futures/tests/future_try_select.rs diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 071df637db..856e248e20 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -12,7 +12,8 @@ Common utilities and extension traits for the futures-rs library. [features] default = ["std", "async-await", "async-await-macro"] -std = ["alloc", "futures-core/std", "futures-task/std", "slab"] +std = ["alloc", "futures-core/std", "futures-task/std", "rand/std", "rand/std_rng", "slab"] +getrandom = ["rand/getrandom"] alloc = ["futures-core/alloc", "futures-task/alloc"] async-await = [] async-await-macro = ["async-await", "futures-macro"] @@ -42,6 +43,7 @@ futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" pin-project-lite = "0.2.6" +rand = { version = "0.8.5", default-features = false, features = ["small_rng"] } [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index e693a30b00..05f407ff69 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -3,12 +3,15 @@ use crate::future::{Either, FutureExt}; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; +use rand::rngs::SmallRng; +use rand::Rng; /// Future for the [`select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct Select { inner: Option<(A, B)>, + rng: SmallRng, } impl Unpin for Select {} @@ -22,6 +25,9 @@ impl Unpin for Select {} /// Note that this function consumes the receiving futures and returns a /// wrapped version of them. /// +/// If both futures are ready when this is polled, the winner will be pseudo-randomly +/// selected. +/// /// Also note that if both this and the second future have the same /// output type you can use the `Either::factor_first` method to /// conveniently extract out the value at the end. @@ -88,6 +94,7 @@ where { assert_future::, _>(Select { inner: Some((future1, future2)), + rng: crate::gen_rng(), }) } @@ -101,14 +108,21 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - if let Poll::Ready(val) = a.poll_unpin(cx) { - return Poll::Ready(Either::Left((val, b))); + macro_rules! poll_wrap { + ($to_poll:expr, $unpolled:expr, $wrap:expr) => { + if let Poll::Ready(val) = $to_poll.poll_unpin(cx) { + return Poll::Ready($wrap((val, $unpolled))); + } + }; } - if let Poll::Ready(val) = b.poll_unpin(cx) { - return Poll::Ready(Either::Right((val, a))); + if self.rng.gen::() { + poll_wrap!(a, b, Either::Left); + poll_wrap!(b, a, Either::Right); + } else { + poll_wrap!(b, a, Either::Right); + poll_wrap!(a, b, Either::Left); } - self.inner = Some((a, b)); Poll::Pending } diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 5d5579930b..3043f91e14 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -6,12 +6,15 @@ use core::mem; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; +use rand::prelude::SliceRandom; +use rand::rngs::SmallRng; /// Future for the [`select_ok`] function. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SelectOk { inner: Vec, + rng: SmallRng, } impl Unpin for SelectOk {} @@ -26,6 +29,17 @@ impl Unpin for SelectOk {} /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. /// +/// # Note for users migrating from 0.3 to 0.4 +/// +/// This function used to be biased in favor of futures that appeared earlier in the +/// iterator. This is no longer the case, the futures are now shuffled prior to being polled. +/// This prevents starvation problems. It also has the side effect that the returned `Vec` +/// of remaining futures may be longer than it was in version 0.3, because of this shuffling. +/// Some futures that would have been polled and had errors get dropped, may now instead +/// remain in the collection without being polled. +/// +/// If you were relying on this biased behavior, consider switching to the [`select_biased!`](crate::select_biased) macro. +/// /// # Panics /// /// This function will panic if the iterator specified contains no items. @@ -34,7 +48,7 @@ where I: IntoIterator, I::Item: TryFuture + Unpin, { - let ret = SelectOk { inner: iter.into_iter().collect() }; + let ret = SelectOk { inner: iter.into_iter().collect(), rng: crate::gen_rng() }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); assert_future::< Result<(::Ok, Vec), ::Error>, @@ -46,24 +60,25 @@ impl Future for SelectOk { type Output = Result<(Fut::Ok, Vec), Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { inner, rng } = &mut *self; + inner.shuffle(rng); // loop until we've either exhausted all errors, a success was hit, or nothing is ready loop { - let item = - self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { - Poll::Pending => None, - Poll::Ready(e) => Some((i, e)), - }); + let item = inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { + Poll::Pending => None, + Poll::Ready(e) => Some((i, e)), + }); match item { Some((idx, res)) => { // always remove Ok or Err, if it's not the last Err continue looping - drop(self.inner.remove(idx)); + drop(inner.remove(idx)); match res { Ok(e) => { let rest = mem::take(&mut self.inner); return Poll::Ready(Ok((e, rest))); } Err(e) => { - if self.inner.is_empty() { + if inner.is_empty() { return Poll::Ready(Err(e)); } } diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index bc282f7db1..a1585ecd65 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -2,12 +2,15 @@ use crate::future::{Either, TryFutureExt}; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; +use rand::rngs::SmallRng; +use rand::Rng; /// Future for the [`try_select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct TrySelect { inner: Option<(A, B)>, + rng: SmallRng, } impl Unpin for TrySelect {} @@ -24,6 +27,10 @@ type EitherErr = Either<(::Error, B), (::E /// Note that this function consumes the receiving futures and returns a /// wrapped version of them. /// +/// If both futures are ready when this is polled, the winner will be pseudo-randomly +/// selected. +/// +/// /// Also note that if both this and the second future have the same /// success/error type you can use the `Either::factor_first` method to /// conveniently extract out the value at the end. @@ -57,6 +64,7 @@ where { super::assert_future::, EitherErr>, _>(TrySelect { inner: Some((future1, future2)), + rng: crate::gen_rng(), }) } @@ -69,17 +77,26 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.try_poll_unpin(cx) { - Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))), - Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))), - Poll::Pending => match b.try_poll_unpin(cx) { - Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))), - Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))), - Poll::Pending => { - self.inner = Some((a, b)); - Poll::Pending + macro_rules! poll_wrap { + ($poll_first:expr, $poll_second:expr, $wrap_first:expr, $wrap_second:expr) => { + match $poll_first.try_poll_unpin(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err($wrap_first((x, $poll_second)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok($wrap_first((x, $poll_second)))), + Poll::Pending => match $poll_second.try_poll_unpin(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err($wrap_second((x, $poll_first)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok($wrap_second((x, $poll_first)))), + Poll::Pending => { + self.inner = Some((a, b)); + Poll::Pending + } + }, } - }, + }; + } + if self.rng.gen::() { + poll_wrap!(a, b, Either::Left, Either::Right) + } else { + poll_wrap!(b, a, Either::Right, Either::Left) } } } diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index e00faf4775..5ad694d959 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -29,6 +29,8 @@ extern crate alloc; // Macro re-exports pub use futures_core::ready; pub use pin_utils::pin_mut; +use rand::rngs::SmallRng; +use rand::SeedableRng; #[cfg(feature = "async-await")] #[macro_use] @@ -334,3 +336,18 @@ mod abortable; mod fns; mod unfold_state; + +fn gen_rng() -> SmallRng { + #[cfg(feature = "std")] + { + SmallRng::from_rng(rand::thread_rng()).expect("generating SmallRng via thread_rng failed") + } + #[cfg(all(feature = "getrandom", not(feature = "std")))] + { + SmallRng::from_entropy() + } + #[cfg(not(any(feature = "getrandom", feature = "std")))] + { + SmallRng::seed_from_u64(0) + } +} diff --git a/futures/tests/future_select.rs b/futures/tests/future_select.rs new file mode 100644 index 0000000000..da30a49271 --- /dev/null +++ b/futures/tests/future_select.rs @@ -0,0 +1,16 @@ +use std::future::ready; + +use futures::future::select; +use futures_executor::block_on; + +#[test] +fn is_fair() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let (i, _) = block_on(select(ready(0), ready(1))).factor_first(); + results.push(i); + } + const THRESHOLD: usize = 30; + assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD); + assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD) +} diff --git a/futures/tests/future_select_ok.rs b/futures/tests/future_select_ok.rs index 8aec00362d..a031f05fc5 100644 --- a/futures/tests/future_select_ok.rs +++ b/futures/tests/future_select_ok.rs @@ -1,30 +1,67 @@ +use std::fmt::Debug; +use std::time::Duration; + use futures::executor::block_on; -use futures::future::{err, ok, select_ok}; +use futures::future::{err, ok, select_ok, Future}; +use futures_channel::oneshot; +use std::thread; #[test] fn ignore_err() { let v = vec![err(1), err(2), ok(3), ok(4)]; let (i, v) = block_on(select_ok(v)).ok().unwrap(); - assert_eq!(i, 3); + assert!(i == 3 || i == 4); - assert_eq!(v.len(), 1); + assert!(v.len() < 4); - let (i, v) = block_on(select_ok(v)).ok().unwrap(); - assert_eq!(i, 4); + let (j, v) = block_on(select_ok(v)).ok().unwrap(); + assert!(j == 3 || j == 4); + assert_ne!(j, i); - assert!(v.is_empty()); + assert!(v.len() < 3); } #[test] fn last_err() { - let v = vec![ok(1), err(2), err(3)]; + let (ok_sender, ok_receiver) = oneshot::channel(); + let (first_err_sender, first_err_receiver) = oneshot::channel(); + let (second_err_sender, second_err_receiver) = oneshot::channel(); + async fn await_unwrap(o: impl Future>) -> T { + o.await.unwrap() + } + let v = vec![ + Box::pin(await_unwrap(ok_receiver)), + Box::pin(await_unwrap(first_err_receiver)), + Box::pin(await_unwrap(second_err_receiver)), + ]; + ok_sender.send(Ok(1)).unwrap(); let (i, v) = block_on(select_ok(v)).ok().unwrap(); assert_eq!(i, 1); assert_eq!(v.len(), 2); + first_err_sender.send(Err(2)).unwrap(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + second_err_sender.send(Err(3)).unwrap(); + }); let i = block_on(select_ok(v)).err().unwrap(); assert_eq!(i, 3); } + +#[test] +fn is_fair() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let v = vec![err(1), err(2), ok(3), ok(4)]; + + let (i, _v) = block_on(select_ok(v)).ok().unwrap(); + results.push(i); + } + const THRESHOLD: usize = 30; + assert_eq!(results.iter().filter(|i| **i == 3).take(THRESHOLD).count(), THRESHOLD); + assert_eq!(results.iter().filter(|i| **i == 4).take(THRESHOLD).count(), THRESHOLD); +} diff --git a/futures/tests/future_try_select.rs b/futures/tests/future_try_select.rs new file mode 100644 index 0000000000..382466fc7c --- /dev/null +++ b/futures/tests/future_try_select.rs @@ -0,0 +1,14 @@ +use futures::future::{ok, try_select}; +use futures_executor::block_on; + +#[test] +fn is_fair() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let (i, _) = block_on(try_select(ok::<_, ()>(0), ok::<_, ()>(1))).unwrap().factor_first(); + results.push(i); + } + const THRESHOLD: usize = 30; + assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD); + assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD) +}