Skip to content

Commit

Permalink
[dag] broadcast nodes within window till all validators ack (#11751)
Browse files Browse the repository at this point in the history
* [dag] broadcast nodes within window till all validators ack
* [dag] use DropGuard to drop handles
* move BoundedVecDeque to aptos-collections
* use BoundedVecDeque in MetadataAdapter
  • Loading branch information
ibalajiarun authored Feb 9, 2024
1 parent e173271 commit 2b3cce4
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 31 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ members = [
"crates/aptos-bcs-utils",
"crates/aptos-bitvec",
"crates/aptos-build-info",
"crates/aptos-collections",
"crates/aptos-compression",
"crates/aptos-crypto",
"crates/aptos-crypto-derive",
Expand Down Expand Up @@ -291,6 +292,7 @@ aptos-build-info = { path = "crates/aptos-build-info" }
aptos-cached-packages = { path = "aptos-move/framework/cached-packages" }
aptos-channels = { path = "crates/channel" }
aptos-cli-common = { path = "crates/aptos-cli-common" }
aptos-collections = { path = "crates/aptos-collections" }
aptos-compression = { path = "crates/aptos-compression" }
aptos-consensus = { path = "consensus" }
aptos-consensus-notifications = { path = "state-sync/inter-component/consensus-notifications" }
Expand Down Expand Up @@ -486,7 +488,10 @@ claims = "0.7"
clap = { version = "4.3.9", features = ["derive", "unstable-styles"] }
clap-verbosity-flag = "2.1.1"
clap_complete = "4.4.1"
cloud-storage = { version = "0.11.1", features = ["global-client", "rustls-tls"], default-features = false }
cloud-storage = { version = "0.11.1", features = [
"global-client",
"rustls-tls",
], default-features = false }
codespan-reporting = "0.11.1"
concurrent-queue = "2.2.0"
console-subscriber = "0.1.8"
Expand Down Expand Up @@ -618,7 +623,11 @@ rand = "0.7.3"
rand_core = "0.5.1"
random_word = "0.3.0"
rayon = "1.5.2"
redis = { version = "0.22.3", features = ["tokio-comp", "script", "connection-manager"] }
redis = { version = "0.22.3", features = [
"tokio-comp",
"script",
"connection-manager",
] }
redis-test = { version = "0.1.1", features = ["aio"] }
regex = "1.9.3"
reqwest = { version = "0.11.11", features = [
Expand All @@ -632,7 +641,7 @@ reqwest-retry = "0.2.1"
ring = { version = "0.16.20", features = ["std"] }
ripemd = "0.1.1"
rocksdb = { version = "0.21.0", features = ["lz4"] }
rstack-self = { version = "0.3.0", features = ["dw"], default_features = false }
rstack-self = { version = "0.3.0", features = ["dw"], default_features = false }
rstest = "0.15.0"
rusty-fork = "0.3.0"
rustversion = "1.0.14"
Expand All @@ -646,7 +655,10 @@ siphasher = "0.3.10"
serde = { version = "1.0.193", features = ["derive", "rc"] }
serde-big-array = "0.5.1"
serde_bytes = "0.11.6"
serde_json = { version = "1.0.81", features = ["preserve_order", "arbitrary_precision"] } # Note: arbitrary_precision is required to parse u256 in JSON
serde_json = { version = "1.0.81", features = [
"preserve_order",
"arbitrary_precision",
] } # Note: arbitrary_precision is required to parse u256 in JSON
serde_repr = "0.1"
serde_merge = "0.1.3"
serde-name = "0.1.1"
Expand Down
10 changes: 9 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ anyhow = { workspace = true }
aptos-bitvec = { workspace = true }
aptos-bounded-executor = { workspace = true }
aptos-channels = { workspace = true }
aptos-collections = { workspace = true }
aptos-config = { workspace = true }
aptos-consensus-notifications = { workspace = true }
aptos-consensus-types = { workspace = true }
Expand Down Expand Up @@ -97,5 +98,12 @@ tempfile = { workspace = true }

[features]
default = []
fuzzing = ["aptos-consensus-types/fuzzing", "aptos-config/fuzzing", "aptos-crypto/fuzzing", "aptos-mempool/fuzzing", "aptos-types/fuzzing", "aptos-safety-rules/testing"]
fuzzing = [
"aptos-consensus-types/fuzzing",
"aptos-config/fuzzing",
"aptos-crypto/fuzzing",
"aptos-mempool/fuzzing",
"aptos-types/fuzzing",
"aptos-safety-rules/testing",
]
failpoints = ["fail/failpoints"]
16 changes: 5 additions & 11 deletions consensus/src/dag/anchor_election/leader_reputation_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@ use crate::{
},
};
use aptos_bitvec::BitVec;
use aptos_collections::BoundedVecDeque;
use aptos_consensus_types::common::{Author, Round};
use aptos_crypto::HashValue;
use aptos_infallible::Mutex;
use aptos_types::account_config::NewBlockEvent;
use move_core_types::account_address::AccountAddress;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use std::{collections::HashMap, sync::Arc};

pub struct MetadataBackendAdapter {
epoch_to_validators: HashMap<u64, HashMap<Author, usize>>,
window_size: usize,
sliding_window: Mutex<VecDeque<CommitEvent>>,
sliding_window: Mutex<BoundedVecDeque<CommitEvent>>,
}

impl MetadataBackendAdapter {
Expand All @@ -38,19 +36,15 @@ impl MetadataBackendAdapter {
Self {
epoch_to_validators,
window_size,
sliding_window: Mutex::new(VecDeque::new()),
sliding_window: Mutex::new(BoundedVecDeque::new(window_size)),
}
}

pub fn push(&self, event: CommitEvent) {
if !self.epoch_to_validators.contains_key(&event.epoch()) {
return;
}
let mut lock = self.sliding_window.lock();
if lock.len() == self.window_size {
lock.pop_back();
}
lock.push_front(event);
self.sliding_window.lock().push_front(event);
}

// TODO: we should change NewBlockEvent on LeaderReputation to take a trait
Expand Down
23 changes: 8 additions & 15 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ use crate::{
payload_client::PayloadClient,
};
use anyhow::{bail, ensure};
use aptos_collections::BoundedVecDeque;
use aptos_config::config::DagPayloadConfig;
use aptos_consensus_types::common::{Author, Payload, PayloadFilter};
use aptos_crypto::hash::CryptoHash;
use aptos_infallible::Mutex;
use aptos_logger::{debug, error};
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_reliable_broadcast::{DropGuard, ReliableBroadcast};
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{block_info::Round, epoch_state::EpochState};
use aptos_validator_transaction_pool as vtxn_pool;
Expand All @@ -49,7 +50,7 @@ pub(crate) struct DagDriver {
payload_client: Arc<dyn PayloadClient>,
reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff, DAGRpcResult>>,
time_service: TimeService,
rb_abort_handle: Mutex<Option<(AbortHandle, u64)>>,
rb_handles: Mutex<BoundedVecDeque<(DropGuard, u64)>>,
storage: Arc<dyn DAGStorage>,
order_rule: Mutex<OrderRule>,
fetch_requester: Arc<dyn TFetchRequester>,
Expand Down Expand Up @@ -93,7 +94,7 @@ impl DagDriver {
payload_client,
reliable_broadcast,
time_service,
rb_abort_handle: Mutex::new(None),
rb_handles: Mutex::new(BoundedVecDeque::new(window_size_config as usize)),
storage,
order_rule: Mutex::new(order_rule),
fetch_requester,
Expand Down Expand Up @@ -319,13 +320,13 @@ impl DagDriver {
debug!("Finish reliable broadcast for round {}", round);
};
tokio::spawn(Abortable::new(task, abort_registration));
if let Some((prev_handle, prev_round_timestamp)) = self
.rb_abort_handle
if let Some((_handle, prev_round_timestamp)) = self
.rb_handles
.lock()
.replace((abort_handle, timestamp))
.push_back((DropGuard::new(abort_handle), timestamp))
{
// TODO: this observation is inaccurate.
observe_round(prev_round_timestamp, RoundStage::Finished);
prev_handle.abort();
}
}
}
Expand Down Expand Up @@ -355,11 +356,3 @@ impl RpcHandler for DagDriver {
Ok(CertifiedAck::new(epoch))
}
}

impl Drop for DagDriver {
fn drop(&mut self) {
if let Some((handle, _)) = self.rb_abort_handle.lock().as_ref() {
handle.abort()
}
}
}
15 changes: 15 additions & 0 deletions crates/aptos-collections/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "aptos-collections"
description = "Aptos Collections Library"
version = "0.1.0"

# Workspace inherited keys
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
publish = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }

[dependencies]
100 changes: 100 additions & 0 deletions crates/aptos-collections/src/bounded_vec_deque.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright © Aptos Foundation

use std::collections::{
vec_deque::{IntoIter, Iter},
VecDeque,
};

#[derive(Clone)]
pub struct BoundedVecDeque<T> {
inner: VecDeque<T>,
capacity: usize,
}

impl<T> BoundedVecDeque<T> {
pub fn new(capacity: usize) -> Self {
assert!(capacity > 0);
Self {
inner: VecDeque::with_capacity(capacity),
capacity,
}
}

pub fn is_full(&self) -> bool {
self.inner.len() == self.capacity
}

pub fn push_back(&mut self, item: T) -> Option<T> {
let oldest = if self.is_full() {
self.inner.pop_front()
} else {
None
};

self.inner.push_back(item);
assert!(self.inner.len() <= self.capacity);
oldest
}

pub fn push_front(&mut self, item: T) -> Option<T> {
let oldest = if self.is_full() {
self.inner.pop_back()
} else {
None
};

self.inner.push_front(item);
assert!(self.inner.len() <= self.capacity);
oldest
}

pub fn iter(&self) -> Iter<'_, T> {
self.inner.iter()
}
}

impl<T> IntoIterator for BoundedVecDeque<T> {
type IntoIter = IntoIter<T>;
type Item = T;

fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
}
}

#[cfg(test)]
mod tests {
use super::BoundedVecDeque;

#[test]
fn test_bounded_vec_deque_capacity() {
let capacity = 10;
let mut queue = BoundedVecDeque::new(capacity);
for i in 0..capacity {
queue.push_back(i);
}

assert!(queue.is_full());

assert_eq!(queue.push_back(capacity), Some(0));

assert_eq!(queue.push_front(0), Some(capacity));
}

#[test]
fn test_bounded_vec_deque_iter() {
let capacity = 10;
let mut queue = BoundedVecDeque::new(capacity);
for i in 0..capacity {
queue.push_back(i);
}

for (i, item) in queue.iter().enumerate() {
assert_eq!(i, *item);
}

for (i, item) in queue.into_iter().enumerate() {
assert_eq!(i, item);
}
}
}
5 changes: 5 additions & 0 deletions crates/aptos-collections/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright © Aptos Foundation

mod bounded_vec_deque;

pub use bounded_vec_deque::BoundedVecDeque;

0 comments on commit 2b3cce4

Please sign in to comment.