Skip to content

Commit

Permalink
[dag] stop voting based on pipeline latency
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Feb 6, 2024
1 parent 7c0ada5 commit 0ffbe17
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 285 deletions.
547 changes: 269 additions & 278 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 19 additions & 2 deletions config/src/config/dag_consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,32 @@ impl Default for DagRoundStateConfig {
}
}

/// Health-related tuning knobs for DAG consensus.
///
/// NOTE(review): semantics below are inferred from the usage visible in this
/// commit (bootstrap.rs / pipeline_health.rs) — confirm against the full file.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct DagHealthConfig {
    // Backoff tiers handed to `ChainHealthBackoffConfig::new` in the bootstrapper.
    pub chain_backoff_config: Vec<ChainHealthBackoffValues>,
    // Pending-pipeline-latency cap (milliseconds); when the ordered-notifier's
    // pending latency exceeds this, the node refuses to vote (see
    // `PipelineLatencyBasedBackpressure::stop_voting`).
    pub voter_pipeline_latency_limit_ms: u64,
    // Backpressure tiers handed to `PipelineBackpressureConfig::new`.
    pub pipeline_backpressure_config: Vec<PipelineBackpressureValues>,
}

impl Default for DagHealthConfig {
fn default() -> Self {
Self {
chain_backoff_config: Vec::new(),
voter_pipeline_latency_limit_ms: 30_000,
pipeline_backpressure_config: Vec::new(),
}
}
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct DagConsensusConfig {
pub node_payload_config: DagPayloadConfig,
pub rb_config: ReliableBroadcastConfig,
pub fetcher_config: DagFetcherConfig,
pub round_state_config: DagRoundStateConfig,
pub chain_backoff_config: Vec<ChainHealthBackoffValues>,
pub pipeline_backpressure_config: Vec<PipelineBackpressureValues>,
pub health_config: DagHealthConfig,
#[serde(default = "QuorumStoreConfig::default_for_dag")]
pub quorum_store: QuorumStoreConfig,
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl OrderedNotifier for OrderedNotifierAdapter {
commit_decision: LedgerInfoWithSignatures| {
block_created_ts
.write()
.split_off(&(commit_decision.commit_info().round() + 1));
.retain(|&round, _| round > commit_decision.commit_info().round());
dag.write()
.commit_callback(commit_decision.commit_info().round());
ledger_info_provider
Expand Down
13 changes: 10 additions & 3 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,17 @@ impl DagBootstrapper {
);

let chain_health: Arc<dyn TChainHealth> = ChainHealthBackoff::new(
ChainHealthBackoffConfig::new(self.config.chain_backoff_config.clone()),
ChainHealthBackoffConfig::new(self.config.health_config.chain_backoff_config.clone()),
commit_history.clone(),
);
let pipeline_health = PipelineLatencyBasedBackpressure::new(
PipelineBackpressureConfig::new(self.config.pipeline_backpressure_config.clone()),
Duration::from_millis(self.config.health_config.voter_pipeline_latency_limit_ms),
PipelineBackpressureConfig::new(
self.config
.health_config
.pipeline_backpressure_config
.clone(),
),
ordered_notifier.clone(),
);
let health_backoff =
Expand All @@ -616,7 +622,7 @@ impl DagBootstrapper {
round_state,
self.onchain_config.dag_ordering_causal_history_window as Round,
self.config.node_payload_config.clone(),
health_backoff,
health_backoff.clone(),
self.quorum_store_enabled,
);
let rb_handler = NodeBroadcastHandler::new(
Expand All @@ -628,6 +634,7 @@ impl DagBootstrapper {
self.config.node_payload_config.clone(),
self.vtxn_config.clone(),
self.features.clone(),
health_backoff,
);
let fetch_handler = FetchRequestHandler::new(dag_store.clone(), self.epoch_state.clone());

Expand Down
2 changes: 2 additions & 0 deletions consensus/src/dag/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub enum NodeBroadcastHandleError {
MissingParents,
#[error("stale round number")]
StaleRound(Round),
#[error("refused to vote")]
VoteRefused,
}

#[derive(Clone, Debug, ThisError, Serialize, Deserialize)]
Expand Down
5 changes: 5 additions & 0 deletions consensus/src/dag/health/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use aptos_consensus_types::common::Round;
use aptos_types::epoch_state::EpochState;
use std::{sync::Arc, time::Duration};

#[derive(Clone)]
pub struct HealthBackoff {
epoch_state: Arc<EpochState>,
chain_health: Arc<dyn TChainHealth>,
Expand Down Expand Up @@ -77,4 +78,8 @@ impl HealthBackoff {
.unwrap_or_default()
.max(pipeline_backoff.unwrap_or_default())
}

/// Returns `true` when the node should refuse to vote; delegates the decision
/// to the configured pipeline-health implementation.
pub fn stop_voting(&self) -> bool {
    self.pipeline_health.stop_voting()
}
}
14 changes: 14 additions & 0 deletions consensus/src/dag/health/pipeline_health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub trait TPipelineHealth: Send + Sync {
fn get_backoff(&self) -> Option<Duration>;

fn get_payload_limits(&self) -> Option<(u64, u64)>;

fn stop_voting(&self) -> bool;
}

pub struct NoPipelineBackpressure {}
Expand All @@ -27,19 +29,26 @@ impl TPipelineHealth for NoPipelineBackpressure {
fn get_payload_limits(&self) -> Option<(u64, u64)> {
None
}

/// With pipeline backpressure disabled, voting is never suppressed.
fn stop_voting(&self) -> bool {
    false
}
}

pub struct PipelineLatencyBasedBackpressure {
voter_pipeline_latency_limit: Duration,
pipeline_config: PipelineBackpressureConfig,
adapter: Arc<OrderedNotifierAdapter>,
}

impl PipelineLatencyBasedBackpressure {
pub(in crate::dag) fn new(
voter_pipeline_latency_limit: Duration,
pipeline_config: PipelineBackpressureConfig,
adapter: Arc<OrderedNotifierAdapter>,
) -> Arc<Self> {
Arc::new(Self {
voter_pipeline_latency_limit,
pipeline_config,
adapter,
})
Expand All @@ -63,4 +72,9 @@ impl TPipelineHealth for PipelineLatencyBasedBackpressure {
)
})
}

/// Suppress voting once the adapter's pending pipeline latency exceeds the
/// configured voter latency limit.
fn stop_voting(&self) -> bool {
    self.adapter.pipeline_pending_latency() > self.voter_pipeline_latency_limit
}
}
10 changes: 10 additions & 0 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use aptos_types::{
use async_trait::async_trait;
use std::{collections::BTreeMap, mem, sync::Arc};

use super::health::HealthBackoff;

pub(crate) struct NodeBroadcastHandler {
dag: Arc<RwLock<Dag>>,
votes_by_round_peer: BTreeMap<Round, BTreeMap<Author, Vote>>,
Expand All @@ -41,6 +43,7 @@ pub(crate) struct NodeBroadcastHandler {
payload_config: DagPayloadConfig,
vtxn_config: ValidatorTxnConfig,
features: Features,
health_backoff: HealthBackoff,
}

impl NodeBroadcastHandler {
Expand All @@ -53,6 +56,7 @@ impl NodeBroadcastHandler {
payload_config: DagPayloadConfig,
vtxn_config: ValidatorTxnConfig,
features: Features,
health_backoff: HealthBackoff,
) -> Self {
let epoch = epoch_state.epoch;
let votes_by_round_peer = read_votes_from_storage(&storage, epoch);
Expand All @@ -67,6 +71,7 @@ impl NodeBroadcastHandler {
payload_config,
vtxn_config,
features,
health_backoff,
}
}

Expand Down Expand Up @@ -189,6 +194,11 @@ impl RpcHandler for NodeBroadcastHandler {
type Response = Vote;

async fn process(&mut self, node: Self::Request) -> anyhow::Result<Self::Response> {
ensure!(
!self.health_backoff.stop_voting(),
NodeBroadcastHandleError::VoteRefused
);

let node = self.validate(node)?;
observe_node(node.timestamp(), NodeStage::NodeReceived);
debug!(LogSchema::new(LogEvent::ReceiveNode)
Expand Down
37 changes: 36 additions & 1 deletion consensus/src/dag/tests/rb_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::dag::{
dag_fetcher::TFetchRequester,
dag_store::Dag,
errors::NodeBroadcastHandleError,
health::{HealthBackoff, NoChainHealth, NoPipelineBackpressure},
rb_handler::NodeBroadcastHandler,
storage::DAGStorage,
tests::{
Expand Down Expand Up @@ -57,6 +58,12 @@ async fn test_node_broadcast_receiver_succeed() {
TEST_DAG_WINDOW,
)));

let health_backoff = HealthBackoff::new(
epoch_state.clone(),
NoChainHealth::new(),
NoPipelineBackpressure::new(),
);

let wellformed_node = new_node(1, 10, signers[0].author(), vec![]);
let equivocating_node = new_node(1, 20, signers[0].author(), vec![]);

Expand All @@ -70,7 +77,11 @@ async fn test_node_broadcast_receiver_succeed() {
Arc::new(MockFetchRequester {}),
DagPayloadConfig::default(),
ValidatorTxnConfig::default_disabled(),
Features::default(),
health_backoff,
);

let expected_result = Vote::new(
Expand Down Expand Up @@ -117,7 +128,15 @@ async fn test_node_broadcast_receiver_failure() {
Arc::new(MockFetchRequester {}),
DagPayloadConfig::default(),
ValidatorTxnConfig::default_disabled(),
Features::default(),
HealthBackoff::new(
epoch_state.clone(),
NoChainHealth::new(),
NoPipelineBackpressure::new(),
),
)
})
.collect();
Expand Down Expand Up @@ -201,7 +220,15 @@ async fn test_node_broadcast_receiver_storage() {
Arc::new(MockFetchRequester {}),
DagPayloadConfig::default(),
ValidatorTxnConfig::default_disabled(),
Features::default(),
HealthBackoff::new(
epoch_state.clone(),
NoChainHealth::new(),
NoPipelineBackpressure::new(),
),
);
let sig = rb_receiver.process(node).await.expect("must succeed");

Expand All @@ -213,12 +240,20 @@ async fn test_node_broadcast_receiver_storage() {
let mut rb_receiver = NodeBroadcastHandler::new(
dag,
signers[3].clone(),
epoch_state,
epoch_state.clone(),
storage.clone(),
Arc::new(MockFetchRequester {}),
DagPayloadConfig::default(),
ValidatorTxnConfig::default_disabled(),
Features::default(),
HealthBackoff::new(
epoch_state,
NoChainHealth::new(),
NoPipelineBackpressure::new(),
),
);
assert_ok!(rb_receiver.gc_before_round(2));
assert_eq!(storage.get_votes().unwrap().len(), 0);
Expand Down

0 comments on commit 0ffbe17

Please sign in to comment.