Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fairness to futures_util::{select, try_select, select_ok} #2641

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-macro/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ fn select_inner(input: TokenStream, random: bool) -> TokenStream {

let shuffle = if random {
quote! {
__futures_crate::async_await::shuffle(&mut __select_arr);
__futures_crate::shuffle(&mut __select_arr);
}
} else {
quote!()
Expand Down
2 changes: 1 addition & 1 deletion futures-macro/src/stream_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub(crate) fn stream_select(input: TokenStream) -> Result<TokenStream, syn::Erro
let mut any_pending = false;
{
let mut stream_array = [#(#field_idents.as_mut().map(|f| StreamEnum::#generic_idents(f)).unwrap_or(StreamEnum::None)),*];
__futures_crate::async_await::shuffle(&mut stream_array);
__futures_crate::shuffle(&mut stream_array);

for mut s in stream_array {
if let StreamEnum::None = s {
Expand Down
7 changes: 0 additions & 7 deletions futures-util/src/async_await/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@ mod stream_select_mod;
#[cfg(feature = "async-await-macro")]
pub use self::stream_select_mod::*;

#[cfg(feature = "std")]
#[cfg(feature = "async-await-macro")]
mod random;
#[cfg(feature = "std")]
#[cfg(feature = "async-await-macro")]
pub use self::random::*;

#[doc(hidden)]
#[inline(always)]
pub fn assert_unpin<T: Unpin>(_: &T) {}
Expand Down
9 changes: 6 additions & 3 deletions futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ mod join_all;
#[cfg(feature = "alloc")]
pub use self::join_all::{join_all, JoinAll};

mod select_strategy;
pub use select_strategy::{Biased, Fair, IsBiased};

mod select;
pub use self::select::{select, Select};
pub use self::select::{select, select_biased, Select};

#[cfg(feature = "alloc")]
mod select_all;
Expand All @@ -102,12 +105,12 @@ mod try_join_all;
pub use self::try_join_all::{try_join_all, TryJoinAll};

mod try_select;
pub use self::try_select::{try_select, TrySelect};
pub use self::try_select::{try_select, try_select_biased, TrySelect};

#[cfg(feature = "alloc")]
mod select_ok;
#[cfg(feature = "alloc")]
pub use self::select_ok::{select_ok, SelectOk};
pub use self::select_ok::{select_ok, select_ok_biased, SelectOk};

mod either;
pub use self::either::Either;
Expand Down
124 changes: 114 additions & 10 deletions futures-util/src/future/select.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use super::assert_future;
use super::{assert_future, Biased, Fair, IsBiased};
use crate::future::{Either, FutureExt};
use core::marker::PhantomData;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};

/// Future for the [`select()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Select<A, B> {
pub struct Select<A, B, BIASED = Fair> {
inner: Option<(A, B)>,
_phantom: PhantomData<BIASED>,
}

impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
impl<A: Unpin, B: Unpin, BIASED> Unpin for Select<A, B, BIASED> {}

/// Waits for either one of two differently-typed futures to complete.
///
Expand All @@ -22,6 +24,10 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// If both futures are ready when this is polled, the winner will be pseudo-randomly
/// selected, unless the std feature is not enabled. If std is not enabled, the first
/// argument will always win.
///
/// Also note that if both this and the second future have the same
/// output type you can use the `Either::factor_first` method to
/// conveniently extract out the value at the end.
Expand Down Expand Up @@ -81,20 +87,103 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
/// })
/// }
/// ```
pub fn select<A, B>(future1: A, future2: B) -> Select<A, B>
pub fn select<A, B>(future1: A, future2: B) -> Select<A, B, Fair>
where
A: Future + Unpin,
B: Future + Unpin,
{
assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
inner: Some((future1, future2)),
_phantom: PhantomData,
})
}

/// Waits for either one of two differently-typed futures to complete, giving preferential treatment to the first one.
///
/// This function will return a new future which awaits for either one of both
/// futures to complete. The returned future will finish with both the value
/// resolved and a future representing the completion of the other work.
///
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// If both futures are ready when this is polled, the winner will always be the first argument.
///
/// Also note that if both this and the second future have the same
/// output type you can use the `Either::factor_first` method to
/// conveniently extract out the value at the end.
///
/// # Examples
///
/// A simple example
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::{
/// pin_mut,
/// future::Either,
/// future::self,
/// };
///
/// // These two futures have different types even though their outputs have the same type.
/// let future1 = async {
/// future::pending::<()>().await; // will never finish
/// 1
/// };
/// let future2 = async {
/// future::ready(2).await
/// };
///
/// // 'select_biased' requires Future + Unpin bounds
/// pin_mut!(future1);
/// pin_mut!(future2);
///
/// let value = match future::select_biased(future1, future2).await {
/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1`
/// // `_` represents `future2`
/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2`
/// // `_` represents `future1`
/// };
///
/// assert!(value == 2);
/// # });
/// ```
///
/// A more complex example
///
/// ```
/// use futures::future::{self, Either, Future, FutureExt};
///
/// // A poor-man's join implemented on top of select
///
/// fn join<A, B>(a: A, b: B) -> impl Future<Output=(A::Output, B::Output)>
/// where A: Future + Unpin,
/// B: Future + Unpin,
/// {
/// future::select_biased(a, b).then(|either| {
/// match either {
/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(),
/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(),
/// }
/// })
/// }
/// ```
pub fn select_biased<A, B>(future1: A, future2: B) -> Select<A, B, Biased>
where
A: Future + Unpin,
B: Future + Unpin,
{
assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
inner: Some((future1, future2)),
_phantom: PhantomData,
})
}

impl<A, B> Future for Select<A, B>
impl<A, B, BIASED> Future for Select<A, B, BIASED>
where
A: Future + Unpin,
B: Future + Unpin,
BIASED: IsBiased,
{
type Output = Either<(A::Output, B), (B::Output, A)>;

Expand All @@ -111,22 +200,37 @@ where

let (a, b) = self.inner.as_mut().expect("cannot poll Select twice");

if let Poll::Ready(val) = a.poll_unpin(cx) {
return Poll::Ready(Either::Left((val, unwrap_option(self.inner.take()).1)));
macro_rules! poll_wrap {
($to_poll:expr, $unpolled:expr, $wrap:expr) => {
if let Poll::Ready(val) = $to_poll.poll_unpin(cx) {
return Poll::Ready($wrap((val, $unpolled)));
}
};
}

if let Poll::Ready(val) = b.poll_unpin(cx) {
return Poll::Ready(Either::Right((val, unwrap_option(self.inner.take()).0)));
#[cfg(feature = "std")]
if BIASED::IS_BIASED || crate::gen_index(2) == 0 {
poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left);
poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right);
} else {
poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right);
poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left);
}

#[cfg(not(feature = "std"))]
{
poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left);
poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right);
}
Poll::Pending
}
}

