Skip to content

Commit

Permalink
feat: use afit and rpitit to optimize Service trait (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
PureWhiteWu authored Oct 20, 2023
1 parent 18fdd56 commit 1fdf792
Show file tree
Hide file tree
Showing 15 changed files with 332 additions and 202 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 10 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Motore is greatly inspired by [`Tower`][tower].

## Overview

Motore uses GAT and TAIT to reduce the mental burden of writing asynchronous code, especially to avoid the overhead of `Box` to make people less anxious.
Motore uses AFIT and RPITIT to reduce the mental burden of writing asynchronous code, especially to avoid the overhead of `Box` to make people less anxious.

The core abstraciton of Motore is the `Service` trait:

Expand All @@ -29,22 +29,14 @@ pub trait Service<Cx, Request> {
/// Errors produced by the service.
type Error;

/// The future response value.
type Future<'cx>: Future<Output = Result<Self::Response, Self::Error>> + Send + 'cx
where
Cx: 'cx,
Self: 'cx;

/// Process the request and return the response asynchronously.
fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx>
where
's: 'cx;
async fn call<'s, 'cx>(&'s self, cx: &'cx mut Cx, req: Request) -> Result<Self::Response, Self::Error>;
}
```

## Getting Started

Combing GAT and `impl_trait_in_assoc_type` together, we can write asynchronous code in a very concise and readable way.
Combing AFIT and RPITIT together, we can write asynchronous code in a very concise and readable way.

```rust
pub struct Timeout<S> {
Expand All @@ -63,20 +55,13 @@ where

type Error = BoxError;

type Future<'cx> = impl Future<Output = Result<S::Response, Self::Error>> + Send + 'cx;

fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx>
where
's: 'cx,
{
async move {
let sleep = tokio::time::sleep(self.duration);
tokio::select! {
r = self.inner.call(cx, req) => {
r.map_err(Into::into)
},
_ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()),
}
async fn call<'s, 'cx>(&'s self, cx: &'cx mut Cx, req: Req) -> Result<Self::Response, Self::Error> {
let sleep = tokio::time::sleep(self.duration);
tokio::select! {
r = self.inner.call(cx, req) => {
r.map_err(Into::into)
},
_ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()),
}
}
}
Expand Down Expand Up @@ -106,10 +91,6 @@ where

## FAQ

### Why do we need GAT?

https://www.cloudwego.io/zh/docs/motore/faq/q1_gat/

### Where's the `poll_ready`(a.k.a. backpressure)?

https://www.cloudwego.io/zh/docs/motore/faq/q2_pull_ready/
Expand Down
7 changes: 5 additions & 2 deletions motore-macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "motore-macros"
version = "0.3.1"
version = "0.4.0"
edition = "2021"
description = """
Motore's proc macros.
Expand All @@ -27,6 +27,9 @@ proc-macro2 = "1"
quote = "1"
syn = { version = "1", features = ["full"] }


[dev-dependencies]
motore = { path = "../motore" }

[features]
default = []
service_send = []
32 changes: 20 additions & 12 deletions motore-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pub fn service(_args: TokenStream, input: TokenStream) -> TokenStream {
}

fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> {
let generic_params = &item.generics.params;
let generic_params: &syn::punctuated::Punctuated<syn::GenericParam, syn::token::Comma> =
&item.generics.params;
let call_method = item
.items
.iter_mut()
Expand Down Expand Up @@ -89,7 +90,7 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> {
}
};

let cx_is_generic = generic_params
let _cx_is_generic = generic_params
.iter()
.filter_map(|p| match p {
syn::GenericParam::Type(t) => Some(t),
Expand Down Expand Up @@ -128,9 +129,16 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> {
}
};
sig.asyncness = None;
sig.generics = parse_quote!(<'cx, 's>);
sig.generics.where_clause = Some(parse_quote!(where 's: 'cx));
sig.output = parse_quote!(-> Self::Future<'cx>);
sig.generics = parse_quote!(<'s, 'cx>);
// sig.generics.where_clause = Some(parse_quote!(where 's: 'cx));
#[cfg(feature = "service_send")]
{
sig.output = parse_quote!(-> impl ::std::future::Future<Output = Result<Self::Response, Self::Error>> + Send);
}
#[cfg(not(feature = "service_send"))]
{
sig.output = parse_quote!(-> impl ::std::future::Future<Output = Result<Self::Response, Self::Error>>);
}
sig.inputs[0] = parse_quote!(&'s self);
let old_stmts = &call_method.block.stmts;
call_method.block.stmts = vec![parse_quote!(async move { #(#old_stmts)* })];
Expand All @@ -143,14 +151,14 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> {
type Error = #err_ty;
));

let cx_bound = cx_is_generic.then(|| Some(quote!(Cx: 'cx,))).into_iter();
// let cx_bound = cx_is_generic.then(|| Some(quote!(Cx: 'cx,))).into_iter();

item.items.push(parse_quote!(
type Future<'cx> = impl ::std::future::Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
#(#cx_bound)*
Self:'cx;
));
// item.items.push(parse_quote!(
// type Future<'cx> = impl ::std::future::Future<Output = Result<Self::Response,
// Self::Error>> + 'cx where
// #(#cx_bound)*
// Self:'cx;
// ));

Ok(())
}
8 changes: 6 additions & 2 deletions motore/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "motore"
version = "0.3.3"
version = "0.4.0"
edition = "2021"
description = """
Motore is a library of modular and reusable components for building robust
Expand All @@ -21,7 +21,7 @@ keywords = ["io", "async", "non-blocking", "futures", "service"]
maintenance = { status = "actively-developed" }

[dependencies]
motore-macros = { path = "../motore-macros", version = "0.3" }
motore-macros = { path = "../motore-macros", version = "0.4" }

futures = "0.3"
tokio = { version = "1", features = ["time", "macros"] }
Expand All @@ -32,7 +32,11 @@ tower = { version = "0.4", optional = true }
http = "0.2"

[features]
default = ["service_send"]
# enable the tower adapter
tower = ["dep:tower"]
# indicates the Service should be Send
service_send = ["motore-macros/service_send"]

[package.metadata.docs.rs]
all-features = true
Expand Down
22 changes: 9 additions & 13 deletions motore/src/layer/layer_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use super::Layer;
/// # Example
///
/// ```rust
/// # #![feature(type_alias_impl_trait)]
/// #
/// # use futures::Future;
/// # use motore::layer::{Layer, layer_fn};
/// # use motore::service::{service_fn, Service, ServiceFn};
/// # use std::convert::Infallible;
Expand All @@ -30,24 +28,22 @@ use super::Layer;
///
/// impl<S, Cx, Request> Service<Cx, Request> for LogService<S>
/// where
/// S: Service<Cx, Request>,
/// Request: fmt::Debug,
/// S: Service<Cx, Request> + Send + Sync,
/// Request: fmt::Debug + Send,
/// Cx: Send,
/// {
/// type Response = S::Response;
/// type Error = S::Error;
/// type Future<'cx> = S::Future<'cx>
/// where
/// Cx: 'cx,
/// S: 'cx;
///
/// fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx>
/// where
/// 's: 'cx,
/// {
/// async fn call<'s, 'cx>(
/// &'s self,
/// cx: &'cx mut Cx,
/// req: Request,
/// ) -> Result<Self::Response, Self::Error> {
/// // Log the request
/// println!("req = {:?}, target = {:?}", req, self.target);
///
/// self.service.call(cx, req)
/// self.service.call(cx, req).await
/// }
/// }
///
Expand Down
1 change: 0 additions & 1 deletion motore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![feature(impl_trait_in_assoc_type)]
#![doc(
html_logo_url = "https://github.com/cloudwego/motore/raw/main/.github/assets/logo.png?sanitize=true"
)]
Expand Down
29 changes: 22 additions & 7 deletions motore/src/make/make_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ use crate::{sealed::Sealed, UnaryService};
pub trait MakeConnection<Address>: Sealed<(Address,)> {
type Connection: AsyncRead + AsyncWrite + Unpin + Send;
type Error;
type Future<'s>: Future<Output = Result<Self::Connection, Self::Error>> + Send + 's
where
Self: 's,
Address: 's;

fn make_connection(&self, req: Address) -> Self::Future<'_>;
#[cfg(feature = "service_send")]
fn make_connection(
&self,
req: Address,
) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send;
#[cfg(not(feature = "service_send"))]
fn make_connection(
&self,
req: Address,
) -> impl Future<Output = Result<Self::Connection, Self::Error>>;
}

