Skip to content

Commit

Permalink
add graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Artyom Sidorenko committed Oct 18, 2022
1 parent 1c92e3a commit d334428
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 7 deletions.
2 changes: 1 addition & 1 deletion wavesexchange_warp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wavesexchange_warp"
version = "0.14.0"
version = "0.14.2"
authors = ["Dmitry Shuranov <[email protected]>"]
edition = "2021"

Expand Down
32 changes: 26 additions & 6 deletions wavesexchange_warp/src/endpoints/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::{
livez as livez_fn, readyz as readyz_fn, startz as startz_fn, Checkz,
};
use futures::future::join;
use futures::future::BoxFuture;
use lazy_static::lazy_static;
use prometheus::{core::Collector, HistogramOpts, HistogramVec, IntCounter, Registry, TextEncoder};
use std::{fmt::Debug, future::Future};
Expand Down Expand Up @@ -76,6 +77,7 @@ pub struct MetricsWarpBuilder {
livez: DeepBoxedFilter<LivenessReply>,
readyz: DeepBoxedFilter<LivenessReply>,
startz: DeepBoxedFilter<LivenessReply>,
graceful_shutdown_signal: Option<BoxFuture<'static, ()>>,
}

impl MetricsWarpBuilder {
Expand All @@ -89,6 +91,7 @@ impl MetricsWarpBuilder {
livez: livez_fn().boxed(),
readyz: readyz_fn().boxed(),
startz: startz_fn().boxed(),
graceful_shutdown_signal: None,
}
}

Expand Down Expand Up @@ -158,6 +161,14 @@ impl MetricsWarpBuilder {
self
}

pub fn with_graceful_shutdown<F>(mut self, signal: F) -> Self
where
F: Future<Output = ()> + Send + 'static,
{
self.graceful_shutdown_signal = Some(Box::pin(signal));
self
}

/// Run warp instance(s) on current thread
pub async fn run_blocking(mut self) {
self = self
Expand All @@ -172,6 +183,7 @@ impl MetricsWarpBuilder {
livez,
readyz,
startz,
graceful_shutdown_signal,
} = self;

let host = [0, 0, 0, 0];
Expand All @@ -186,12 +198,20 @@ impl MetricsWarpBuilder {

match main_routes {
Some(routes) => {
join(
warp::serve(routes.with(warp::log::custom(estimate_request)))
.run((host, main_routes_port)),
warp_metrics_instance,
)
.await;
let warp_main_instance_prepared =
warp::serve(routes.with(warp::log::custom(estimate_request)));
match graceful_shutdown_signal {
Some(signal) => {
let (_addr, warp_main_instance) = warp_main_instance_prepared
.bind_with_graceful_shutdown((host, main_routes_port), signal);
join(warp_main_instance, warp_metrics_instance).await;
}
None => {
let warp_main_instance =
warp_main_instance_prepared.run((host, main_routes_port));
join(warp_main_instance, warp_metrics_instance).await;
}
}
}
None => warp_metrics_instance.await,
}
Expand Down
34 changes: 34 additions & 0 deletions wavesexchange_warp/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,37 @@ async fn test_run_metrics_warp() {
assert!(metrics.contains(r#"response_duration_count{code="200",method="GET"} 1"#));
assert!(metrics.contains(r#"response_duration_count{code="404",method="GET"} 1"#));
}

#[tokio::test]
async fn test_graceful_shutdown() {
use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel::<()>();
let main_port = 8081;
let url = format!("http://0.0.0.0:{main_port}");
let routes = warp::path!("hello").and_then(|| async { Ok::<_, Infallible>("Hello, world!") });

let warps = MetricsWarpBuilder::new()
.with_main_routes(routes)
.with_graceful_shutdown(async {
let _ = rx.await.unwrap();
})
.with_main_routes_port(main_port)
.run_blocking();

spawn(warps);
time::sleep(Duration::from_secs(1)).await; // wait for server

let hello = reqwest::get(format!("{url}/hello"))
.await
.unwrap()
.text()
.await
.unwrap();
assert_eq!(hello, "Hello, world!");

tx.send(()).unwrap();

let error = reqwest::get(format!("{url}/hello")).await.unwrap_err();
assert!(error.is_connect());
}

0 comments on commit d334428

Please sign in to comment.