Skip to content

Commit

Permalink
refactor: remove into_stream and IntoStream
Browse files Browse the repository at this point in the history
  • Loading branch information
yhx-12243 committed Aug 19, 2024
1 parent d28bede commit 9ddeb5b
Show file tree
Hide file tree
Showing 11 changed files with 35 additions and 128 deletions.
5 changes: 4 additions & 1 deletion futures-core/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,10 @@ mod private_try_stream {

/// A convenience for streams that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryStream: Stream + private_try_stream::Sealed {
pub trait TryStream:
Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>
+ private_try_stream::Sealed
{
/// The type of successful values yielded by this future
type Ok;

Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream};

mod try_stream;
pub use self::try_stream::{
try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryAll,
TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile,
try_unfold, AndThen, ErrInto, InspectErr, InspectOk, MapErr, MapOk, OrElse, TryAll, TryAny,
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile,
TryStreamExt, TryTakeWhile, TryUnfold,
};

Expand Down
52 changes: 0 additions & 52 deletions futures-util/src/stream/try_stream/into_stream.rs

This file was deleted.

47 changes: 8 additions & 39 deletions futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,29 @@ delegate_all!(
delegate_all!(
/// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
InspectOk<St, F>(
Inspect<IntoStream<St>, InspectOkFn<F>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
Inspect<St, InspectOkFn<F>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Inspect::new(x, inspect_ok_fn(f))]
);

delegate_all!(
/// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
InspectErr<St, F>(
Inspect<IntoStream<St>, InspectErrFn<F>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
Inspect<St, InspectErrFn<F>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Inspect::new(x, inspect_err_fn(f))]
);

mod into_stream;
pub use self::into_stream::IntoStream;

delegate_all!(
/// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
MapOk<St, F>(
Map<IntoStream<St>, MapOkFn<F>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
Map<St, MapOkFn<F>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Map::new(x, map_ok_fn(f))]
);

delegate_all!(
/// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
MapErr<St, F>(
Map<IntoStream<St>, MapErrFn<F>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
Map<St, MapErrFn<F>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Map::new(x, map_err_fn(f))]
);

mod or_else;
Expand Down Expand Up @@ -352,34 +349,6 @@ pub trait TryStreamExt: TryStream {
assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
}

/// Wraps a [`TryStream`] into a type that implements
/// [`Stream`](futures_core::stream::Stream)
///
/// [`TryStream`]s currently do not implement the
/// [`Stream`](futures_core::stream::Stream) trait because of limitations
/// of the compiler.
///
/// # Examples
///
/// ```
/// use futures::stream::{Stream, TryStream, TryStreamExt};
///
/// # type T = i32;
/// # type E = ();
/// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
/// # futures::stream::empty()
/// # }
/// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
///
/// take_stream(make_try_stream().into_stream());
/// ```
fn into_stream(self) -> IntoStream<Self>
where
Self: Sized,
{
assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
}

/// Creates a future that attempts to resolve the next item in the stream.
/// If an error is encountered before the next item, the error is returned
/// instead.
Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/stream/try_stream/try_buffer_unordered.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt};
use crate::stream::{Fuse, FuturesUnordered, StreamExt};
use core::num::NonZeroUsize;
use core::pin::Pin;
use futures_core::future::TryFuture;
Expand All @@ -17,7 +17,7 @@ pin_project! {
where St: TryStream
{
#[pin]
stream: Fuse<IntoStream<St>>,
stream: Fuse<St>,
in_progress_queue: FuturesUnordered<St::Ok>,
max: Option<NonZeroUsize>,
}
Expand All @@ -30,13 +30,13 @@ where
{
pub(super) fn new(stream: St, n: Option<usize>) -> Self {
Self {
stream: IntoStream::new(stream).fuse(),
stream: stream.fuse(),
in_progress_queue: FuturesUnordered::new(),
max: n.and_then(NonZeroUsize::new),
}
}

delegate_access_inner!(stream, St, (. .));
delegate_access_inner!(stream, St, (.));
}

impl<St> Stream for TryBufferUnordered<St>
Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/stream/try_stream/try_buffered.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt};
use crate::stream::{Fuse, FuturesOrdered, StreamExt};
use core::num::NonZeroUsize;
use core::pin::Pin;
use futures_core::future::TryFuture;
Expand All @@ -18,7 +18,7 @@ pin_project! {
St::Ok: TryFuture,
{
#[pin]
stream: Fuse<IntoStream<St>>,
stream: Fuse<St>,
in_progress_queue: FuturesOrdered<St::Ok>,
max: Option<NonZeroUsize>,
}
Expand All @@ -31,13 +31,13 @@ where
{
pub(super) fn new(stream: St, n: Option<usize>) -> Self {
Self {
stream: IntoStream::new(stream).fuse(),
stream: stream.fuse(),
in_progress_queue: FuturesOrdered::new(),
max: n.and_then(NonZeroUsize::new),
}
}

delegate_access_inner!(stream, St, (. .));
delegate_access_inner!(stream, St, (.));
}

impl<St> Stream for TryBuffered<St>
Expand Down
12 changes: 4 additions & 8 deletions futures-util/src/stream/try_stream/try_chunks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::{Fuse, IntoStream, StreamExt};
use crate::stream::{Fuse, StreamExt};

use alloc::vec::Vec;
use core::pin::Pin;
Expand All @@ -16,7 +16,7 @@ pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct TryChunks<St: TryStream> {
#[pin]
stream: Fuse<IntoStream<St>>,
stream: Fuse<St>,
items: Vec<St::Ok>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
Expand All @@ -26,19 +26,15 @@ impl<St: TryStream> TryChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Self {
stream: IntoStream::new(stream).fuse(),
items: Vec::with_capacity(capacity),
cap: capacity,
}
Self { stream: stream.fuse(), items: Vec::with_capacity(capacity), cap: capacity }
}

fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
let cap = self.cap;
mem::replace(self.project().items, Vec::with_capacity(cap))
}

delegate_access_inner!(stream, St, (. .));
delegate_access_inner!(stream, St, (.));
}