impl<S, Address> Sealed<(Address,)> for S where S: UnaryService<Address> {}
Expand All @@ -28,9 +33,19 @@ where
{
type Connection = S::Response;
type Error = S::Error;
type Future<'s> = impl Future<Output = Result<Self::Connection, Self::Error>> + Send + 's where Self: 's, Address:'s;

fn make_connection(&self, addr: Address) -> Self::Future<'_> {
#[cfg(feature = "service_send")]
fn make_connection(
&self,
addr: Address,
) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send {
self.call(addr)
}
#[cfg(not(feature = "service_send"))]
fn make_connection(
&self,
addr: Address,
) -> impl Future<Output = Result<Self::Connection, Self::Error>> {
self.call(addr)
}
}
27 changes: 17 additions & 10 deletions motore/src/service/ext/map_err.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use futures::{Future, TryFutureExt};
use std::future::Future;

use futures::TryFutureExt;

use crate::Service;

Expand All @@ -20,15 +22,20 @@ where

type Error = E;

type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
Cx: 'cx,
Self: 'cx;

fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx>
where
's: 'cx,
{
#[cfg(feature = "service_send")]
fn call<'s, 'cx>(
&'s self,
cx: &'cx mut Cx,
req: Req,
) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send {
self.inner.call(cx, req).map_err(self.f.clone())
}
#[cfg(not(feature = "service_send"))]
fn call<'s, 'cx>(
&'s self,
cx: &'cx mut Cx,
req: Req,
) -> impl Future<Output = Result<Self::Response, Self::Error>> {
self.inner.call(cx, req).map_err(self.f.clone())
}
}
26 changes: 16 additions & 10 deletions motore/src/service/ext/map_response.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;
use std::{fmt, future::Future};

use futures::{Future, TryFutureExt};
use futures::TryFutureExt;

use crate::Service;
/// Service returned by the [`map_response`] combinator.
Expand All @@ -19,15 +19,21 @@ where
{
type Response = Response;
type Error = S::Error;
type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
Cx: 'cx,
Self: 'cx;

fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx>
where
's: 'cx,
{
#[cfg(feature = "service_send")]
fn call<'s, 'cx>(
&'s self,
cx: &'cx mut Cx,
req: Req,
) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send {
self.inner.call(cx, req).map_ok(self.f.clone())
}
#[cfg(not(feature = "service_send"))]
fn call<'s, 'cx>(
&'s self,
cx: &'cx mut Cx,
req: Req,
) -> impl Future<Output = Result<Self::Response, Self::Error>> {
self.inner.call(cx, req).map_ok(self.f.clone())
}
}
Expand Down
Loading

0 comments on commit 1fdf792

Please sign in to comment.