Skip to content

Commit

Permalink
Add fairness to futures_util::{select, try_select, select_ok}, fixes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
Xaeroxe committed Sep 7, 2022
1 parent 67462f7 commit ceb1595
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 31 deletions.
4 changes: 3 additions & 1 deletion futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ Common utilities and extension traits for the futures-rs library.

[features]
default = ["std", "async-await", "async-await-macro"]
std = ["alloc", "futures-core/std", "futures-task/std", "slab"]
std = ["alloc", "futures-core/std", "futures-task/std", "rand/std", "rand/std_rng", "slab"]
getrandom = ["rand/getrandom"]
alloc = ["futures-core/alloc", "futures-task/alloc"]
async-await = []
async-await-macro = ["async-await", "futures-macro"]
Expand Down Expand Up @@ -42,6 +43,7 @@ futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-utils = "0.1.0"
pin-project-lite = "0.2.6"
rand = { version = "0.8.5", default-features = false, features = ["small_rng"] }

[dev-dependencies]
futures = { path = "../futures", features = ["async-await", "thread-pool"] }
Expand Down
24 changes: 19 additions & 5 deletions futures-util/src/future/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ use crate::future::{Either, FutureExt};
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use rand::rngs::SmallRng;
use rand::Rng;

/// Future for the [`select()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Select<A, B> {
inner: Option<(A, B)>,
rng: SmallRng,
}

impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
Expand All @@ -22,6 +25,9 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// If both futures are ready when this is polled, the winner will be pseudo-randomly
/// selected.
///
/// Also note that if both this and the second future have the same
/// output type you can use the `Either::factor_first` method to
/// conveniently extract out the value at the end.
Expand Down Expand Up @@ -88,6 +94,7 @@ where
{
assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
inner: Some((future1, future2)),
rng: crate::gen_rng(),
})
}

Expand All @@ -101,14 +108,21 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");

if let Poll::Ready(val) = a.poll_unpin(cx) {
return Poll::Ready(Either::Left((val, b)));
macro_rules! poll_wrap {
($to_poll:expr, $unpolled:expr, $wrap:expr) => {
if let Poll::Ready(val) = $to_poll.poll_unpin(cx) {
return Poll::Ready($wrap((val, $unpolled)));
}
};
}

if let Poll::Ready(val) = b.poll_unpin(cx) {
return Poll::Ready(Either::Right((val, a)));
if self.rng.gen::<bool>() {
poll_wrap!(a, b, Either::Left);
poll_wrap!(b, a, Either::Right);
} else {
poll_wrap!(b, a, Either::Right);
poll_wrap!(a, b, Either::Left);
}

self.inner = Some((a, b));
Poll::Pending
}
Expand Down
31 changes: 23 additions & 8 deletions futures-util/src/future/select_ok.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ use core::mem;
use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::task::{Context, Poll};
use rand::prelude::SliceRandom;
use rand::rngs::SmallRng;

/// Future for the [`select_ok`] function.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SelectOk<Fut> {
inner: Vec<Fut>,
rng: SmallRng,
}

impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
Expand All @@ -26,6 +29,17 @@ impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Note for users migrating from 0.3 to 0.4
///
/// This function used to be biased in favor of futures that appeared earlier in the
/// iterator. This is no longer the case, the futures are now shuffled prior to being polled.
/// This prevents starvation problems. It also has the side effect that the returned `Vec`
/// of remaining futures may be longer than it was in version 0.3, because of this shuffling.
/// Some futures that would have been polled and had errors get dropped, may now instead
/// remain in the collection without being polled.
///
/// If you were relying on this biased behavior, consider switching to the [`select_biased!`](crate::select_biased) macro.
///
/// # Panics
///
/// This function will panic if the iterator specified contains no items.
Expand All @@ -34,7 +48,7 @@ where
I: IntoIterator,
I::Item: TryFuture + Unpin,
{
let ret = SelectOk { inner: iter.into_iter().collect() };
let ret = SelectOk { inner: iter.into_iter().collect(), rng: crate::gen_rng() };
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
assert_future::<
Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
Expand All @@ -46,24 +60,25 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
type Output = Result<(Fut::Ok, Vec<Fut>), Fut::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { inner, rng } = &mut *self;
inner.shuffle(rng);
// loop until we've either exhausted all errors, a success was hit, or nothing is ready
loop {
let item =
self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) {
Poll::Pending => None,
Poll::Ready(e) => Some((i, e)),
});
let item = inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) {
Poll::Pending => None,
Poll::Ready(e) => Some((i, e)),
});
match item {
Some((idx, res)) => {
// always remove Ok or Err, if it's not the last Err continue looping
drop(self.inner.remove(idx));
drop(inner.remove(idx));
match res {
Ok(e) => {
let rest = mem::take(&mut self.inner);
return Poll::Ready(Ok((e, rest)));
}
Err(e) => {
if self.inner.is_empty() {
if inner.is_empty() {
return Poll::Ready(Err(e));
}
}
Expand Down
36 changes: 26 additions & 10 deletions futures-util/src/future/try_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ use crate::future::{Either, TryFutureExt};
use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::task::{Context, Poll};
use rand::rngs::SmallRng;
use rand::Rng;

/// Future for the [`try_select()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct TrySelect<A, B> {
inner: Option<(A, B)>,
rng: SmallRng,
}

impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
Expand All @@ -24,6 +27,9 @@ type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::E
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// If both futures are ready when this is polled, the winner will be pseudo-randomly
/// selected.
///
/// Also note that if both this and the second future have the same
/// success/error type you can use the `Either::factor_first` method to
/// conveniently extract out the value at the end.
Expand Down Expand Up @@ -57,6 +63,7 @@ where
{
super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
inner: Some((future1, future2)),
rng: crate::gen_rng(),
})
}

Expand All @@ -69,17 +76,26 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
match a.try_poll_unpin(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))),
Poll::Pending => match b.try_poll_unpin(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))),
Poll::Pending => {
self.inner = Some((a, b));
Poll::Pending
macro_rules! poll_wrap {
($poll_first:expr, $poll_second:expr, $wrap_first:expr, $wrap_second:expr) => {
match $poll_first.try_poll_unpin(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err($wrap_first((x, $poll_second)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok($wrap_first((x, $poll_second)))),
Poll::Pending => match $poll_second.try_poll_unpin(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err($wrap_second((x, $poll_first)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok($wrap_second((x, $poll_first)))),
Poll::Pending => {
self.inner = Some((a, b));
Poll::Pending
}
},
}
},
};
}
if self.rng.gen::<bool>() {
poll_wrap!(a, b, Either::Left, Either::Right)
} else {
poll_wrap!(b, a, Either::Right, Either::Left)
}
}
}
17 changes: 17 additions & 0 deletions futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ extern crate alloc;
// Macro re-exports
pub use futures_core::ready;
pub use pin_utils::pin_mut;
use rand::rngs::SmallRng;
use rand::SeedableRng;

