Skip to content

Commit

Permalink
pimp macro to add shutdown hook
Browse files Browse the repository at this point in the history
  • Loading branch information
hhalex committed Feb 25, 2025
1 parent 140bd91 commit bbb1cab
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
23 changes: 15 additions & 8 deletions src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::{
sync::Arc,
};
use storage::{DataProposalVerdict, LaneEntry, Storage};
use tokio::task::{futures, JoinHandle, JoinSet};
use tokio::task::JoinSet;
// Pick one of the two implementations
// use storage_memory::LanesStorage;
use storage_fjall::LanesStorage;
Expand Down Expand Up @@ -113,7 +113,7 @@ pub struct MempoolStore {
pub struct Mempool {
bus: MempoolBusClient,
file: Option<PathBuf>,
blocker: JoinSet<()>,
blocker: JoinSet<Result<()>>,
conf: SharedConf,
crypto: SharedBlstCrypto,
metrics: MempoolMetrics,
Expand Down Expand Up @@ -237,7 +237,7 @@ impl Module for Mempool {
bus,
file: Some(ctx.common.config.data_directory.clone()),
conf: ctx.common.config.clone(),
blocker: JoinSet::new()
blocker: JoinSet::new(),
metrics,
crypto: Arc::clone(&ctx.node.crypto),
lanes: LanesStorage::new(
Expand Down Expand Up @@ -265,6 +265,12 @@ impl Mempool {

module_handle_messages! {
on_bus self.bus,
on_shutdown {
// Waiting all proof txs being processed
let mut join_set: JoinSet<Result<()>> = JoinSet::new();
std::mem::swap(&mut self.blocker, &mut join_set);
join_set.join_all().await;
},
listen<SignedByValidator<MempoolNetMessage>> cmd => {
let _ = self.handle_net_message(cmd)
.log_error("Handling MempoolNetMessage in Mempool");
Expand Down Expand Up @@ -974,14 +980,14 @@ impl Mempool {
let kc = self.known_contracts.clone();
let sender: &tokio::sync::broadcast::Sender<InternalMempoolEvent> = self.bus.get();
let sender = sender.clone();
let t = tokio::task::spawn_blocking(move || {
let tx =
Self::process_proof_tx(kc, tx).log_error("Error processing proof tx")?;
self.blocker.spawn_blocking(move || {
let tx = Self::process_proof_tx(kc, tx)
.log_error("Processing proof tx in blocker")?;
sender
.send(InternalMempoolEvent::OnProcessedNewTx(tx))
.log_warn("sending processed TX")
.log_warn("sending processed TX")?;
Ok(())
});
while !t.is_finished() {}

return Ok(());
}
Expand Down Expand Up @@ -1242,6 +1248,7 @@ pub mod test {
bus,
file: None,
conf: SharedConf::default(),
blocker: JoinSet::new(),
crypto: Arc::new(crypto),
metrics: MempoolMetrics::global("id".to_string()),
lanes,
Expand Down
16 changes: 16 additions & 0 deletions src/utils/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ pub mod signal {

#[macro_export]
macro_rules! module_handle_messages {
(on_bus $bus:expr, on_shutdown $on_shutdown:block, $($rest:tt)*) => {
{
let mut shutdown_receiver = unsafe { &mut *Pick::<tokio::sync::broadcast::Receiver<$crate::utils::modules::signal::ShutdownModule>>::splitting_get_mut(&mut $bus) };
let mut should_shutdown = false;
$crate::handle_messages! {
on_bus $bus,
$($rest)*
Ok(_) = $crate::utils::modules::signal::async_receive_shutdown::<Self>(&mut should_shutdown, &mut shutdown_receiver) => {
tracing::debug!("Break signal received for module {}", std::any::type_name::<Self>());
$on_shutdown;
break;
}
}
should_shutdown
}
};
(on_bus $bus:expr, $($rest:tt)*) => {
{
let mut shutdown_receiver = unsafe { &mut *Pick::<tokio::sync::broadcast::Receiver<$crate::utils::modules::signal::ShutdownModule>>::splitting_get_mut(&mut $bus) };
Expand Down

0 comments on commit bbb1cab

Please sign in to comment.