Skip to content

Commit

Permalink
[dag] support parallel rb voting
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Feb 6, 2024
1 parent fe34bba commit 0cbe0ef
Showing 1 changed file with 39 additions and 19 deletions.
58 changes: 39 additions & 19 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ use aptos_types::{
validator_txn::ValidatorTransaction,
};
use async_trait::async_trait;
use claims::assert_some;
use dashmap::DashSet;
use std::{collections::BTreeMap, mem, sync::Arc};

pub(crate) struct NodeBroadcastHandler {
dag: Arc<DagStore>,
/// Note: The mutex around BTreeMap is to work around Rust Sync semantics.
/// Fine grained concurrency is implemented by the DashSet below.
votes_by_round_peer: Mutex<BTreeMap<Round, BTreeMap<Author, Vote>>>,
votes_fine_grained_lock: DashSet<(Round, Author)>,
signer: Arc<ValidatorSigner>,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
Expand Down Expand Up @@ -62,6 +67,7 @@ impl NodeBroadcastHandler {
Self {
dag,
votes_by_round_peer: Mutex::new(votes_by_round_peer),
votes_fine_grained_lock: DashSet::with_capacity(epoch_state.verifier.len() * 10),
signer,
epoch_state,
storage,
Expand Down Expand Up @@ -205,25 +211,39 @@ impl RpcHandler for NodeBroadcastHandler {
.remote_peer(*node.author())
.round(node.round()));

let mut votes_by_round_peer_guard = self.votes_by_round_peer.lock();
let votes_by_peer = votes_by_round_peer_guard
.entry(node.metadata().round())
.or_default();
match votes_by_peer.get(node.metadata().author()) {
None => {
let signature = node.sign_vote(&self.signer)?;
let vote = Vote::new(node.metadata().clone(), signature);

// TODO: make this concurrent
self.storage.save_vote(&node.id(), &vote)?;
votes_by_peer.insert(*node.author(), vote.clone());

debug!(LogSchema::new(LogEvent::Vote)
.remote_peer(*node.author())
.round(node.round()));
Ok(vote)
},
Some(ack) => Ok(ack.clone()),
if let Some(ack) = self
.votes_by_round_peer
.lock()
.entry(node.round())
.or_default()
.get(node.author())
{
return Ok(ack.clone());
}

ensure!(
self.votes_fine_grained_lock
.insert((node.round(), *node.author())),
"concurrent insertion"
);
defer!({
assert_some!(self
.votes_fine_grained_lock
.remove(&(node.round(), *node.author())));
});

let signature = node.sign_vote(&self.signer)?;
let vote = Vote::new(node.metadata().clone(), signature);
self.storage.save_vote(&node.id(), &vote)?;
self.votes_by_round_peer
.lock()
.get_mut(&node.round())
.expect("must exist")
.insert(*node.author(), vote.clone());

debug!(LogSchema::new(LogEvent::Vote)
.remote_peer(*node.author())
.round(node.round()));
Ok(vote)
}
}

0 comments on commit 0cbe0ef

Please sign in to comment.