Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dag] cache votes without re-counting #11808

Merged
merged 5 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ use aptos_validator_transaction_pool as vtxn_pool;
use async_trait::async_trait;
use futures::{
executor::block_on,
future::{AbortHandle, Abortable},
FutureExt,
future::{join, AbortHandle, Abortable},
};
use futures_channel::oneshot;
use std::{collections::HashSet, sync::Arc, time::Duration};
use tokio_retry::strategy::ExponentialBackoff;

Expand Down Expand Up @@ -284,8 +284,9 @@ impl DagDriver {
let rb = self.reliable_broadcast.clone();
let rb2 = self.reliable_broadcast.clone();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let (tx, rx) = oneshot::channel();
let signature_builder =
SignatureBuilder::new(node.metadata().clone(), self.epoch_state.clone());
SignatureBuilder::new(node.metadata().clone(), self.epoch_state.clone(), tx);
let cert_ack_set = CertificateAckState::new(self.epoch_state.verifier.len());
let latest_ledger_info = self.ledger_info_provider.clone();

Expand All @@ -298,7 +299,12 @@ impl DagDriver {
defer!( observe_round(timestamp, RoundStage::NodeBroadcasted); );
rb.broadcast(node, signature_builder).await
};
let core_task = node_broadcast.then(move |certificate| {
let certified_broadcast = async move {
let Ok(certificate) = rx.await else {
error!("channel closed before receiving ceritifcate");
return;
};

debug!(
LogSchema::new(LogEvent::BroadcastCertifiedNode),
id = node_clone.id()
Expand All @@ -311,8 +317,9 @@ impl DagDriver {
certified_node,
latest_ledger_info.get_latest_ledger_info(),
);
rb2.broadcast(certified_node_msg, cert_ack_set)
});
rb2.broadcast(certified_node_msg, cert_ack_set).await
};
let core_task = join(node_broadcast, certified_broadcast);
let author = self.author;
let task = async move {
debug!("{} Start reliable broadcast for round {}", author, round);
Expand Down
88 changes: 69 additions & 19 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::types::{DagSnapshotBitmask, NodeMetadata};
use super::{
types::{DagSnapshotBitmask, NodeMetadata},
Node,
};
use crate::{
dag::{
storage::DAGStorage,
Expand All @@ -23,19 +26,23 @@ use std::{

#[derive(Clone)]
pub enum NodeStatus {
Unordered(Arc<CertifiedNode>),
Unordered {
node: Arc<CertifiedNode>,
aggregated_weak_voting_power: u128,
aggregated_strong_voting_power: u128,
},
Ordered(Arc<CertifiedNode>),
}

impl NodeStatus {
pub fn as_node(&self) -> &Arc<CertifiedNode> {
match self {
NodeStatus::Unordered(node) | NodeStatus::Ordered(node) => node,
NodeStatus::Unordered { node, .. } | NodeStatus::Ordered(node) => node,
}
}

pub fn mark_as_ordered(&mut self) {
assert!(matches!(self, NodeStatus::Unordered(_)));
assert!(matches!(self, NodeStatus::Unordered { .. }));
*self = NodeStatus::Ordered(self.as_node().clone());
}
}
Expand Down Expand Up @@ -107,7 +114,12 @@ impl InMemDag {
.get_node_ref_mut(node.round(), node.author())
.expect("must be present");
ensure!(round_ref.is_none(), "race during insertion");
*round_ref = Some(NodeStatus::Unordered(node.clone()));
*round_ref = Some(NodeStatus::Unordered {
node: node.clone(),
aggregated_weak_voting_power: 0,
aggregated_strong_voting_power: 0,
});
self.update_votes(&node, true);
Ok(())
}

Expand Down Expand Up @@ -149,6 +161,39 @@ impl InMemDag {
Ok(())
}

pub fn update_votes(&mut self, node: &Node, update_link_power: bool) {
if node.round() <= self.lowest_round() {
return;
}

let voting_power = self
.epoch_state
.verifier
.get_voting_power(node.author())
.expect("must exist");

for parent in node.parents_metadata() {
let node_status = self
.get_node_ref_mut(parent.round(), parent.author())
.expect("must exist");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just add another expect here instead of unreachable below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a mutable reference to node_status. If i expect, then I wont get a reference to the Option in BTreeMap.

match node_status {
Some(NodeStatus::Unordered {
aggregated_weak_voting_power,
aggregated_strong_voting_power,
..
}) => {
if update_link_power {
*aggregated_strong_voting_power += voting_power as u128;
} else {
*aggregated_weak_voting_power += voting_power as u128;
}
},
Some(NodeStatus::Ordered(_)) => {},
None => unreachable!("parents must exist before voting for a node"),
}
}
}

pub fn exists(&self, metadata: &NodeMetadata) -> bool {
self.get_node_ref_by_metadata(metadata).is_some()
}
Expand Down Expand Up @@ -211,24 +256,29 @@ impl InMemDag {
.map(|node_status| node_status.as_node())
}

// TODO: I think we can cache votes in the NodeStatus::Unordered
pub fn check_votes_for_node(
&self,
metadata: &NodeMetadata,
validator_verifier: &ValidatorVerifier,
) -> bool {
self.get_round_iter(metadata.round() + 1)
.map(|next_round_iter| {
let votes = next_round_iter
.filter(|node_status| {
node_status
.as_node()
.parents()
.iter()
.any(|cert| cert.metadata() == metadata)
})
.map(|node_status| node_status.as_node().author());
validator_verifier.check_voting_power(votes, false).is_ok()
self.get_node_ref_by_metadata(metadata)
.map(|node_status| match node_status {
NodeStatus::Unordered {
aggregated_weak_voting_power,
aggregated_strong_voting_power,
..
} => {
validator_verifier
.check_aggregated_voting_power(*aggregated_weak_voting_power, true)
.is_ok()
|| validator_verifier
.check_aggregated_voting_power(*aggregated_strong_voting_power, false)
.is_ok()
},
NodeStatus::Ordered(_) => {
error!("checking voting power for Ordered node");
true
},
})
.unwrap_or(false)
}
Expand Down Expand Up @@ -260,7 +310,7 @@ impl InMemDag {
.flat_map(|(_, round_ref)| round_ref.iter_mut())
.flatten()
.filter(move |node_status| {
matches!(node_status, NodeStatus::Unordered(_))
matches!(node_status, NodeStatus::Unordered { .. })
&& reachable_filter(node_status.as_node())
})
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/order_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl OrderRule {
.reachable(
Some(current_anchor.metadata().clone()).iter(),
Some(*self.lowest_unordered_anchor_round.read()),
|node_status| matches!(node_status, NodeStatus::Unordered(_)),
|node_status| matches!(node_status, NodeStatus::Unordered { .. }),
)
// skip the current anchor itself
.skip(1)
Expand Down
9 changes: 9 additions & 0 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ impl NodeBroadcastHandler {
}

fn validate(&self, node: Node) -> anyhow::Result<Node> {
ensure!(
node.epoch() == self.epoch_state.epoch,
"different epoch {}, current {}",
node.epoch(),
self.epoch_state.epoch
);

let num_vtxns = node.validator_txns().len() as u64;
ensure!(num_vtxns <= self.vtxn_config.per_block_limit_txn_count());
for vtxn in node.validator_txns() {
Expand Down Expand Up @@ -239,6 +246,8 @@ impl RpcHandler for NodeBroadcastHandler {
.expect("must exist")
.insert(*node.author(), vote.clone());

self.dag.write().update_votes(&node, false);

debug!(LogSchema::new(LogEvent::Vote)
.remote_peer(*node.author())
.round(node.round()));
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(crate) fn new_node(
parents: Vec<NodeCertificate>,
) -> Node {
Node::new(
0,
1,
round,
author,
timestamp,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/rb_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ async fn test_node_broadcast_receiver_storage() {
let sig = rb_receiver.process(node).await.expect("must succeed");

assert_ok_eq!(storage.get_votes(), vec![(
NodeId::new(0, 1, signers[0].author()),
NodeId::new(1, 1, signers[0].author()),
sig
)],);

Expand Down
63 changes: 41 additions & 22 deletions consensus/src/dag/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ use aptos_types::{
validator_txn::ValidatorTransaction,
validator_verifier::ValidatorVerifier,
};
use futures_channel::oneshot;
use serde::{Deserialize, Serialize};
use std::{
cmp::min,
collections::HashSet,
fmt::{Display, Formatter},
ops::Deref,
ops::{Deref, DerefMut},
sync::Arc,
};

Expand Down Expand Up @@ -343,7 +344,7 @@ impl Node {
}
}

#[derive(Serialize, Deserialize, PartialEq, Debug, Eq, Hash, Clone)]
#[derive(Serialize, Deserialize, PartialEq, Debug, Eq, Hash, Clone, PartialOrd, Ord)]
pub struct NodeId {
epoch: u64,
round: Round,
Expand Down Expand Up @@ -534,47 +535,65 @@ impl TryFrom<DAGRpcResult> for Vote {

pub struct SignatureBuilder {
metadata: NodeMetadata,
partial_signatures: Mutex<PartialSignatures>,
inner: Mutex<(PartialSignatures, Option<oneshot::Sender<NodeCertificate>>)>,
epoch_state: Arc<EpochState>,
}

impl SignatureBuilder {
pub fn new(metadata: NodeMetadata, epoch_state: Arc<EpochState>) -> Arc<Self> {
pub fn new(
metadata: NodeMetadata,
epoch_state: Arc<EpochState>,
tx: oneshot::Sender<NodeCertificate>,
) -> Arc<Self> {
Arc::new(Self {
metadata,
partial_signatures: Mutex::new(PartialSignatures::empty()),
inner: Mutex::new((PartialSignatures::empty(), Some(tx))),
epoch_state,
})
}
}

impl BroadcastStatus<DAGMessage, DAGRpcResult> for Arc<SignatureBuilder> {
type Aggregated = NodeCertificate;
type Aggregated = ();
type Message = Node;
type Response = Vote;

/// Processes the [Vote]s received for a given [Node]. Once a supermajority voting power
/// is reached, this method sends [NodeCertificate] into a channel. It will only return
/// successfully when [Vote]s are received from all the peers.
fn add(&self, peer: Author, ack: Self::Response) -> anyhow::Result<Option<Self::Aggregated>> {
ensure!(self.metadata == ack.metadata, "Digest mismatch");
ack.verify(peer, &self.epoch_state.verifier)?;
debug!(LogSchema::new(LogEvent::ReceiveVote)
.remote_peer(peer)
.round(self.metadata.round()));
let mut signatures_lock = self.partial_signatures.lock();
signatures_lock.add_signature(peer, ack.signature);
Ok(self
.epoch_state
.verifier
.check_voting_power(signatures_lock.signatures().keys(), true)
.ok()
.map(|_| {
let aggregated_signature = self
.epoch_state
.verifier
.aggregate_signatures(&signatures_lock)
.expect("Signature aggregation should succeed");
observe_node(self.metadata.timestamp(), NodeStage::CertAggregated);
NodeCertificate::new(self.metadata.clone(), aggregated_signature)
}))
let mut guard = self.inner.lock();
let (partial_signatures, tx) = guard.deref_mut();
partial_signatures.add_signature(peer, ack.signature);

if tx.is_some()
&& self
.epoch_state
.verifier
.check_voting_power(partial_signatures.signatures().keys(), true)
.is_ok()
{
let aggregated_signature = self
.epoch_state
.verifier
.aggregate_signatures(partial_signatures)
.expect("Signature aggregation should succeed");
observe_node(self.metadata.timestamp(), NodeStage::CertAggregated);
let certificate = NodeCertificate::new(self.metadata.clone(), aggregated_signature);

_ = tx.take().expect("must exist").send(certificate);
}

if partial_signatures.signatures().len() == self.epoch_state.verifier.len() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some comments here to explain why the aggregation is through a channel? I assume it's because we want to keep the rb alive even after cert?

Ok(Some(()))
} else {
Ok(None)
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions types/src/validator_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,14 @@ impl ValidatorVerifier {
check_super_majority: bool,
) -> std::result::Result<u128, VerifyError> {
let aggregated_voting_power = self.sum_voting_power(authors)?;
self.check_aggregated_voting_power(aggregated_voting_power, check_super_majority)
}

pub fn check_aggregated_voting_power(
&self,
aggregated_voting_power: u128,
check_super_majority: bool,
) -> std::result::Result<u128, VerifyError> {
let target = if check_super_majority {
self.quorum_voting_power
} else {
Expand Down
Loading