Skip to content

Commit

Permalink
[dag] cache votes without re-counting
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Jan 28, 2024
1 parent 1b0b98e commit 9604403
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 24 deletions.
88 changes: 68 additions & 20 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::types::{DagSnapshotBitmask, NodeMetadata};
use super::{
types::{DagSnapshotBitmask, NodeMetadata},
Node, NodeId,
};
use crate::{
dag::{
storage::DAGStorage,
Expand All @@ -16,26 +19,29 @@ use aptos_infallible::RwLock;
use aptos_logger::{debug, error, warn};
use aptos_types::{epoch_state::EpochState, validator_verifier::ValidatorVerifier};
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
ops::Deref,
sync::Arc,
};

#[derive(Clone)]
pub enum NodeStatus {
Unordered(Arc<CertifiedNode>),
Unordered {
node: Arc<CertifiedNode>,
accumulated_voting_power: u128,
},
Ordered(Arc<CertifiedNode>),
}

impl NodeStatus {
pub fn as_node(&self) -> &Arc<CertifiedNode> {
match self {
NodeStatus::Unordered(node) | NodeStatus::Ordered(node) => node,
NodeStatus::Unordered { node, .. } | NodeStatus::Ordered(node) => node,
}
}

pub fn mark_as_ordered(&mut self) {
assert!(matches!(self, NodeStatus::Unordered(_)));
assert!(matches!(self, NodeStatus::Unordered { .. }));
*self = NodeStatus::Ordered(self.as_node().clone());
}
}
Expand All @@ -49,6 +55,7 @@ pub struct Dag {
epoch_state: Arc<EpochState>,
/// The window we maintain between highest committed round and initial round
window_size: u64,
voted_nodes: BTreeMap<Round, BTreeSet<NodeId>>,
}

impl Dag {
Expand All @@ -61,6 +68,7 @@ impl Dag {
start_round,
epoch_state,
window_size,
voted_nodes: BTreeMap::new(),
}
}

Expand Down Expand Up @@ -100,7 +108,11 @@ impl Dag {
.get_node_ref_mut(node.round(), node.author())
.expect("must be present");
ensure!(round_ref.is_none(), "race during insertion");
*round_ref = Some(NodeStatus::Unordered(node.clone()));
*round_ref = Some(NodeStatus::Unordered {
node: node.clone(),
accumulated_voting_power: 0,
});
self.update_votes(&node);
Ok(())
}

Expand Down Expand Up @@ -142,6 +154,40 @@ impl Dag {
Ok(())
}

pub fn update_votes(&mut self, node: &Node) {
if !self
.voted_nodes
.entry(node.round())
.or_default()
.insert(node.id())
{
return;
}

if node.round() <= self.lowest_round() {
return;
}

for parent in node.parents_metadata() {
let voting_power = self
.epoch_state
.verifier
.get_voting_power(node.author())
.expect("must exist");
let node_status = self
.get_node_ref_mut(parent.round(), parent.author())
.expect("must exist");
match node_status {
Some(NodeStatus::Unordered {
accumulated_voting_power,
..
}) => *accumulated_voting_power += voting_power as u128,
Some(NodeStatus::Ordered(_)) => {},
None => unreachable!("parents must exist before voting for a node"),
}
}
}

pub fn exists(&self, metadata: &NodeMetadata) -> bool {
self.get_node_ref_by_metadata(metadata).is_some()
}
Expand Down Expand Up @@ -204,24 +250,23 @@ impl Dag {
.map(|node_status| node_status.as_node())
}

// TODO: I think we can cache votes in the NodeStatus::Unordered
pub fn check_votes_for_node(
&self,
metadata: &NodeMetadata,
validator_verifier: &ValidatorVerifier,
) -> bool {
self.get_round_iter(metadata.round() + 1)
.map(|next_round_iter| {
let votes = next_round_iter
.filter(|node_status| {
node_status
.as_node()
.parents()
.iter()
.any(|cert| cert.metadata() == metadata)
})
.map(|node_status| node_status.as_node().author());
validator_verifier.check_voting_power(votes, false).is_ok()
self.get_node_ref_by_metadata(metadata)
.map(|node_status| match node_status {
NodeStatus::Unordered {
accumulated_voting_power,
..
} => validator_verifier
.check_aggregated_voting_power(*accumulated_voting_power, false)
.is_ok(),
NodeStatus::Ordered(_) => {
error!("checking voting power for Ordered node");
true
},
})
.unwrap_or(false)
}
Expand Down Expand Up @@ -253,7 +298,7 @@ impl Dag {
.flat_map(|(_, round_ref)| round_ref.iter_mut())
.flatten()
.filter(move |node_status| {
matches!(node_status, NodeStatus::Unordered(_))
matches!(node_status, NodeStatus::Unordered { .. })
&& reachable_filter(node_status.as_node())
})
}
Expand Down Expand Up @@ -335,6 +380,9 @@ impl Dag {
}

pub(super) fn prune(&mut self) -> BTreeMap<u64, Vec<Option<NodeStatus>>> {
let to_keep = self.voted_nodes.split_off(&self.start_round);
_ = std::mem::replace(&mut self.voted_nodes, to_keep);

let to_keep = self.nodes_by_round.split_off(&self.start_round);
let to_prune = std::mem::replace(&mut self.nodes_by_round, to_keep);
debug!(
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/order_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl OrderRule {
.reachable(
Some(current_anchor.metadata().clone()).iter(),
Some(*self.lowest_unordered_anchor_round.read()),
|node_status| matches!(node_status, NodeStatus::Unordered(_)),
|node_status| matches!(node_status, NodeStatus::Unordered{ .. }),
)
// skip the current anchor itself
.skip(1)
Expand Down
9 changes: 9 additions & 0 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ impl NodeBroadcastHandler {
}

fn validate(&self, node: Node) -> anyhow::Result<Node> {
ensure!(
node.epoch() == self.epoch_state.epoch,
"different epoch {}, current {}",
node.epoch(),
self.epoch_state.epoch
);

let num_vtxns = node.validator_txns().len() as u64;
ensure!(num_vtxns <= self.vtxn_config.per_block_limit_txn_count());
let vtxn_total_bytes = node
Expand Down Expand Up @@ -224,6 +231,8 @@ impl RpcHandler for NodeBroadcastHandler {
.expect("must exist")
.insert(*node.author(), vote.clone());

self.dag.write().update_votes(&node);

debug!(LogSchema::new(LogEvent::Vote)
.remote_peer(*node.author())
.round(node.round()));
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(crate) fn new_node(
parents: Vec<NodeCertificate>,
) -> Node {
Node::new(
0,
1,
round,
author,
timestamp,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/rb_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ async fn test_node_broadcast_receiver_storage() {
let sig = rb_receiver.process(node).await.expect("must succeed");

assert_ok_eq!(storage.get_votes(), vec![(
NodeId::new(0, 1, signers[0].author()),
NodeId::new(1, 1, signers[0].author()),
sig
)],);

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ impl Node {
}
}

#[derive(Serialize, Deserialize, PartialEq, Debug, Eq, Hash, Clone)]
#[derive(Serialize, Deserialize, PartialEq, Debug, Eq, Hash, Clone, PartialOrd, Ord)]
pub struct NodeId {
epoch: u64,
round: Round,
Expand Down
7 changes: 7 additions & 0 deletions types/src/validator_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,14 @@ impl ValidatorVerifier {
check_super_majority: bool,
) -> std::result::Result<u128, VerifyError> {
let aggregated_voting_power = self.sum_voting_power(authors)?;
self.check_aggregated_voting_power(aggregated_voting_power, check_super_majority)
}

pub fn check_aggregated_voting_power(
&self,
aggregated_voting_power: u128,
check_super_majority: bool,
) -> std::result::Result<u128, VerifyError> {
let target = if check_super_majority {
self.quorum_voting_power
} else {
Expand Down

0 comments on commit 9604403

Please sign in to comment.