Skip to content

Commit

Permalink
feat: add additional Sync bounds to allow for better reuse of streams
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Mar 7, 2024
1 parent d9b385c commit 54c4ade
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
16 changes: 9 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
Service, ServiceConnection,
};
use futures::{
future::BoxFuture, stream::BoxStream, FutureExt, Sink, SinkExt, Stream, StreamExt, TryFutureExt,
future::BoxFuture, FutureExt, Sink, SinkExt, Stream, StreamExt, TryFutureExt,
};
use pin_project::pin_project;
use std::{
Expand All @@ -19,6 +19,9 @@ use std::{
task::{Context, Poll},
};

/// Sync version of `future::stream::BoxStream`.
pub type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;

/// A client for a specific service
///
/// This is a wrapper around a [ServiceConnection] that serves as the entry point
Expand Down Expand Up @@ -108,7 +111,7 @@ impl<S: Service, C: ServiceConnection<S>> RpcClient<S, C> {
&self,
msg: M,
) -> result::Result<
BoxStream<'static, result::Result<M::Response, StreamingResponseItemError<C>>>,
BoxStreamSync<'static, result::Result<M::Response, StreamingResponseItemError<C>>>,
StreamingResponseError<C>,
>
where
Expand All @@ -130,7 +133,7 @@ impl<S: Service, C: ServiceConnection<S>> RpcClient<S, C> {
Err(e) => Err(StreamingResponseItemError::RecvError(e)),
});
// keep send alive so the request on the server side does not get cancelled
let recv = DeferDrop(recv, send).boxed();
let recv = Box::pin(DeferDrop(recv, send));
Ok(recv)
}

Expand Down Expand Up @@ -180,7 +183,7 @@ impl<S: Service, C: ServiceConnection<S>> RpcClient<S, C> {
) -> result::Result<
(
UpdateSink<S, C, M::Update>,
BoxStream<'static, result::Result<M::Response, BidiItemError<C>>>,
BoxStreamSync<'static, result::Result<M::Response, BidiItemError<C>>>,
),
BidiError<C>,
>
Expand All @@ -191,12 +194,11 @@ impl<S: Service, C: ServiceConnection<S>> RpcClient<S, C> {
let (mut send, recv) = self.source.open_bi().await.map_err(BidiError::Open)?;
send.send(msg).await.map_err(BidiError::<C>::Send)?;
let send = UpdateSink(send, PhantomData);
let recv = recv
let recv = Box::pin(recv
.map(|x| match x {
Ok(x) => M::Response::try_from(x).map_err(|_| BidiItemError::DowncastError),
Err(e) => Err(BidiItemError::RecvError(e)),
})
.boxed();
}));
Ok((send, recv))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/transport/misc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl ConnectionErrors for DummyServerEndpoint {

impl<In: RpcMessage, Out: RpcMessage> ConnectionCommon<In, Out> for DummyServerEndpoint {
type RecvStream = stream::Pending<Result<In, Self::RecvError>>;
type SendSink = Box<dyn Sink<Out, Error = Self::SendError> + Unpin + Send>;
type SendSink = Box<dyn Sink<Out, Error = Self::SendError> + Unpin + Send + Sync>;
}

impl<In: RpcMessage, Out: RpcMessage> ServerEndpoint<In, Out> for DummyServerEndpoint {
Expand Down
4 changes: 2 additions & 2 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ pub trait ConnectionErrors: Debug + Clone + Send + Sync + 'static {
/// Having this as a separate trait is useful when writing generic code that works with both.
pub trait ConnectionCommon<In, Out>: ConnectionErrors {
/// Receive side of a bidirectional typed channel
type RecvStream: Stream<Item = Result<In, Self::RecvError>> + Send + Unpin + 'static;
type RecvStream: Stream<Item = Result<In, Self::RecvError>> + Send + Sync + Unpin + 'static;
/// Send side of a bidirectional typed channel
type SendSink: Sink<Out, Error = Self::SendError> + Send + Unpin + 'static;
type SendSink: Sink<Out, Error = Self::SendError> + Send + Sync + Unpin + 'static;
}

/// A connection to a specific remote machine
Expand Down

0 comments on commit 54c4ade

Please sign in to comment.