From a302539011a5fcaeb48623fe81df014780404925 Mon Sep 17 00:00:00 2001 From: Pure White Date: Thu, 19 Oct 2023 14:38:15 +0800 Subject: [PATCH] feat: use afit and rpitit to optimize Service trait --- Cargo.lock | 4 +- README.md | 39 ++--- motore-macros/Cargo.toml | 7 +- motore-macros/src/lib.rs | 32 ++-- motore/Cargo.toml | 8 +- motore/src/layer/layer_fn.rs | 22 ++- motore/src/lib.rs | 1 - motore/src/make/make_connection.rs | 29 +++- motore/src/service/ext/map_err.rs | 27 ++-- motore/src/service/ext/map_response.rs | 26 ++-- motore/src/service/mod.rs | 202 ++++++++++++++++++------- motore/src/service/service_fn.rs | 18 +-- motore/src/service/tower_adapter.rs | 62 ++++++-- motore/src/timeout.rs | 33 ++-- motore/src/utils/either.rs | 24 +-- 15 files changed, 332 insertions(+), 202 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba1da44..cc0f641 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -206,7 +206,7 @@ dependencies = [ [[package]] name = "motore" -version = "0.3.3" +version = "0.4.0" dependencies = [ "futures", "http", @@ -218,7 +218,7 @@ dependencies = [ [[package]] name = "motore-macros" -version = "0.3.1" +version = "0.4.0" dependencies = [ "motore", "proc-macro2", diff --git a/README.md b/README.md index 9402f2f..f1bd2b3 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Motore is greatly inspired by [`Tower`][tower]. ## Overview -Motore uses GAT and TAIT to reduce the mental burden of writing asynchronous code, especially to avoid the overhead of `Box` to make people less anxious. +Motore uses AFIT and RPITIT to reduce the mental burden of writing asynchronous code, especially to avoid the overhead of `Box` to make people less anxious. The core abstraciton of Motore is the `Service` trait: @@ -29,22 +29,14 @@ pub trait Service { /// Errors produced by the service. type Error; - /// The future response value. - type Future<'cx>: Future> + Send + 'cx - where - Cx: 'cx, - Self: 'cx; - /// Process the request and return the response asynchronously. - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - 's: 'cx; + async fn call<'s, 'cx>(&'s self, cx: &'cx mut Cx, req: Request) -> Result; } ``` ## Getting Started -Combing GAT and `impl_trait_in_assoc_type` together, we can write asynchronous code in a very concise and readable way. +Combing AFIT and RPITIT together, we can write asynchronous code in a very concise and readable way. ```rust pub struct Timeout { @@ -63,20 +55,13 @@ where type Error = BoxError; - type Future<'cx> = impl Future> + Send + 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let sleep = tokio::time::sleep(self.duration); - tokio::select! { - r = self.inner.call(cx, req) => { - r.map_err(Into::into) - }, - _ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()), - } + async fn call<'s, 'cx>(&'s self, cx: &'cx mut Cx, req: Req) -> Result { + let sleep = tokio::time::sleep(self.duration); + tokio::select! { + r = self.inner.call(cx, req) => { + r.map_err(Into::into) + }, + _ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()), } } } @@ -106,10 +91,6 @@ where ## FAQ -### Why do we need GAT? - -https://www.cloudwego.io/zh/docs/motore/faq/q1_gat/ - ### Where's the `poll_ready`(a.k.a. backpressure)? https://www.cloudwego.io/zh/docs/motore/faq/q2_pull_ready/ diff --git a/motore-macros/Cargo.toml b/motore-macros/Cargo.toml index c5528aa..bfbe9ea 100644 --- a/motore-macros/Cargo.toml +++ b/motore-macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "motore-macros" -version = "0.3.1" +version = "0.4.0" edition = "2021" description = """ Motore's proc macros. @@ -27,6 +27,9 @@ proc-macro2 = "1" quote = "1" syn = { version = "1", features = ["full"] } - [dev-dependencies] motore = { path = "../motore" } + +[features] +default = [] +service_send = [] diff --git a/motore-macros/src/lib.rs b/motore-macros/src/lib.rs index 7716d24..6e2f338 100644 --- a/motore-macros/src/lib.rs +++ b/motore-macros/src/lib.rs @@ -42,7 +42,8 @@ pub fn service(_args: TokenStream, input: TokenStream) -> TokenStream { } fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> { - let generic_params = &item.generics.params; + let generic_params: &syn::punctuated::Punctuated = + &item.generics.params; let call_method = item .items .iter_mut() @@ -89,7 +90,7 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> { } }; - let cx_is_generic = generic_params + let _cx_is_generic = generic_params .iter() .filter_map(|p| match p { syn::GenericParam::Type(t) => Some(t), @@ -128,9 +129,16 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> { } }; sig.asyncness = None; - sig.generics = parse_quote!(<'cx, 's>); - sig.generics.where_clause = Some(parse_quote!(where 's: 'cx)); - sig.output = parse_quote!(-> Self::Future<'cx>); + sig.generics = parse_quote!(<'s, 'cx>); + // sig.generics.where_clause = Some(parse_quote!(where 's: 'cx)); + #[cfg(feature = "service_send")] + { + sig.output = parse_quote!(-> impl ::std::future::Future> + Send); + } + #[cfg(not(feature = "service_send"))] + { + sig.output = parse_quote!(-> impl ::std::future::Future>); + } sig.inputs[0] = parse_quote!(&'s self); let old_stmts = &call_method.block.stmts; call_method.block.stmts = vec![parse_quote!(async move { #(#old_stmts)* })]; @@ -143,14 +151,14 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> { type Error = #err_ty; )); - let cx_bound = cx_is_generic.then(|| Some(quote!(Cx: 'cx,))).into_iter(); + // let cx_bound = cx_is_generic.then(|| Some(quote!(Cx: 'cx,))).into_iter(); - item.items.push(parse_quote!( - type Future<'cx> = impl ::std::future::Future> + 'cx - where - #(#cx_bound)* - Self:'cx; - )); + // item.items.push(parse_quote!( + // type Future<'cx> = impl ::std::future::Future> + 'cx where + // #(#cx_bound)* + // Self:'cx; + // )); Ok(()) } diff --git a/motore/Cargo.toml b/motore/Cargo.toml index 9b20808..2732733 100644 --- a/motore/Cargo.toml +++ b/motore/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "motore" -version = "0.3.3" +version = "0.4.0" edition = "2021" description = """ Motore is a library of modular and reusable components for building robust @@ -21,7 +21,7 @@ keywords = ["io", "async", "non-blocking", "futures", "service"] maintenance = { status = "actively-developed" } [dependencies] -motore-macros = { path = "../motore-macros", version = "0.3" } +motore-macros = { path = "../motore-macros", version = "0.4" } futures = "0.3" tokio = { version = "1", features = ["time", "macros"] } @@ -32,7 +32,11 @@ tower = { version = "0.4", optional = true } http = "0.2" [features] +default = ["service_send"] +# enable the tower adapter tower = ["dep:tower"] +# indicates the Service should be Send +service_send = ["motore-macros/service_send"] [package.metadata.docs.rs] all-features = true diff --git a/motore/src/layer/layer_fn.rs b/motore/src/layer/layer_fn.rs index e0bd869..999667b 100644 --- a/motore/src/layer/layer_fn.rs +++ b/motore/src/layer/layer_fn.rs @@ -14,9 +14,7 @@ use super::Layer; /// # Example /// /// ```rust -/// # #![feature(type_alias_impl_trait)] /// # -/// # use futures::Future; /// # use motore::layer::{Layer, layer_fn}; /// # use motore::service::{service_fn, Service, ServiceFn}; /// # use std::convert::Infallible; @@ -30,24 +28,22 @@ use super::Layer; /// /// impl Service for LogService /// where -/// S: Service, -/// Request: fmt::Debug, +/// S: Service + Send + Sync, +/// Request: fmt::Debug + Send, +/// Cx: Send, /// { /// type Response = S::Response; /// type Error = S::Error; -/// type Future<'cx> = S::Future<'cx> -/// where -/// Cx: 'cx, -/// S: 'cx; /// -/// fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> -/// where -/// 's: 'cx, -/// { +/// async fn call<'s, 'cx>( +/// &'s self, +/// cx: &'cx mut Cx, +/// req: Request, +/// ) -> Result { /// // Log the request /// println!("req = {:?}, target = {:?}", req, self.target); /// -/// self.service.call(cx, req) +/// self.service.call(cx, req).await /// } /// } /// diff --git a/motore/src/lib.rs b/motore/src/lib.rs index 47731f2..f6ee39f 100644 --- a/motore/src/lib.rs +++ b/motore/src/lib.rs @@ -1,5 +1,4 @@ #![cfg_attr(docsrs, feature(doc_cfg))] -#![feature(impl_trait_in_assoc_type)] #![doc( html_logo_url = "https://github.com/cloudwego/motore/raw/main/.github/assets/logo.png?sanitize=true" )] diff --git a/motore/src/make/make_connection.rs b/motore/src/make/make_connection.rs index 5242a02..6ec588f 100644 --- a/motore/src/make/make_connection.rs +++ b/motore/src/make/make_connection.rs @@ -11,12 +11,17 @@ use crate::{sealed::Sealed, UnaryService}; pub trait MakeConnection
: Sealed<(Address,)> { type Connection: AsyncRead + AsyncWrite + Unpin + Send; type Error; - type Future<'s>: Future> + Send + 's - where - Self: 's, - Address: 's; - fn make_connection(&self, req: Address) -> Self::Future<'_>; + #[cfg(feature = "service_send")] + fn make_connection( + &self, + req: Address, + ) -> impl Future> + Send; + #[cfg(not(feature = "service_send"))] + fn make_connection( + &self, + req: Address, + ) -> impl Future>; } impl Sealed<(Address,)> for S where S: UnaryService
{} @@ -28,9 +33,19 @@ where { type Connection = S::Response; type Error = S::Error; - type Future<'s> = impl Future> + Send + 's where Self: 's, Address:'s; - fn make_connection(&self, addr: Address) -> Self::Future<'_> { + #[cfg(feature = "service_send")] + fn make_connection( + &self, + addr: Address, + ) -> impl Future> + Send { + self.call(addr) + } + #[cfg(not(feature = "service_send"))] + fn make_connection( + &self, + addr: Address, + ) -> impl Future> { self.call(addr) } } diff --git a/motore/src/service/ext/map_err.rs b/motore/src/service/ext/map_err.rs index b3d01da..992925f 100644 --- a/motore/src/service/ext/map_err.rs +++ b/motore/src/service/ext/map_err.rs @@ -1,4 +1,6 @@ -use futures::{Future, TryFutureExt}; +use std::future::Future; + +use futures::TryFutureExt; use crate::Service; @@ -20,15 +22,20 @@ where type Error = E; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { + self.inner.call(cx, req).map_err(self.f.clone()) + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> { self.inner.call(cx, req).map_err(self.f.clone()) } } diff --git a/motore/src/service/ext/map_response.rs b/motore/src/service/ext/map_response.rs index 84760dc..71ecfc8 100644 --- a/motore/src/service/ext/map_response.rs +++ b/motore/src/service/ext/map_response.rs @@ -1,6 +1,6 @@ -use std::fmt; +use std::{fmt, future::Future}; -use futures::{Future, TryFutureExt}; +use futures::TryFutureExt; use crate::Service; /// Service returned by the [`map_response`] combinator. @@ -19,15 +19,21 @@ where { type Response = Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { + self.inner.call(cx, req).map_ok(self.f.clone()) + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> { self.inner.call(cx, req).map_ok(self.f.clone()) } } diff --git a/motore/src/service/mod.rs b/motore/src/service/mod.rs index b79266e..17dc584 100644 --- a/motore/src/service/mod.rs +++ b/motore/src/service/mod.rs @@ -6,7 +6,10 @@ use std::{fmt, future::Future, sync::Arc}; +#[cfg(feature = "service_send")] use futures::future::BoxFuture; +#[cfg(not(feature = "service_send"))] +use futures::future::LocalBoxFuture as BoxFuture; mod ext; mod service_fn; @@ -45,10 +48,6 @@ pub use tower_adapter::*; /// As an example, here is how an HTTP request is processed by a server: /// /// ```rust -/// #![feature(impl_trait_in_assoc_type)] -/// -/// use std::future::Future; -/// /// use http::{Request, Response, StatusCode}; /// use motore::Service; /// @@ -60,12 +59,12 @@ pub use tower_adapter::*; /// { /// type Response = Response>; /// type Error = http::Error; -/// type Future<'cx> = impl Future> + 'cx; /// -/// fn call<'cx, 's>(&'s self, _cx: &'cx mut Cx, _req: Request>) -> Self::Future<'cx> -/// where -/// 's: 'cx, -/// { +/// async fn call<'s, 'cx>( +/// &'s self, +/// _cx: &'cx mut Cx, +/// _req: Request>, +/// ) -> Result { /// // create the body /// let body: Vec = "hello, world!\n".as_bytes().to_owned(); /// // Create the HTTP response @@ -73,8 +72,8 @@ pub use tower_adapter::*; /// .status(StatusCode::OK) /// .body(body) /// .expect("Unable to create `http::Response`"); -/// // create a response in a future. -/// async { Ok(resp) } +/// // create a response in a future and return +/// async { Ok(resp) }.await /// } /// } /// ``` @@ -94,16 +93,21 @@ pub trait Service { /// Errors produced by the service. type Error; - /// The future response value. - type Future<'cx>: Future> + Send + 'cx - where - Cx: 'cx, - Self: 'cx; + /// Process the request and return the response asynchronously. + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> impl Future> + Send; /// Process the request and return the response asynchronously. - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - 's: 'cx; + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> impl Future>; } macro_rules! impl_service_ref { @@ -116,12 +120,20 @@ macro_rules! impl_service_ref { type Error = T::Error; - type Future<'cx> = T::Future<'cx> where Cx: 'cx, Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { + (&**self).call(cx, req) + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> { (&**self).call(cx, req) } } @@ -141,9 +153,15 @@ macro_rules! impl_unary_service_ref { type Error = T::Error; - type Future<'s> = T::Future<'s> where Self: 's; - - fn call(&self, req: Req) -> Self::Future<'_> { + #[cfg(feature = "service_send")] + fn call( + &self, + req: Req, + ) -> impl Future> + Send { + (&**self).call(req) + } + #[cfg(not(feature = "service_send"))] + fn call(&self, req: Req) -> impl Future> { (&**self).call(req) } } @@ -155,11 +173,13 @@ pub trait UnaryService { type Response; type Error; - type Future<'s>: Future> + Send + 's - where - Self: 's; - - fn call(&self, req: Request) -> Self::Future<'_>; + #[cfg(feature = "service_send")] + fn call( + &self, + req: Request, + ) -> impl Future> + Send; + #[cfg(not(feature = "service_send"))] + fn call(&self, req: Request) -> impl Future>; } impl_unary_service_ref!(Arc); @@ -176,11 +196,28 @@ pub struct BoxService { impl BoxService { /// Create a new `BoxService`. + #[cfg(feature = "service_send")] pub fn new(s: S) -> Self where S: Service + Send + Sync + 'static, T: 'static, - for<'cx> S::Future<'cx>: Send, + { + let raw = Box::into_raw(Box::new(s)) as *mut (); + BoxService { + raw, + vtable: ServiceVtable { + call: call::, + drop: drop::, + }, + } + } + + /// Create a new `BoxService`. + #[cfg(not(feature = "service_send"))] + pub fn new(s: S) -> Self + where + S: Service + 'static, + T: 'static, { let raw = Box::into_raw(Box::new(s)) as *mut (); BoxService { @@ -210,14 +247,20 @@ impl Service for BoxService { type Error = E; - type Future<'cx> = BoxFuture<'cx, Result> - where - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: T) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: T, + ) -> impl Future> + Send { + unsafe { (self.vtable.call)(self.raw, cx, req) } + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: T, + ) -> impl Future> { unsafe { (self.vtable.call)(self.raw, cx, req) } } } @@ -225,8 +268,9 @@ impl Service for BoxService { /// # Safety /// /// The contained `Service` must be `Send` and `Sync` required by the bounds of `new` and `clone`. +#[cfg(feature = "service_send")] unsafe impl Send for BoxService {} - +#[cfg(feature = "service_send")] unsafe impl Sync for BoxService {} struct ServiceVtable { @@ -241,6 +285,20 @@ struct ServiceVtable { /// /// This is similar to [`BoxService`](BoxService) except the resulting /// service implements [`Clone`]. +#[cfg(feature = "service_send")] +pub struct BoxCloneService { + raw: *mut (), + vtable: CloneServiceVtable, +} + +/// A [`Clone`] boxed [`Service`]. +/// +/// [`BoxCloneService`] turns a service into a trait object, allowing the +/// response future type to be dynamic, and allowing the service to be cloned. +/// +/// This is similar to [`BoxService`](BoxService) except the resulting +/// service implements [`Clone`]. +#[cfg(not(feature = "service_send"))] pub struct BoxCloneService { raw: *mut (), vtable: CloneServiceVtable, @@ -248,11 +306,29 @@ pub struct BoxCloneService { impl BoxCloneService { /// Create a new `BoxCloneService`. + #[cfg(feature = "service_send")] pub fn new(s: S) -> Self where S: Service + Clone + Send + Sync + 'static, T: 'static, - for<'cx> S::Future<'cx>: Send, + { + let raw = Box::into_raw(Box::new(s)) as *mut (); + BoxCloneService { + raw, + vtable: CloneServiceVtable { + call: call::, + clone: clone::, + drop: drop::, + }, + } + } + + /// Create a new `BoxCloneService`. + #[cfg(not(feature = "service_send"))] + pub fn new(s: S) -> Self + where + S: Service + Clone + 'static, + T: 'static, { let raw = Box::into_raw(Box::new(s)) as *mut (); BoxCloneService { @@ -289,14 +365,20 @@ impl Service for BoxCloneService { type Error = E; - type Future<'cx> = BoxFuture<'cx, Result> - where - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: T) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: T, + ) -> impl Future> + Send { + unsafe { (self.vtable.call)(self.raw, cx, req) } + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: T, + ) -> impl Future> { unsafe { (self.vtable.call)(self.raw, cx, req) } } } @@ -304,8 +386,9 @@ impl Service for BoxCloneService { /// # Safety /// /// The contained `Service` must be `Send` and `Sync` required by the bounds of `new` and `clone`. +#[cfg(feature = "service_send")] unsafe impl Send for BoxCloneService {} - +#[cfg(feature = "service_send")] unsafe impl Sync for BoxCloneService {} struct CloneServiceVtable { @@ -322,18 +405,27 @@ fn call( where Req: 'static, S: Service + 'static, - for<'cx> S::Future<'cx>: Send, { let fut = S::call(unsafe { (raw as *mut S).as_mut().unwrap() }, cx, req); Box::pin(fut) } +#[cfg(feature = "service_send")] fn clone + 'static + Sync>( raw: *mut (), ) -> BoxCloneService where Req: 'static, - for<'cx> S::Future<'cx>: Send, +{ + BoxCloneService::new(S::clone(unsafe { (raw as *mut S).as_ref().unwrap() })) +} + +#[cfg(not(feature = "service_send"))] +fn clone + 'static>( + raw: *mut (), +) -> BoxCloneService +where + Req: 'static, { BoxCloneService::new(S::clone(unsafe { (raw as *mut S).as_ref().unwrap() })) } diff --git a/motore/src/service/service_fn.rs b/motore/src/service/service_fn.rs index 60cb840..5ad80c8 100644 --- a/motore/src/service/service_fn.rs +++ b/motore/src/service/service_fn.rs @@ -11,9 +11,6 @@ use crate::service::Service; /// # Example /// /// ```rust -/// # #![feature(type_alias_impl_trait)] -/// # -/// # use futures::Future; /// # use motore::service::{service_fn, Service, ServiceFn}; /// # use motore::BoxError; /// # @@ -51,15 +48,12 @@ where { type Response = R; type Error = E; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { + + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> impl Future> { (self.f).call(cx, req) } } diff --git a/motore/src/service/tower_adapter.rs b/motore/src/service/tower_adapter.rs index 4fe26a8..a7d7c92 100644 --- a/motore/src/service/tower_adapter.rs +++ b/motore/src/service/tower_adapter.rs @@ -21,7 +21,11 @@ use std::{ task::{Context, Poll}, }; -use futures::Future; +#[cfg(feature = "service_send")] +use futures::future::BoxFuture; +#[cfg(not(feature = "service_send"))] +use futures::future::LocalBoxFuture; +use futures::{Future, FutureExt}; use crate::Service; @@ -58,16 +62,19 @@ impl Tower { } } +#[cfg(feature = "service_send")] impl tower::Service for Tower where - S: Service + Clone, + S: Service + Clone + 'static + Send, F: FnOnce(TowerReq) -> (Cx, MotoreReq) + Clone, + MotoreReq: 'static + Send, + Cx: 'static + Send, { type Response = S::Response; type Error = S::Error; - type Future = impl Future>; + type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -76,7 +83,32 @@ where fn call(&mut self, req: TowerReq) -> Self::Future { let inner = self.inner.clone(); let (mut cx, r) = (self.f.clone())(req); - async move { inner.call(&mut cx, r).await } + async move { inner.call(&mut cx, r).await }.boxed() + } +} + +#[cfg(not(feature = "service_send"))] +impl tower::Service for Tower +where + S: Service + Clone + 'static, + F: FnOnce(TowerReq) -> (Cx, MotoreReq) + Clone, + MotoreReq: 'static, + Cx: 'static, +{ + type Response = S::Response; + + type Error = S::Error; + + type Future = LocalBoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: TowerReq) -> Self::Future { + let inner = self.inner.clone(); + let (mut cx, r) = (self.f.clone())(req); + async move { inner.call(&mut cx, r).await }.boxed_local() } } @@ -145,15 +177,21 @@ where type Error = S::Error; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: MotoreReq, + ) -> impl Future> + Send { + self.inner.clone().call((self.f.clone())(cx, req)) + } - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: MotoreReq) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: MotoreReq, + ) -> impl Future> { self.inner.clone().call((self.f.clone())(cx, req)) } } diff --git a/motore/src/timeout.rs b/motore/src/timeout.rs index 3d14f52..db8ce68 100644 --- a/motore/src/timeout.rs +++ b/motore/src/timeout.rs @@ -4,8 +4,6 @@ use std::time::Duration; -use futures::Future; - use crate::{layer::Layer, service::Service, BoxError}; #[derive(Clone)] @@ -31,25 +29,22 @@ where type Error = BoxError; - type Future<'cx> = impl Future> + 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - match self.duration { - Some(duration) => { - let sleep = tokio::time::sleep(duration); - tokio::select! { - r = self.inner.call(cx, req) => { - r.map_err(Into::into) - }, - _ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()), - } + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> Result { + match self.duration { + Some(duration) => { + let sleep = tokio::time::sleep(duration); + tokio::select! { + r = self.inner.call(cx, req) => { + r.map_err(Into::into) + }, + _ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()), } - None => self.inner.call(cx, req).await.map_err(Into::into), } + None => self.inner.call(cx, req).await.map_err(Into::into), } } } diff --git a/motore/src/utils/either.rs b/motore/src/utils/either.rs index 17101ca..4c9698c 100644 --- a/motore/src/utils/either.rs +++ b/motore/src/utils/either.rs @@ -1,5 +1,3 @@ -use futures::Future; - use crate::{layer::Layer, service::Service}; /// Combine two different service types into a single type. @@ -39,20 +37,14 @@ where type Error = A::Error; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - match self { - Either::A(s) => s.call(cx, req).await, - Either::B(s) => s.call(cx, req).await, - } + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> Result { + match self { + Either::A(s) => s.call(cx, req).await, + Either::B(s) => s.call(cx, req).await, } } }