diff --git a/consensus/src/dag/rb_handler.rs b/consensus/src/dag/rb_handler.rs index d8e8bbabd03857..45ae459d4b7575 100644 --- a/consensus/src/dag/rb_handler.rs +++ b/consensus/src/dag/rb_handler.rs @@ -29,11 +29,16 @@ use aptos_types::{ validator_txn::ValidatorTransaction, }; use async_trait::async_trait; +use claims::assert_some; +use dashmap::DashSet; use std::{collections::BTreeMap, mem, sync::Arc}; pub(crate) struct NodeBroadcastHandler { dag: Arc, + /// Note: The mutex around BTreeMap is to work around Rust Sync semantics. + /// Fine grained concurrency is implemented by the DashSet below. votes_by_round_peer: Mutex>>, + votes_fine_grained_lock: DashSet<(Round, Author)>, signer: Arc, epoch_state: Arc, storage: Arc, @@ -62,6 +67,7 @@ impl NodeBroadcastHandler { Self { dag, votes_by_round_peer: Mutex::new(votes_by_round_peer), + votes_fine_grained_lock: DashSet::with_capacity(epoch_state.verifier.len() * 10), signer, epoch_state, storage, @@ -205,25 +211,39 @@ impl RpcHandler for NodeBroadcastHandler { .remote_peer(*node.author()) .round(node.round())); - let mut votes_by_round_peer_guard = self.votes_by_round_peer.lock(); - let votes_by_peer = votes_by_round_peer_guard - .entry(node.metadata().round()) - .or_default(); - match votes_by_peer.get(node.metadata().author()) { - None => { - let signature = node.sign_vote(&self.signer)?; - let vote = Vote::new(node.metadata().clone(), signature); - - // TODO: make this concurrent - self.storage.save_vote(&node.id(), &vote)?; - votes_by_peer.insert(*node.author(), vote.clone()); - - debug!(LogSchema::new(LogEvent::Vote) - .remote_peer(*node.author()) - .round(node.round())); - Ok(vote) - }, - Some(ack) => Ok(ack.clone()), + if let Some(ack) = self + .votes_by_round_peer + .lock() + .entry(node.round()) + .or_default() + .get(node.author()) + { + return Ok(ack.clone()); } + + ensure!( + self.votes_fine_grained_lock + .insert((node.round(), *node.author())), + "concurrent insertion" + ); + defer!({ + assert_some!(self + .votes_fine_grained_lock + .remove(&(node.round(), *node.author()))); + }); + + let signature = node.sign_vote(&self.signer)?; + let vote = Vote::new(node.metadata().clone(), signature); + self.storage.save_vote(&node.id(), &vote)?; + self.votes_by_round_peer + .lock() + .get_mut(&node.round()) + .expect("must exist") + .insert(*node.author(), vote.clone()); + + debug!(LogSchema::new(LogEvent::Vote) + .remote_peer(*node.author()) + .round(node.round())); + Ok(vote) } }