diff --git a/futures-core/src/stream.rs b/futures-core/src/stream.rs index ad5350b79..79aa5cba5 100644 --- a/futures-core/src/stream.rs +++ b/futures-core/src/stream.rs @@ -164,7 +164,10 @@ mod private_try_stream { /// A convenience for streams that return `Result` values that includes /// a variety of adapters tailored to such futures. -pub trait TryStream: Stream + private_try_stream::Sealed { +pub trait TryStream: + Stream::Ok, ::Error>> + + private_try_stream::Sealed +{ /// The type of successful values yielded by this future type Ok; diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 789e1ad22..23d28bf44 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -52,8 +52,8 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream}; mod try_stream; pub use self::try_stream::{ - try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryAll, - TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile, + try_unfold, AndThen, ErrInto, InspectErr, InspectOk, MapErr, MapOk, OrElse, TryAll, TryAny, + TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, }; diff --git a/futures-util/src/stream/try_stream/into_stream.rs b/futures-util/src/stream/try_stream/into_stream.rs deleted file mode 100644 index 2126258af..000000000 --- a/futures-util/src/stream/try_stream/into_stream.rs +++ /dev/null @@ -1,52 +0,0 @@ -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_project_lite::pin_project; - -pin_project! { - /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. - #[derive(Debug)] - #[must_use = "streams do nothing unless polled"] - pub struct IntoStream { - #[pin] - stream: St, - } -} - -impl IntoStream { - #[inline] - pub(super) fn new(stream: St) -> Self { - Self { stream } - } - - delegate_access_inner!(stream, St, ()); -} - -impl FusedStream for IntoStream { - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl Stream for IntoStream { - type Item = Result; - - #[inline] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().stream.try_poll_next(cx) - } - - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl, Item> Sink for IntoStream { - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index fe9317d7a..970ee0c64 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -36,32 +36,29 @@ delegate_all!( delegate_all!( /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. InspectOk( - Inspect, InspectOkFn> - ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))] + Inspect> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Inspect::new(x, inspect_ok_fn(f))] ); delegate_all!( /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. InspectErr( - Inspect, InspectErrFn> - ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))] + Inspect> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Inspect::new(x, inspect_err_fn(f))] ); -mod into_stream; -pub use self::into_stream::IntoStream; - delegate_all!( /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. MapOk( - Map, MapOkFn> - ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))] + Map> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Map::new(x, map_ok_fn(f))] ); delegate_all!( /// Stream for the [`map_err`](super::TryStreamExt::map_err) method. MapErr( - Map, MapErrFn> - ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))] + Map> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Map::new(x, map_err_fn(f))] ); mod or_else; @@ -352,34 +349,6 @@ pub trait TryStreamExt: TryStream { assert_stream::, _>(InspectErr::new(self, f)) } - /// Wraps a [`TryStream`] into a type that implements - /// [`Stream`](futures_core::stream::Stream) - /// - /// [`TryStream`]s currently do not implement the - /// [`Stream`](futures_core::stream::Stream) trait because of limitations - /// of the compiler. - /// - /// # Examples - /// - /// ``` - /// use futures::stream::{Stream, TryStream, TryStreamExt}; - /// - /// # type T = i32; - /// # type E = (); - /// fn make_try_stream() -> impl TryStream { // ... } - /// # futures::stream::empty() - /// # } - /// fn take_stream(stream: impl Stream>) { /* ... */ } - /// - /// take_stream(make_try_stream().into_stream()); - /// ``` - fn into_stream(self) -> IntoStream - where - Self: Sized, - { - assert_stream::, _>(IntoStream::new(self)) - } - /// Creates a future that attempts to resolve the next item in the stream. /// If an error is encountered before the next item, the error is returned /// instead. diff --git a/futures-util/src/stream/try_stream/try_buffer_unordered.rs b/futures-util/src/stream/try_stream/try_buffer_unordered.rs index 1d3c38b1d..af6994912 100644 --- a/futures-util/src/stream/try_stream/try_buffer_unordered.rs +++ b/futures-util/src/stream/try_stream/try_buffer_unordered.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt}; +use crate::stream::{Fuse, FuturesUnordered, StreamExt}; use core::num::NonZeroUsize; use core::pin::Pin; use futures_core::future::TryFuture; @@ -17,7 +17,7 @@ pin_project! { where St: TryStream { #[pin] - stream: Fuse>, + stream: Fuse, in_progress_queue: FuturesUnordered, max: Option, } @@ -30,13 +30,13 @@ where { pub(super) fn new(stream: St, n: Option) -> Self { Self { - stream: IntoStream::new(stream).fuse(), + stream: stream.fuse(), in_progress_queue: FuturesUnordered::new(), max: n.and_then(NonZeroUsize::new), } } - delegate_access_inner!(stream, St, (. .)); + delegate_access_inner!(stream, St, (.)); } impl Stream for TryBufferUnordered diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs index 6b9dd0f7e..4c98615eb 100644 --- a/futures-util/src/stream/try_stream/try_buffered.rs +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt}; +use crate::stream::{Fuse, FuturesOrdered, StreamExt}; use core::num::NonZeroUsize; use core::pin::Pin; use futures_core::future::TryFuture; @@ -18,7 +18,7 @@ pin_project! { St::Ok: TryFuture, { #[pin] - stream: Fuse>, + stream: Fuse, in_progress_queue: FuturesOrdered, max: Option, } @@ -31,13 +31,13 @@ where { pub(super) fn new(stream: St, n: Option) -> Self { Self { - stream: IntoStream::new(stream).fuse(), + stream: stream.fuse(), in_progress_queue: FuturesOrdered::new(), max: n.and_then(NonZeroUsize::new), } } - delegate_access_inner!(stream, St, (. .)); + delegate_access_inner!(stream, St, (.)); } impl Stream for TryBuffered diff --git a/futures-util/src/stream/try_stream/try_chunks.rs b/futures-util/src/stream/try_stream/try_chunks.rs index ec53f4bd1..a6184ee44 100644 --- a/futures-util/src/stream/try_stream/try_chunks.rs +++ b/futures-util/src/stream/try_stream/try_chunks.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, IntoStream, StreamExt}; +use crate::stream::{Fuse, StreamExt}; use alloc::vec::Vec; use core::pin::Pin; @@ -16,7 +16,7 @@ pin_project! { #[must_use = "streams do nothing unless polled"] pub struct TryChunks { #[pin] - stream: Fuse>, + stream: Fuse, items: Vec, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } @@ -26,11 +26,7 @@ impl TryChunks { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - Self { - stream: IntoStream::new(stream).fuse(), - items: Vec::with_capacity(capacity), - cap: capacity, - } + Self { stream: stream.fuse(), items: Vec::with_capacity(capacity), cap: capacity } } fn take(self: Pin<&mut Self>) -> Vec { @@ -38,7 +34,7 @@ impl TryChunks { mem::replace(self.project().items, Vec::with_capacity(cap)) } - delegate_access_inner!(stream, St, (. .)); + delegate_access_inner!(stream, St, (.)); } type TryChunksStreamError = TryChunksError<::Ok, ::Error>; diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index a74dfc451..9987b0dc3 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -13,8 +13,6 @@ use crate::future::Either; use crate::stream::stream::flatten_unordered::{ FlattenUnorderedWithFlowController, FlowController, FlowStep, }; -use crate::stream::IntoStream; -use crate::TryStreamExt; delegate_all!( /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. @@ -128,7 +126,7 @@ where { // Item is either an inner stream or a stream containing a single error. // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. - type Item = Either, SingleStreamResult>; + type Item = Either>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let item = ready!(self.project().stream.try_poll_next(cx)); @@ -136,7 +134,7 @@ where let out = match item { Some(res) => match res { // Emit successful inner stream as is - Ok(stream) => Either::Left(stream.into_stream()), + Ok(stream) => Either::Left(stream), // Wrap an error into a stream containing a single item err @ Err(_) => { let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); diff --git a/futures-util/src/stream/try_stream/try_forward.rs b/futures-util/src/stream/try_stream/try_forward.rs index 52c28afa3..74cc33d4e 100644 --- a/futures-util/src/stream/try_stream/try_forward.rs +++ b/futures-util/src/stream/try_stream/try_forward.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, IntoStream, Stream, TryStream}; +use crate::stream::{Fuse, Stream, TryStream}; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::ready; @@ -15,14 +15,14 @@ pin_project! { #[pin] sink: Option, #[pin] - stream: Fuse>, + stream: Fuse, buffered_item: Option, } } impl TryForward { pub(crate) fn new(stream: St, sink: Si) -> Self { - Self { sink: Some(sink), stream: Fuse::new(IntoStream::new(stream)), buffered_item: None } + Self { sink: Some(sink), stream: Fuse::new(stream), buffered_item: None } } } diff --git a/futures-util/src/stream/try_stream/try_ready_chunks.rs b/futures-util/src/stream/try_stream/try_ready_chunks.rs index 8b1470ea2..891079093 100644 --- a/futures-util/src/stream/try_stream/try_ready_chunks.rs +++ b/futures-util/src/stream/try_stream/try_ready_chunks.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, IntoStream, StreamExt}; +use crate::stream::{Fuse, StreamExt}; use alloc::vec::Vec; use core::fmt; @@ -15,7 +15,7 @@ pin_project! { #[must_use = "streams do nothing unless polled"] pub struct TryReadyChunks { #[pin] - stream: Fuse>, + stream: Fuse, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } } @@ -24,10 +24,10 @@ impl TryReadyChunks { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - Self { stream: IntoStream::new(stream).fuse(), cap: capacity } + Self { stream: stream.fuse(), cap: capacity } } - delegate_access_inner!(stream, St, (. .)); + delegate_access_inner!(stream, St, (.)); } type TryReadyChunksStreamError = diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 75ed663f9..43ba31a4f 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1361,13 +1361,6 @@ mod stream { // IntoAsyncRead requires `St: Unpin` // assert_not_impl!(IntoAsyncRead, io::Error>>: Unpin); - assert_impl!(IntoStream<()>: Send); - assert_not_impl!(IntoStream<*const ()>: Send); - assert_impl!(IntoStream<()>: Sync); - assert_not_impl!(IntoStream<*const ()>: Sync); - assert_impl!(IntoStream<()>: Unpin); - assert_not_impl!(IntoStream: Unpin); - assert_impl!(Iter<()>: Send); assert_not_impl!(Iter<*const ()>: Send); assert_impl!(Iter<()>: Sync);