Skip to content

Commit

Permalink
Rename open_bi and accept_bi to just open and accept (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn authored Jun 26, 2024
2 parents c2520b8 + e42f942 commit 469be36
Show file tree
Hide file tree
Showing 14 changed files with 40 additions and 44 deletions.
4 changes: 2 additions & 2 deletions examples/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,15 @@ async fn _main_unsugared() -> anyhow::Result<()> {
}
let (server, client) = flume::connection::<Service>(1);
let to_string_service = tokio::spawn(async move {
let (mut send, mut recv) = server.accept_bi().await?;
let (mut send, mut recv) = server.accept().await?;
while let Some(item) = recv.next().await {
let item = item?;
println!("server got: {item:?}");
send.send(item.to_string()).await?;
}
anyhow::Ok(())
});
let (mut send, mut recv) = client.open_bi().await?;
let (mut send, mut recv) = client.open().await?;
let print_result_service = tokio::spawn(async move {
while let Some(item) = recv.next().await {
let item = item?;
Expand Down
2 changes: 1 addition & 1 deletion src/pattern/bidi_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
M: BidiStreamingMsg<SInner>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, recv) = self.source.open_bi().await.map_err(Error::Open)?;
let (mut send, recv) = self.source.open().await.map_err(Error::Open)?;
send.send(msg).await.map_err(Error::<C>::Send)?;
let send = UpdateSink(send, PhantomData, Arc::clone(&self.map));
let map = Arc::clone(&self.map);
Expand Down
2 changes: 1 addition & 1 deletion src/pattern/client_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
M: ClientStreamingMsg<SInner>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, mut recv) = self.source.open_bi().await.map_err(Error::Open)?;
let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
send.send(msg).map_err(Error::Send).await?;
let send = UpdateSink::<S, C, M::Update, SInner>(send, PhantomData, Arc::clone(&self.map));
let map = Arc::clone(&self.map);
Expand Down
2 changes: 1 addition & 1 deletion src/pattern/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ where
M: RpcMsg<SInner>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, mut recv) = self.source.open_bi().await.map_err(Error::Open)?;
let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
send.send(msg).await.map_err(Error::<C>::Send)?;
let res = recv
.next()
Expand Down
2 changes: 1 addition & 1 deletion src/pattern/server_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ where
M: ServerStreamingMsg<SInner>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, recv) = self.source.open_bi().await.map_err(Error::Open)?;
let (mut send, recv) = self.source.open().await.map_err(Error::Open)?;
send.send(msg).map_err(Error::<C>::Send).await?;
let map = Arc::clone(&self.map);
let recv = recv.map(move |x| match x {
Expand Down
2 changes: 1 addition & 1 deletion src/pattern/try_server_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ where
Result<StreamCreated, M::CreateError>: Into<SInner::Res> + TryFrom<SInner::Res>,
{
let msg = self.map.req_into_outer(msg.into());
let (mut send, mut recv) = self.source.open_bi().await.map_err(Error::Open)?;
let (mut send, mut recv) = self.source.open().await.map_err(Error::Open)?;
send.send(msg).map_err(Error::Send).await?;
let map = Arc::clone(&self.map);
let Some(initial) = recv.next().await else {
Expand Down
6 changes: 1 addition & 5 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ impl<S: Service, C: ServiceEndpoint<S>> RpcServer<S, C> {
/// Accepts a new channel from a client. The result is an [Accepting] object that
/// can be used to read the first request.
pub async fn accept(&self) -> result::Result<Accepting<S, C>, RpcServerError<C>> {
let (send, recv) = self
.source
.accept_bi()
.await
.map_err(RpcServerError::Accept)?;
let (send, recv) = self.source.accept().await.map_err(RpcServerError::Accept)?;
Ok(Accepting { send, recv })
}

Expand Down
24 changes: 12 additions & 12 deletions src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ pub trait BoxableConnection<In: RpcMessage, Out: RpcMessage>:
fn clone_box(&self) -> Box<dyn BoxableConnection<In, Out>>;

/// Open a channel to the remote che
fn open_bi_boxed(&self) -> OpenFuture<In, Out>;
fn open_boxed(&self) -> OpenFuture<In, Out>;
}

/// A boxed connection
Expand Down Expand Up @@ -242,8 +242,8 @@ impl<S: Service> ConnectionErrors for Connection<S> {
}

impl<S: Service> super::Connection<S::Res, S::Req> for Connection<S> {
async fn open_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
self.0.open_bi_boxed().await
async fn open(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
self.0.open_boxed().await
}
}

Expand Down Expand Up @@ -290,7 +290,7 @@ impl<In: RpcMessage, Out: RpcMessage> ConnectionErrors for ServerEndpoint<In, Ou
}

impl<In: RpcMessage, Out: RpcMessage> super::ServerEndpoint<In, Out> for ServerEndpoint<In, Out> {
fn accept_bi(
fn accept(
&self,
) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::OpenError>> + Send
{
Expand All @@ -308,9 +308,9 @@ impl<S: Service> BoxableConnection<S::Res, S::Req> for super::quinn::QuinnConnec
Box::new(self.clone())
}

fn open_bi_boxed(&self) -> OpenFuture<S::Res, S::Req> {
fn open_boxed(&self) -> OpenFuture<S::Res, S::Req> {
let f = Box::pin(async move {
let (send, recv) = super::Connection::open_bi(self).await?;
let (send, recv) = super::Connection::open(self).await?;
// map the error types to anyhow
let send = send.sink_map_err(anyhow::Error::from);
let recv = recv.map_err(anyhow::Error::from);
Expand All @@ -329,7 +329,7 @@ impl<S: Service> BoxableServerEndpoint<S::Req, S::Res> for super::quinn::QuinnSe

fn accept_bi_boxed(&self) -> AcceptFuture<S::Req, S::Res> {
let f = async move {
let (send, recv) = super::ServerEndpoint::accept_bi(self).await?;
let (send, recv) = super::ServerEndpoint::accept(self).await?;
let send = send.sink_map_err(anyhow::Error::from);
let recv = recv.map_err(anyhow::Error::from);
anyhow::Ok((SendSink::boxed(send), RecvStream::boxed(recv)))
Expand All @@ -348,8 +348,8 @@ impl<S: Service> BoxableConnection<S::Res, S::Req> for super::flume::FlumeConnec
Box::new(self.clone())
}

fn open_bi_boxed(&self) -> OpenFuture<S::Res, S::Req> {
OpenFuture::direct(super::Connection::open_bi(self))
fn open_boxed(&self) -> OpenFuture<S::Res, S::Req> {
OpenFuture::direct(super::Connection::open(self))
}
}

Expand All @@ -360,7 +360,7 @@ impl<S: Service> BoxableServerEndpoint<S::Req, S::Res> for super::flume::FlumeSe
}

fn accept_bi_boxed(&self) -> AcceptFuture<S::Req, S::Res> {
AcceptFuture::direct(super::ServerEndpoint::accept_bi(self))
AcceptFuture::direct(super::ServerEndpoint::accept(self))
}

fn local_addr(&self) -> &[super::LocalAddr] {
Expand Down Expand Up @@ -393,14 +393,14 @@ mod tests {
let client = super::Connection::<FooService>::new(client);
// spawn echo server
tokio::spawn(async move {
while let Ok((mut send, mut recv)) = server.accept_bi().await {
while let Ok((mut send, mut recv)) = server.accept().await {
if let Some(Ok(msg)) = recv.next().await {
send.send(msg).await.ok();
}
}
anyhow::Ok(())
});
if let Ok((mut send, mut recv)) = client.open_bi().await {
if let Ok((mut send, mut recv)) = client.open().await {
send.send(1).await.ok();
let res = recv.next().await;
println!("{:?}", res);
Expand Down
14 changes: 7 additions & 7 deletions src/transport/combined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@ impl<A: Connection<S::Res, S::Req>, B: Connection<S::Res, S::Req>, S: Service>
impl<A: Connection<S::Res, S::Req>, B: Connection<S::Res, S::Req>, S: Service>
Connection<S::Res, S::Req> for CombinedConnection<A, B, S>
{
async fn open_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
async fn open(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
let this = self.clone();
// try a first, then b
if let Some(a) = this.a {
let (send, recv) = a.open_bi().await.map_err(OpenBiError::A)?;
let (send, recv) = a.open().await.map_err(OpenBiError::A)?;
Ok((SendSink::A(send), RecvStream::A(recv)))
} else if let Some(b) = this.b {
let (send, recv) = b.open_bi().await.map_err(OpenBiError::B)?;
let (send, recv) = b.open().await.map_err(OpenBiError::B)?;
Ok((SendSink::B(send), RecvStream::B(recv)))
} else {
Err(OpenBiError::NoChannel)
Expand All @@ -322,18 +322,18 @@ impl<A: ServerEndpoint<S::Req, S::Res>, B: ServerEndpoint<S::Req, S::Res>, S: Se
impl<A: ServerEndpoint<S::Req, S::Res>, B: ServerEndpoint<S::Req, S::Res>, S: Service>
ServerEndpoint<S::Req, S::Res> for CombinedServerEndpoint<A, B, S>
{
async fn accept_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
let a_fut = async {
if let Some(a) = &self.a {
let (send, recv) = a.accept_bi().await.map_err(AcceptBiError::A)?;
let (send, recv) = a.accept().await.map_err(AcceptBiError::A)?;
Ok((SendSink::A(send), RecvStream::A(recv)))
} else {
std::future::pending().await
}
};
let b_fut = async {
if let Some(b) = &self.b {
let (send, recv) = b.accept_bi().await.map_err(AcceptBiError::B)?;
let (send, recv) = b.accept().await.map_err(AcceptBiError::B)?;
Ok((SendSink::B(send), RecvStream::B(recv)))
} else {
std::future::pending().await
Expand Down Expand Up @@ -377,7 +377,7 @@ mod tests {
flume::FlumeConnection<Service>,
Service,
>::new(None, None);
let res = channel.open_bi().await;
let res = channel.open().await;
assert!(matches!(res, Err(OpenBiError::NoChannel)));
}
}
6 changes: 3 additions & 3 deletions src/transport/flume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<S: Service> ConnectionErrors for FlumeServerEndpoint<S> {

type Socket<In, Out> = (self::SendSink<Out>, self::RecvStream<In>);

/// Future returned by [FlumeConnection::open_bi]
/// Future returned by [FlumeConnection::open]
pub struct OpenBiFuture<In: RpcMessage, Out: RpcMessage> {
inner: flume::r#async::SendFut<'static, Socket<Out, In>>,
res: Option<Socket<In, Out>>,
Expand Down Expand Up @@ -202,7 +202,7 @@ impl<S: Service> ConnectionCommon<S::Req, S::Res> for FlumeServerEndpoint<S> {

impl<S: Service> ServerEndpoint<S::Req, S::Res> for FlumeServerEndpoint<S> {
#[allow(refining_impl_trait)]
fn accept_bi(&self) -> AcceptBiFuture<S::Req, S::Res> {
fn accept(&self) -> AcceptBiFuture<S::Req, S::Res> {
AcceptBiFuture {
wrapped: self.stream.clone().into_recv_async(),
_p: PhantomData,
Expand All @@ -229,7 +229,7 @@ impl<S: Service> ConnectionCommon<S::Res, S::Req> for FlumeConnection<S> {

impl<S: Service> Connection<S::Res, S::Req> for FlumeConnection<S> {
#[allow(refining_impl_trait)]
fn open_bi(&self) -> OpenBiFuture<S::Res, S::Req> {
fn open(&self) -> OpenBiFuture<S::Res, S::Req> {
let (local_send, remote_recv) = flume::bounded::<S::Req>(128);
let (remote_send, local_recv) = flume::bounded::<S::Res>(128);
let remote_chan = (
Expand Down
4 changes: 2 additions & 2 deletions src/transport/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ impl<S: Service> ConnectionCommon<S::Res, S::Req> for HyperConnection<S> {
}

impl<S: Service> Connection<S::Res, S::Req> for HyperConnection<S> {
async fn open_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
async fn open(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
let (out_tx, out_rx) = flume::bounded::<io::Result<Bytes>>(32);
let req: Request<Body> = Request::post(&self.inner.uri)
.body(Body::wrap_stream(out_rx.into_stream()))
Expand Down Expand Up @@ -619,7 +619,7 @@ impl<S: Service> ServerEndpoint<S::Req, S::Res> for HyperServerEndpoint<S> {
&self.local_addr
}

async fn accept_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), AcceptBiError> {
async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), AcceptBiError> {
let (recv, send) = self
.channel
.recv_async()
Expand Down
2 changes: 1 addition & 1 deletion src/transport/misc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<In: RpcMessage, Out: RpcMessage> ConnectionCommon<In, Out> for DummyServerE
}

impl<In: RpcMessage, Out: RpcMessage> ServerEndpoint<In, Out> for DummyServerEndpoint {
async fn accept_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
futures_lite::future::pending().await
}

Expand Down
6 changes: 3 additions & 3 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ pub trait ConnectionCommon<In, Out>: ConnectionErrors {

/// A connection to a specific remote machine
///
/// A connection can be used to open bidirectional typed channels using [`Connection::open_bi`].
/// A connection can be used to open bidirectional typed channels using [`Connection::open`].
pub trait Connection<In, Out>: ConnectionCommon<In, Out> {
/// Open a channel to the remote che
fn open_bi(
fn open(
&self,
) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::OpenError>> + Send;
}
Expand All @@ -64,7 +64,7 @@ pub trait Connection<In, Out>: ConnectionCommon<In, Out> {
pub trait ServerEndpoint<In, Out>: ConnectionCommon<In, Out> {
/// Accept a new typed bidirectional channel on any of the connections we
/// have currently opened.
fn accept_bi(
fn accept(
&self,
) -> impl Future<Output = Result<(Self::SendSink, Self::RecvStream), Self::OpenError>> + Send;

Expand Down
8 changes: 4 additions & 4 deletions src/transport/quinn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl<S: Service> ConnectionCommon<S::Req, S::Res> for QuinnServerEndpoint<S> {
}

impl<S: Service> ServerEndpoint<S::Req, S::Res> for QuinnServerEndpoint<S> {
async fn accept_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), AcceptBiError> {
async fn accept(&self) -> Result<(Self::SendSink, Self::RecvStream), AcceptBiError> {
let (send, recv) = self
.inner
.receiver
Expand Down Expand Up @@ -627,7 +627,7 @@ impl<S: Service> ConnectionCommon<S::Res, S::Req> for QuinnConnection<S> {
}

impl<S: Service> Connection<S::Res, S::Req> for QuinnConnection<S> {
async fn open_bi(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
async fn open(&self) -> Result<(Self::SendSink, Self::RecvStream), Self::OpenError> {
let (sender, receiver) = oneshot::channel();
self.inner
.sender
Expand Down Expand Up @@ -737,10 +737,10 @@ impl<In: DeserializeOwned> Stream for RecvStream<In> {
}
}

/// Error for open_bi. Currently just a quinn::ConnectionError
/// Error for open. Currently just a quinn::ConnectionError
pub type OpenBiError = quinn::ConnectionError;

/// Error for accept_bi. Currently just a quinn::ConnectionError
/// Error for accept. Currently just a quinn::ConnectionError
pub type AcceptBiError = quinn::ConnectionError;

/// CreateChannelError for quinn channels.
Expand Down

0 comments on commit 469be36

Please sign in to comment.