diff --git a/src/client.rs b/src/client.rs index a5fb86f..03863b7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,9 +6,7 @@ use crate::{ transport::ConnectionErrors, Service, ServiceConnection, }; -use futures::{ - future::BoxFuture, stream::BoxStream, FutureExt, Sink, SinkExt, Stream, StreamExt, TryFutureExt, -}; +use futures::{future::BoxFuture, FutureExt, Sink, SinkExt, Stream, StreamExt, TryFutureExt}; use pin_project::pin_project; use std::{ error, @@ -19,6 +17,9 @@ use std::{ task::{Context, Poll}, }; +/// Sync version of `future::stream::BoxStream`. +pub type BoxStreamSync<'a, T> = Pin + Send + Sync + 'a>>; + /// A client for a specific service /// /// This is a wrapper around a [ServiceConnection] that serves as the entry point @@ -108,7 +109,7 @@ impl> RpcClient { &self, msg: M, ) -> result::Result< - BoxStream<'static, result::Result>>, + BoxStreamSync<'static, result::Result>>, StreamingResponseError, > where @@ -130,7 +131,7 @@ impl> RpcClient { Err(e) => Err(StreamingResponseItemError::RecvError(e)), }); // keep send alive so the request on the server side does not get cancelled - let recv = DeferDrop(recv, send).boxed(); + let recv = Box::pin(DeferDrop(recv, send)); Ok(recv) } @@ -180,7 +181,7 @@ impl> RpcClient { ) -> result::Result< ( UpdateSink, - BoxStream<'static, result::Result>>, + BoxStreamSync<'static, result::Result>>, ), BidiError, > @@ -191,12 +192,10 @@ impl> RpcClient { let (mut send, recv) = self.source.open_bi().await.map_err(BidiError::Open)?; send.send(msg).await.map_err(BidiError::::Send)?; let send = UpdateSink(send, PhantomData); - let recv = recv - .map(|x| match x { - Ok(x) => M::Response::try_from(x).map_err(|_| BidiItemError::DowncastError), - Err(e) => Err(BidiItemError::RecvError(e)), - }) - .boxed(); + let recv = Box::pin(recv.map(|x| match x { + Ok(x) => M::Response::try_from(x).map_err(|_| BidiItemError::DowncastError), + Err(e) => Err(BidiItemError::RecvError(e)), + })); Ok((send, recv)) } } diff --git a/src/transport/misc/mod.rs b/src/transport/misc/mod.rs index f48e531..598b68c 100644 --- a/src/transport/misc/mod.rs +++ b/src/transport/misc/mod.rs @@ -24,7 +24,7 @@ impl ConnectionErrors for DummyServerEndpoint { impl ConnectionCommon for DummyServerEndpoint { type RecvStream = stream::Pending>; - type SendSink = Box + Unpin + Send>; + type SendSink = Box + Unpin + Send + Sync>; } impl ServerEndpoint for DummyServerEndpoint { diff --git a/src/transport/mod.rs b/src/transport/mod.rs index d8e0f03..9d03b3e 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -34,9 +34,9 @@ pub trait ConnectionErrors: Debug + Clone + Send + Sync + 'static { /// Having this as a separate trait is useful when writing generic code that works with both. pub trait ConnectionCommon: ConnectionErrors { /// Receive side of a bidirectional typed channel - type RecvStream: Stream> + Send + Unpin + 'static; + type RecvStream: Stream> + Send + Sync + Unpin + 'static; /// Send side of a bidirectional typed channel - type SendSink: Sink + Send + Unpin + 'static; + type SendSink: Sink + Send + Sync + Unpin + 'static; } /// A connection to a specific remote machine