Skip to content

Commit

Permalink
support shutdown hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
Yanhao committed Sep 18, 2023
1 parent ee94d93 commit 3f1c660
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions volo-thrift/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{
collections::VecDeque,
marker::PhantomData,
sync::{atomic::Ordering, Arc},
time::Duration,
};

use futures::future::BoxFuture;
use motore::{
layer::{Identity, Layer, Stack},
service::Service,
Expand Down Expand Up @@ -42,6 +44,9 @@ pub struct Server<S, L, Req, MkC, SP> {
#[cfg(feature = "multiplex")]
multiplex: bool,
span_provider: SP,

shutdown_hooks: VecDeque<Box<dyn FnOnce() -> BoxFuture<'static, ()>>>,

_marker: PhantomData<Req>,
}

Expand All @@ -66,12 +71,25 @@ impl<S, Req>
#[cfg(feature = "multiplex")]
multiplex: false,
span_provider: DefaultProvider {},
shutdown_hooks: VecDeque::new(),
_marker: PhantomData,
}
}
}

impl<S, L, Req, MkC, SP> Server<S, L, Req, MkC, SP> {
/// Register shutdown hook.
///
/// Hook functions will be called just before volo's own gracefull existing code starts,
/// in reverse order of registration.
pub fn register_shutdown_hook(
mut self,
hook: impl FnOnce() -> BoxFuture<'static, ()> + 'static,
) -> Self {
self.shutdown_hooks.push_front(Box::new(hook));
self
}

/// Adds a new inner layer to the server.
///
/// The layer's `Service` should be `Send + Sync + Clone + 'static`.
Expand All @@ -92,6 +110,7 @@ impl<S, L, Req, MkC, SP> Server<S, L, Req, MkC, SP> {
#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
span_provider: self.span_provider,
shutdown_hooks: self.shutdown_hooks,
_marker: PhantomData,
}
}
Expand All @@ -116,6 +135,7 @@ impl<S, L, Req, MkC, SP> Server<S, L, Req, MkC, SP> {
#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
span_provider: self.span_provider,
shutdown_hooks: self.shutdown_hooks,
_marker: PhantomData,
}
}
Expand Down Expand Up @@ -144,6 +164,7 @@ impl<S, L, Req, MkC, SP> Server<S, L, Req, MkC, SP> {
#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
span_provider: self.span_provider,
shutdown_hooks: self.shutdown_hooks,
_marker: PhantomData,
}
}
Expand Down Expand Up @@ -291,6 +312,14 @@ impl<S, L, Req, MkC, SP> Server<S, L, Req, MkC, SP> {
}
}

if !self.shutdown_hooks.is_empty() {
info!("[VOLO] call shutdown hooks");

for hook in self.shutdown_hooks {
(hook)().await;
}
}

// received signal, graceful shutdown now
info!("[VOLO] received signal, gracefully exiting now");
*exit_flag.write() = true;
Expand Down Expand Up @@ -343,6 +372,7 @@ impl<S, L, Req, MkC, SP> Server<S, L, Req, MkC, SP> {
#[cfg(feature = "multiplex")]
multiplex: self.multiplex,
span_provider: provider,
shutdown_hooks: self.shutdown_hooks,
_marker: PhantomData,
}
}
Expand Down

0 comments on commit 3f1c660

Please sign in to comment.