#[cfg(feature = "async-await")]
#[macro_use]
Expand Down Expand Up @@ -334,3 +336,18 @@ mod abortable;

mod fns;
mod unfold_state;

fn gen_rng() -> SmallRng {
#[cfg(feature = "std")]
{
SmallRng::from_rng(rand::thread_rng()).expect("generating SmallRng via thread_rng failed")
}
#[cfg(all(feature = "getrandom", not(feature = "std")))]
{
SmallRng::from_entropy()
}
#[cfg(not(any(feature = "getrandom", feature = "std")))]
{
SmallRng::seed_from_u64(0)
}
}
16 changes: 16 additions & 0 deletions futures/tests/future_select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use std::future::ready;

use futures::future::select;
use futures_executor::block_on;

#[test]
fn is_fair() {
let mut results = Vec::with_capacity(100);
for _ in 0..100 {
let (i, _) = block_on(select(ready(0), ready(1))).factor_first();
results.push(i);
}
const THRESHOLD: usize = 30;
assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD);
assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD)
}
51 changes: 44 additions & 7 deletions futures/tests/future_select_ok.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,67 @@
use std::fmt::Debug;
use std::time::Duration;

use futures::executor::block_on;
use futures::future::{err, ok, select_ok};
use futures::future::{err, ok, select_ok, Future};
use futures_channel::oneshot;
use std::thread;

#[test]
fn ignore_err() {
let v = vec![err(1), err(2), ok(3), ok(4)];

let (i, v) = block_on(select_ok(v)).ok().unwrap();
assert_eq!(i, 3);
assert!(i == 3 || i == 4);

assert_eq!(v.len(), 1);
assert!(v.len() < 4);

let (i, v) = block_on(select_ok(v)).ok().unwrap();
assert_eq!(i, 4);
let (j, v) = block_on(select_ok(v)).ok().unwrap();
assert!(j == 3 || j == 4);
assert_ne!(j, i);

assert!(v.is_empty());
assert!(v.len() < 3);
}

#[test]
fn last_err() {
let v = vec![ok(1), err(2), err(3)];
let (ok_sender, ok_receiver) = oneshot::channel();
let (first_err_sender, first_err_receiver) = oneshot::channel();
let (second_err_sender, second_err_receiver) = oneshot::channel();
async fn await_unwrap<T, E: Debug>(o: impl Future<Output = Result<T, E>>) -> T {
o.await.unwrap()
}
let v = vec![
Box::pin(await_unwrap(ok_receiver)),
Box::pin(await_unwrap(first_err_receiver)),
Box::pin(await_unwrap(second_err_receiver)),
];
ok_sender.send(Ok(1)).unwrap();

let (i, v) = block_on(select_ok(v)).ok().unwrap();
assert_eq!(i, 1);

assert_eq!(v.len(), 2);
first_err_sender.send(Err(2)).unwrap();

thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
second_err_sender.send(Err(3)).unwrap();
});

let i = block_on(select_ok(v)).err().unwrap();
assert_eq!(i, 3);
}

#[test]
fn is_fair() {
let mut results = Vec::with_capacity(100);
for _ in 0..100 {
let v = vec![err(1), err(2), ok(3), ok(4)];

let (i, _v) = block_on(select_ok(v)).ok().unwrap();
results.push(i);
}
const THRESHOLD: usize = 30;
assert_eq!(results.iter().filter(|i| **i == 3).take(THRESHOLD).count(), THRESHOLD);
assert_eq!(results.iter().filter(|i| **i == 4).take(THRESHOLD).count(), THRESHOLD);
}
14 changes: 14 additions & 0 deletions futures/tests/future_try_select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use futures::future::{ok, try_select};
use futures_executor::block_on;

#[test]
fn is_fair() {
let mut results = Vec::with_capacity(100);
for _ in 0..100 {
let (i, _) = block_on(try_select(ok::<_, ()>(0), ok::<_, ()>(1))).unwrap().factor_first();
results.push(i);
}
const THRESHOLD: usize = 30;
assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD);
assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD)
}

0 comments on commit ceb1595

Please sign in to comment.