type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>;
Expand Down
6 changes: 2 additions & 4 deletions futures-util/src/stream/try_stream/try_flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use crate::future::Either;
use crate::stream::stream::flatten_unordered::{
FlattenUnorderedWithFlowController, FlowController, FlowStep,
};
use crate::stream::IntoStream;
use crate::TryStreamExt;

delegate_all!(
/// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
Expand Down Expand Up @@ -128,15 +126,15 @@ where
{
// Item is either an inner stream or a stream containing a single error.
// This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>;
type Item = Either<St::Ok, SingleStreamResult<St::Ok>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.project().stream.try_poll_next(cx));

let out = match item {
Some(res) => match res {
// Emit successful inner stream as is
Ok(stream) => Either::Left(stream.into_stream()),
Ok(stream) => Either::Left(stream),
// Wrap an error into a stream containing a single item
err @ Err(_) => {
let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/stream/try_stream/try_forward.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::{Fuse, IntoStream, Stream, TryStream};
use crate::stream::{Fuse, Stream, TryStream};
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::ready;
Expand All @@ -15,14 +15,14 @@ pin_project! {
#[pin]
sink: Option<Si>,
#[pin]
stream: Fuse<IntoStream<St>>,
stream: Fuse<St>,
buffered_item: Option<Item>,
}
}

impl<St, Si, Item> TryForward<St, Si, Item> {
pub(crate) fn new(stream: St, sink: Si) -> Self {
Self { sink: Some(sink), stream: Fuse::new(IntoStream::new(stream)), buffered_item: None }
Self { sink: Some(sink), stream: Fuse::new(stream), buffered_item: None }
}
}

Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/stream/try_stream/try_ready_chunks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::{Fuse, IntoStream, StreamExt};
use crate::stream::{Fuse, StreamExt};

use alloc::vec::Vec;
use core::fmt;
Expand All @@ -15,7 +15,7 @@ pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct TryReadyChunks<St: TryStream> {
#[pin]
stream: Fuse<IntoStream<St>>,
stream: Fuse<St>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}
Expand All @@ -24,10 +24,10 @@ impl<St: TryStream> TryReadyChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Self { stream: IntoStream::new(stream).fuse(), cap: capacity }
Self { stream: stream.fuse(), cap: capacity }
}

delegate_access_inner!(stream, St, (. .));
delegate_access_inner!(stream, St, (.));
}

type TryReadyChunksStreamError<St> =
Expand Down
7 changes: 0 additions & 7 deletions futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1361,13 +1361,6 @@ mod stream {
// IntoAsyncRead requires `St: Unpin`
// assert_not_impl!(IntoAsyncRead<PinnedTryStream<Vec<u8>, io::Error>>: Unpin);

assert_impl!(IntoStream<()>: Send);
assert_not_impl!(IntoStream<*const ()>: Send);
assert_impl!(IntoStream<()>: Sync);
assert_not_impl!(IntoStream<*const ()>: Sync);
assert_impl!(IntoStream<()>: Unpin);
assert_not_impl!(IntoStream<PhantomPinned>: Unpin);

assert_impl!(Iter<()>: Send);
assert_not_impl!(Iter<*const ()>: Send);
assert_impl!(Iter<()>: Sync);
Expand Down

0 comments on commit 9ddeb5b

Please sign in to comment.