impl<A, B> FusedFuture for Select<A, B>
impl<A, B, BIASED> FusedFuture for Select<A, B, BIASED>
where
A: Future + Unpin,
B: Future + Unpin,
BIASED: IsBiased,
{
fn is_terminated(&self) -> bool {
self.inner.is_none()
Expand Down
75 changes: 62 additions & 13 deletions futures-util/src/future/select_ok.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::assert_future;
use super::{Biased, Fair, IsBiased};
use crate::future::TryFutureExt;
use alloc::vec::Vec;
use core::iter::FromIterator;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
Expand All @@ -10,11 +12,12 @@ use futures_core::task::{Context, Poll};
/// Future for the [`select_ok`] function.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SelectOk<Fut> {
pub struct SelectOk<Fut, BIASED = Fair> {
inner: Vec<Fut>,
_phantom: PhantomData<BIASED>,
}

impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
impl<Fut: Unpin, BIASED> Unpin for SelectOk<Fut, BIASED> {}

/// Creates a new future which will select the first successful future over a list of futures.
///
Expand All @@ -26,44 +29,90 @@ impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Note for users migrating from 0.3 to 0.4
///
/// This function used to be biased in favor of futures that appeared earlier in the
/// iterator. This is no longer the case, the futures are now shuffled prior to being polled.
/// This prevents starvation problems. It also has the side effect that the returned `Vec`
/// of remaining futures may be longer than it was in version 0.3, because of this shuffling.
/// Some futures that would have been polled and had errors get dropped, may now instead
/// remain in the collection without being polled.
///
/// If you were relying on this biased behavior, consider switching to the [`select_ok_biased`] function.
///
/// # Panics
///
/// This function will panic if the iterator specified contains no items.
pub fn select_ok<I>(iter: I) -> SelectOk<I::Item, Fair>
where
I: IntoIterator,
I::Item: TryFuture + Unpin,
{
let ret = SelectOk { inner: iter.into_iter().collect(), _phantom: PhantomData };
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
assert_future::<
Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
_,
>(ret)
}

/// Creates a new future which will select the first successful future over a list of futures.
///
/// The returned future will wait for any future within `iter` to be ready and Ok. Unlike
/// `select_all`, this will only return the first successful completion, or the last
/// failure. This is useful in contexts where any success is desired and failures
/// are ignored, unless all the futures fail.
///
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// If multiple futures are ready at the same time this function is biased towards
/// entries that are earlier in the list.
///
/// # Panics
///
/// This function will panic if the iterator specified contains no items.
pub fn select_ok<I>(iter: I) -> SelectOk<I::Item>
pub fn select_ok_biased<I>(iter: I) -> SelectOk<I::Item, Biased>
where
I: IntoIterator,
I::Item: TryFuture + Unpin,
{
let ret = SelectOk { inner: iter.into_iter().collect() };
let ret = SelectOk { inner: iter.into_iter().collect(), _phantom: PhantomData };
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
assert_future::<
Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
_,
>(ret)
}

impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
impl<Fut: TryFuture + Unpin, BIASED: IsBiased> Future for SelectOk<Fut, BIASED> {
type Output = Result<(Fut::Ok, Vec<Fut>), Fut::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { inner, _phantom } = &mut *self;
#[cfg(feature = "std")]
{
if !BIASED::IS_BIASED {
crate::shuffle(inner);
}
}
// loop until we've either exhausted all errors, a success was hit, or nothing is ready
loop {
let item =
self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) {
Poll::Pending => None,
Poll::Ready(e) => Some((i, e)),
});
let item = inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) {
Poll::Pending => None,
Poll::Ready(e) => Some((i, e)),
});
match item {
Some((idx, res)) => {
// always remove Ok or Err, if it's not the last Err continue looping
drop(self.inner.remove(idx));
drop(inner.remove(idx));
match res {
Ok(e) => {
let rest = mem::take(&mut self.inner);
let rest = mem::take(inner);
return Poll::Ready(Ok((e, rest)));
}
Err(e) => {
if self.inner.is_empty() {
if inner.is_empty() {
return Poll::Ready(Err(e));
}
}
Expand Down
25 changes: 25 additions & 0 deletions futures-util/src/future/select_strategy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/// When used with a select future, this will make the future biased.
/// When multiple futures are ready, the winner will be the first one
/// specified.
#[derive(Debug)]
pub struct Biased;

/// When used with a select future, this will make the future fair.
/// When multiple futures are ready, the winner will be pseudo-randomly
/// selected. This is the default behavior.
#[derive(Debug)]
pub struct Fair;

/// Reports whether the type is an instance of [`Biased`] or not.
pub trait IsBiased {
/// Contains the answer to our question: is this biased?
const IS_BIASED: bool;
}

impl IsBiased for Biased {
const IS_BIASED: bool = true;
}

impl IsBiased for Fair {
const IS_BIASED: bool = false;
}
Loading