Skip to content

Commit

Permalink
feat: support disable default timeout layer
Browse files Browse the repository at this point in the history
  • Loading branch information
PureWhiteWu committed Jul 31, 2023
1 parent 0f0b9c4 commit 174b70d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 20 deletions.
2 changes: 1 addition & 1 deletion volo-thrift/src/client/layer/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
None => self.inner.call(cx, req).await.map_err(Into::into),
}
} else {
self.inner.call(cx, req).await.map_err(Into::into)
unreachable!("rpc_info.config should never be None")
}
}
}
Expand Down
73 changes: 54 additions & 19 deletions volo-thrift/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub struct ClientBuilder<IL, OL, MkClient, Req, Resp, MkT, MkC, LB> {
mk_lb: LB,
_marker: PhantomData<(*const Req, *const Resp)>,

disable_timeout_layer: bool,

#[cfg(feature = "multiplex")]
multiplex: bool,
}
Expand Down Expand Up @@ -93,6 +95,8 @@ impl<C, Req, Resp>
mk_lb: LbConfig::new(WeightedRandomBalance::new(), DummyDiscover {}),
_marker: PhantomData,

disable_timeout_layer: false,

#[cfg(feature = "multiplex")]
multiplex: false,
}
Expand Down Expand Up @@ -120,6 +124,8 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB, DISC>
make_codec: self.make_codec,
mk_lb: self.mk_lb.load_balance(load_balance),

disable_timeout_layer: self.disable_timeout_layer,

#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
}
Expand All @@ -143,6 +149,8 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB, DISC>
make_codec: self.make_codec,
mk_lb: self.mk_lb.discover(discover),

disable_timeout_layer: self.disable_timeout_layer,

#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
}
Expand Down Expand Up @@ -198,6 +206,13 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
self
}

/// Disable the default timeout layer.
#[doc(hidden)]
pub fn disable_timeout_layer(mut self) -> Self {
self.disable_timeout_layer = true;
self
}

pub fn mk_load_balance<NLB>(
self,
mk_load_balance: NLB,
Expand All @@ -216,6 +231,8 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
make_codec: self.make_codec,
mk_lb: mk_load_balance,

disable_timeout_layer: self.disable_timeout_layer,

#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
}
Expand Down Expand Up @@ -247,6 +264,8 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
make_codec,
mk_lb: self.mk_lb,

disable_timeout_layer: self.disable_timeout_layer,

#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
}
Expand All @@ -272,6 +291,8 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
make_codec: self.make_codec,
mk_lb: self.mk_lb,

disable_timeout_layer: self.disable_timeout_layer,

#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
}
Expand Down Expand Up @@ -299,7 +320,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
///
/// After we call `.layer_inner(baz)`, we will get: foo -> bar -> baz.
///
/// The overall order for layers is: Timeout -> outer -> LoadBalance -> [inner] -> transport.
/// The overall order for layers is: outer -> LoadBalance -> [inner] -> transport.
pub fn layer_inner<Inner>(
self,
layer: Inner,
Expand All @@ -318,6 +339,8 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
make_codec: self.make_codec,
mk_lb: self.mk_lb,

disable_timeout_layer: self.disable_timeout_layer,

#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
}
Expand Down Expand Up @@ -354,6 +377,8 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
make_codec: self.make_codec,
mk_lb: self.mk_lb,

disable_timeout_layer: self.disable_timeout_layer,

#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
}
Expand All @@ -371,7 +396,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
///
/// After we call `.layer_outer_front(baz)`, we will get: baz -> foo -> bar.
///
/// The overall order for layers is: Timeout -> [outer] -> LoadBalance -> inner -> transport.
/// The overall order for layers is: [outer] -> Timeout -> LoadBalance -> inner -> transport.
pub fn layer_outer_front<Outer>(
self,
layer: Outer,
Expand All @@ -390,6 +415,8 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
make_codec: self.make_codec,
mk_lb: self.mk_lb,

disable_timeout_layer: self.disable_timeout_layer,

#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
}
Expand All @@ -413,6 +440,8 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
make_codec: self.make_codec,
mk_lb: self.mk_lb,

disable_timeout_layer: self.disable_timeout_layer,

multiplex,
}
}
Expand Down Expand Up @@ -467,13 +496,22 @@ where
impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT, MkC, LB>
where
C: volo::client::MkClient<
Client<BoxCloneService<ClientContext, Req, Option<Resp>, crate::Error>>,
Client<
BoxCloneService<
ClientContext,
Req,
Option<Resp>,
<OL::Service as Service<ClientContext, Req>>::Error,
>,
>,
>,
LB: MkLbLayer,
LB::Layer: Layer<IL::Service>,
<LB::Layer as Layer<IL::Service>>::Service:
Service<ClientContext, Req, Response = Option<Resp>> + 'static + Send + Clone + Sync,
<<LB::Layer as Layer<IL::Service>>::Service as Service<ClientContext, Req>>::Error: Into<Error>,
<LB::Layer as Layer<IL::Service>>::Service: Service<ClientContext, Req, Response = Option<Resp>, Error = Error>
+ 'static
+ Send
+ Clone
+ Sync,
for<'cx> <<LB::Layer as Layer<IL::Service>>::Service as Service<ClientContext, Req>>::Future<'cx>:
Send,
Req: EntryMessage + Send + 'static + Sync + Clone,
Expand All @@ -485,14 +523,7 @@ where
for<'cx> <IL::Service as Service<ClientContext, Req>>::Future<'cx>: Send,
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
OL: Layer<
BoxCloneService<
ClientContext,
Req,
Option<Resp>,
<<LB::Layer as Layer<IL::Service>>::Service as Service<ClientContext, Req>>::Error,
>,
>,
OL: Layer<BoxCloneService<ClientContext, Req, Option<Resp>, Error>>,
OL::Service:
Service<ClientContext, Req, Response = Option<Resp>> + 'static + Send + Clone + Sync,
for<'cx> <OL::Service as Service<ClientContext, Req>>::Future<'cx>: Send,
Expand Down Expand Up @@ -528,11 +559,15 @@ where
},
};

let transport = TimeoutLayer::new().layer(self.outer_layer.layer(BoxCloneService::new(
self.mk_lb.make().layer(self.inner_layer.layer(msg_svc)),
)));

let transport = BoxCloneService::new(transport);
let transport = if !self.disable_timeout_layer {
BoxCloneService::new(self.outer_layer.layer(BoxCloneService::new(
TimeoutLayer::new().layer(self.mk_lb.make().layer(self.inner_layer.layer(msg_svc))),
)))
} else {
BoxCloneService::new(self.outer_layer.layer(BoxCloneService::new(
self.mk_lb.make().layer(self.inner_layer.layer(msg_svc)),
)))
};

self.mk_client.mk_client(Client {
inner: Arc::new(ClientInner {
Expand Down

0 comments on commit 174b70d

Please sign in to comment.