[dag] concurrent dag store
ibalajiarun committed Jan 23, 2024
1 parent 39e4b84 commit 60bacf0
Showing 20 changed files with 556 additions and 491 deletions.
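
The substantive change repeated across these files is mechanical: call sites that previously shared an Arc<RwLock<Dag>> and locked it explicitly now share an Arc<PersistentDagStore> whose mutating methods (add_node, commit_callback) take the lock internally, while read() still hands out a guard for queries. The following is a minimal, self-contained sketch of that wrapper pattern only, not the aptos-core implementation: the InMemDag stub, its fields, and the use of std::sync::RwLock are assumptions for illustration (the real code uses aptos_infallible::RwLock, and the store also carries a DAGStorage handle and payload manager, per the constructor arguments in the diff).

use std::collections::BTreeMap;
use std::sync::{Arc, RwLock, RwLockReadGuard};

type Round = u64;

// Stand-in for the in-memory DAG the store guards; the real structure in
// consensus/src/dag/dag_store.rs is far richer.
#[derive(Default)]
struct InMemDag {
    nodes_by_round: BTreeMap<Round, Vec<String>>,
}

impl InMemDag {
    fn add_node(&mut self, round: Round, node: String) {
        self.nodes_by_round.entry(round).or_default().push(node);
    }

    fn commit_callback(&mut self, committed_round: Round) {
        // Illustrative pruning: drop everything at or below the committed round.
        self.nodes_by_round = self.nodes_by_round.split_off(&(committed_round + 1));
    }

    fn highest_round(&self) -> Round {
        self.nodes_by_round.keys().next_back().copied().unwrap_or(0)
    }
}

// Assumed shape of the concurrent store: it owns the lock, so callers share a
// plain Arc<PersistentDagStore> and never touch the RwLock directly.
struct PersistentDagStore {
    dag: RwLock<InMemDag>,
    // The real store also holds the DAGStorage handle and payload manager;
    // persistence is elided in this sketch.
}

impl PersistentDagStore {
    fn new() -> Self {
        Self { dag: RwLock::new(InMemDag::default()) }
    }

    // Queries still go through an explicit guard, matching `dag.read()` in the diff.
    fn read(&self) -> RwLockReadGuard<'_, InMemDag> {
        self.dag.read().unwrap()
    }

    // Mutations lock internally, so call sites shrink from
    // `dag.write().add_node(..)` to `dag.add_node(..)`.
    fn add_node(&self, round: Round, node: String) {
        // A persistent store would also write to durable storage here.
        self.dag.write().unwrap().add_node(round, node);
    }

    fn commit_callback(&self, committed_round: Round) {
        self.dag.write().unwrap().commit_callback(committed_round);
    }
}

fn main() {
    let dag = Arc::new(PersistentDagStore::new());
    dag.add_node(1, "node-a".to_string());
    dag.add_node(2, "node-b".to_string());
    dag.commit_callback(1);
    assert_eq!(dag.read().highest_round(), 2);
}

The practical effect is that every lock acquisition is scoped inside the store, so callers cannot accidentally hold a write guard across unrelated work.
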
533 changes: 262 additions & 271 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion config/src/config/dag_consensus_config.rs
@@ -2,7 +2,7 @@
 
 use super::{
     config_sanitizer::ConfigSanitizer, node_config_loader::NodeType, ChainHealthBackoffValues,
-    Error, NodeConfig, QuorumStoreConfig, PipelineBackpressureValues,
+    Error, NodeConfig, PipelineBackpressureValues, QuorumStoreConfig,
 };
 use aptos_types::chain_id::ChainId;
 use serde::{Deserialize, Serialize};

13 changes: 7 additions & 6 deletions consensus/src/dag/adapter.rs
@@ -1,12 +1,14 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use super::observability::counters::{NUM_NODES_PER_BLOCK, NUM_ROUNDS_PER_BLOCK};
+use super::{
+    dag_store::PersistentDagStore,
+    observability::counters::{NUM_NODES_PER_BLOCK, NUM_ROUNDS_PER_BLOCK},
+};
 use crate::{
     consensusdb::{CertifiedNodeSchema, ConsensusDB, DagVoteSchema, NodeSchema},
     counters::update_counters_for_committed_blocks,
     dag::{
-        dag_store::Dag,
         storage::{CommitEvent, DAGStorage},
         CertifiedNode, Node, NodeId, Vote,
     },
@@ -89,7 +91,7 @@ pub(crate) fn compute_initial_block_and_ledger_info(
 
 pub(super) struct OrderedNotifierAdapter {
     executor_channel: UnboundedSender<OrderedBlocks>,
-    dag: Arc<RwLock<Dag>>,
+    dag: Arc<PersistentDagStore>,
     parent_block_info: Arc<RwLock<BlockInfo>>,
     epoch_state: Arc<EpochState>,
     ledger_info_provider: Arc<RwLock<LedgerInfoProvider>>,
@@ -99,7 +101,7 @@ pub(super) struct OrderedNotifierAdapter {
 impl OrderedNotifierAdapter {
     pub(super) fn new(
         executor_channel: UnboundedSender<OrderedBlocks>,
-        dag: Arc<RwLock<Dag>>,
+        dag: Arc<PersistentDagStore>,
         epoch_state: Arc<EpochState>,
         parent_block_info: BlockInfo,
         ledger_info_provider: Arc<RwLock<LedgerInfoProvider>>,
@@ -207,8 +209,7 @@ impl OrderedNotifier for OrderedNotifierAdapter {
         block_created_ts
             .write()
             .retain(|&round, _| round > commit_decision.commit_info().round());
-        dag.write()
-            .commit_callback(commit_decision.commit_info().round());
+        dag.commit_callback(commit_decision.commit_info().round());
         ledger_info_provider
             .write()
             .notify_commit_proof(commit_decision);

14 changes: 7 additions & 7 deletions consensus/src/dag/bootstrap.rs
@@ -8,7 +8,7 @@ use super::{
     dag_handler::NetworkHandler,
     dag_network::TDAGNetworkSender,
     dag_state_sync::{DagStateSynchronizer, StateSyncTrigger},
-    dag_store::Dag,
+    dag_store::PersistentDagStore,
     health::{ChainHealthBackoff, HealthBackoff, PipelineLatencyBasedBackpressure, TChainHealth},
     order_rule::OrderRule,
     rb_handler::NodeBroadcastHandler,
@@ -60,7 +60,7 @@ use futures_channel::{
     mpsc::{UnboundedReceiver, UnboundedSender},
     oneshot,
 };
-use std::{collections::HashMap, fmt, sync::Arc, time::Duration};
+use std::{collections::HashMap, fmt, ops::Deref, sync::Arc, time::Duration};
 use tokio::{
     runtime::Handle,
     select,
@@ -70,7 +70,7 @@ use tokio_retry::strategy::ExponentialBackoff;
 
 #[derive(Clone)]
 struct BootstrapBaseState {
-    dag_store: Arc<RwLock<Dag>>,
+    dag_store: Arc<PersistentDagStore>,
     order_rule: OrderRule,
     ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
     ordered_notifier: Arc<OrderedNotifierAdapter>,
@@ -131,7 +131,7 @@ impl ActiveMode {
     ) -> Option<Mode> {
         info!(
             LogSchema::new(LogEvent::ActiveMode)
-                .round(self.base_state.dag_store.read().highest_round()),
+                .round(self.base_state.dag_store.deref().read().highest_round()),
             highest_committed_round = self
                 .base_state
                 .ledger_info_provider
@@ -497,13 +497,13 @@ impl DagBootstrapper {
                 .saturating_sub(dag_window_size_config),
         );
 
-        let dag = Arc::new(RwLock::new(Dag::new(
+        let dag = Arc::new(PersistentDagStore::new(
            self.epoch_state.clone(),
            self.storage.clone(),
            self.payload_manager.clone(),
            initial_round,
            dag_window_size_config,
-        )));
+        ));
 
         let ordered_notifier = Arc::new(OrderedNotifierAdapter::new(
             self.ordered_nodes_tx.clone(),
@@ -590,7 +590,7 @@ impl DagBootstrapper {
         );
 
         let chain_health: Arc<dyn TChainHealth> = ChainHealthBackoff::new(
-            ChainHealthBackoffConfig::new(self.config.chain_backoff_config.clone()),
+            ChainHealthBackoffConfig::new(self.config.health_config.chain_backoff_config.clone()),
             commit_history.clone(),
         );
         let pipeline_health = PipelineLatencyBasedBackpressure::new(

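bootstrap.rs also adds ops::Deref to the std import and rewrites the logging call as dag_store.deref().read().highest_round(). That suggests, although no hunk here shows it, that PersistentDagStore implements Deref to its inner lock so existing .read() call sites keep compiling. The sketch below illustrates only that assumption; the actual Deref target in aptos-core may differ.

use std::ops::Deref;
use std::sync::{Arc, RwLock};

// Minimal stub with the one query used at this call site.
struct InMemDag {
    highest_round: u64,
}

impl InMemDag {
    fn highest_round(&self) -> u64 {
        self.highest_round
    }
}

struct PersistentDagStore {
    dag: RwLock<InMemDag>,
}

// Assumption: the store derefs to its inner lock, which would explain both the
// new `ops::Deref` import and the `dag_store.deref().read()` call.
impl Deref for PersistentDagStore {
    type Target = RwLock<InMemDag>;

    fn deref(&self) -> &Self::Target {
        &self.dag
    }
}

fn main() {
    let dag_store = Arc::new(PersistentDagStore {
        dag: RwLock::new(InMemDag { highest_round: 7 }),
    });
    // Mirrors `self.base_state.dag_store.deref().read().highest_round()`.
    let round = dag_store.deref().read().unwrap().highest_round();
    assert_eq!(round, 7);
}
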
28 changes: 12 additions & 16 deletions consensus/src/dag/dag_driver.rs
@@ -1,12 +1,11 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use super::health::HealthBackoff;
+use super::{dag_store::PersistentDagStore, health::HealthBackoff};
 use crate::{
     dag::{
         adapter::TLedgerInfoProvider,
         dag_fetcher::TFetchRequester,
-        dag_store::Dag,
         errors::DagDriverError,
         observability::{
             counters::{self, NODE_PAYLOAD_SIZE, NUM_TXNS_PER_NODE},
@@ -28,7 +27,6 @@ use anyhow::bail;
 use aptos_config::config::DagPayloadConfig;
 use aptos_consensus_types::common::{Author, Payload, PayloadFilter};
 use aptos_crypto::hash::CryptoHash;
-use aptos_infallible::RwLock;
 use aptos_logger::{debug, error};
 use aptos_reliable_broadcast::ReliableBroadcast;
 use aptos_time_service::{TimeService, TimeServiceTrait};
@@ -46,7 +44,7 @@ use tokio_retry::strategy::ExponentialBackoff;
 pub(crate) struct DagDriver {
     author: Author,
     epoch_state: Arc<EpochState>,
-    dag: Arc<RwLock<Dag>>,
+    dag: Arc<PersistentDagStore>,
     payload_client: Arc<dyn PayloadClient>,
     reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff, DAGRpcResult>>,
     time_service: TimeService,
@@ -67,7 +65,7 @@ impl DagDriver {
     pub fn new(
         author: Author,
         epoch_state: Arc<EpochState>,
-        dag: Arc<RwLock<Dag>>,
+        dag: Arc<PersistentDagStore>,
         payload_client: Arc<dyn PayloadClient>,
         reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff, DAGRpcResult>>,
         time_service: TimeService,
@@ -127,28 +125,26 @@ impl DagDriver {
 
     async fn add_node(&mut self, node: CertifiedNode) -> anyhow::Result<()> {
         let (highest_strong_link_round, strong_links) = {
-            let mut dag_writer = self.dag.write();
-
-            if !dag_writer.all_exists(node.parents_metadata()) {
+            if !self.dag.read().all_exists(node.parents_metadata()) {
                 if let Err(err) = self.fetch_requester.request_for_certified_node(node) {
                     error!("request to fetch failed: {}", err);
                 }
                 bail!(DagDriverError::MissingParents);
             }
 
-            dag_writer.add_node(node)?;
+            self.dag.add_node(node)?;
 
+            let dag = self.dag.read();
             let highest_strong_links_round =
-                dag_writer.highest_strong_links_round(&self.epoch_state.verifier);
+                dag.highest_strong_links_round(&self.epoch_state.verifier);
             (
                 highest_strong_links_round,
                 // unwrap is for round 0
-                dag_writer
-                    .get_strong_links_for_round(
-                        highest_strong_links_round,
-                        &self.epoch_state.verifier,
-                    )
-                    .unwrap_or_default(),
+                dag.get_strong_links_for_round(
+                    highest_strong_links_round,
+                    &self.epoch_state.verifier,
+                )
+                .unwrap_or_default(),
             )
         };
 

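The locking change with the most visible restructuring is DagDriver::add_node above: the old code held a single write() guard across the parent-existence check, the insert, and the strong-links query, while the new code takes a short read lock for the check, lets add_node lock internally, and then takes a fresh read lock for the strong links. Below is a self-contained sketch of that before/after call-site shape using assumed stub types (all_parents_exist, u64 rounds); it is not the real DagDriver.

use std::sync::{Arc, RwLock, RwLockReadGuard};

type Round = u64;

#[derive(Default)]
struct InMemDag {
    highest_round: Round,
}

impl InMemDag {
    fn all_parents_exist(&self, parent_round: Round) -> bool {
        parent_round <= self.highest_round
    }

    fn add_node(&mut self, round: Round) {
        self.highest_round = self.highest_round.max(round);
    }

    fn highest_strong_links_round(&self) -> Round {
        self.highest_round
    }
}

struct PersistentDagStore {
    dag: RwLock<InMemDag>,
}

impl PersistentDagStore {
    fn read(&self) -> RwLockReadGuard<'_, InMemDag> {
        self.dag.read().unwrap()
    }

    fn add_node(&self, round: Round) {
        // Internally scoped write lock, as assumed for the new store.
        self.dag.write().unwrap().add_node(round);
    }
}

// Mirrors the new DagDriver::add_node flow: a short read lock for the existence
// check, an internally locked write for the insert, then a fresh read lock for
// the strong-links query. The old flow held one `self.dag.write()` guard across
// all three steps.
fn add_node_flow(dag: &Arc<PersistentDagStore>, round: Round, parent_round: Round) -> Result<Round, &'static str> {
    if !dag.read().all_parents_exist(parent_round) {
        return Err("missing parents; the real driver fires a fetch request here");
    }

    dag.add_node(round);

    let dag_reader = dag.read();
    Ok(dag_reader.highest_strong_links_round())
}

fn main() {
    let dag = Arc::new(PersistentDagStore { dag: RwLock::new(InMemDag::default()) });
    dag.add_node(0);
    assert_eq!(add_node_flow(&dag, 1, 0), Ok(1));
}

The usual trade-off applies: the critical sections get shorter, but the existence check and the insert are no longer atomic with respect to each other, so concurrent writers have to be tolerated by the store itself.
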
19 changes: 8 additions & 11 deletions consensus/src/dag/dag_fetcher.rs
@@ -1,10 +1,9 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use super::DAGRpcResult;
+use super::{dag_store::PersistentDagStore, DAGRpcResult};
 use crate::dag::{
     dag_network::{RpcResultWithResponder, TDAGNetworkSender},
-    dag_store::Dag,
     errors::FetchRequestHandleError,
     observability::logging::{LogEvent, LogSchema},
     types::{CertifiedNode, FetchResponse, Node, NodeMetadata, RemoteFetchRequest},
@@ -14,7 +13,6 @@ use anyhow::{anyhow, ensure};
 use aptos_bitvec::BitVec;
 use aptos_config::config::DagFetcherConfig;
 use aptos_consensus_types::common::Author;
-use aptos_infallible::RwLock;
 use aptos_logger::{debug, error, info};
 use aptos_time_service::TimeService;
 use aptos_types::epoch_state::EpochState;
@@ -131,7 +129,7 @@ impl LocalFetchRequest {
 
 pub struct DagFetcherService {
     inner: DagFetcher,
-    dag: Arc<RwLock<Dag>>,
+    dag: Arc<PersistentDagStore>,
     request_rx: Receiver<LocalFetchRequest>,
     ordered_authors: Vec<Author>,
 }
@@ -140,7 +138,7 @@ impl DagFetcherService {
     pub fn new(
         epoch_state: Arc<EpochState>,
         network: Arc<dyn TDAGNetworkSender>,
-        dag: Arc<RwLock<Dag>>,
+        dag: Arc<PersistentDagStore>,
         time_service: TimeService,
         config: DagFetcherConfig,
     ) -> (
@@ -226,7 +224,7 @@ pub trait TDagFetcher: Send {
         &self,
         remote_request: RemoteFetchRequest,
         responders: Vec<Author>,
-        dag: Arc<RwLock<Dag>>,
+        dag: Arc<PersistentDagStore>,
     ) -> anyhow::Result<()>;
 }
 
@@ -259,7 +257,7 @@ impl TDagFetcher for DagFetcher {
         &self,
         remote_request: RemoteFetchRequest,
         responders: Vec<Author>,
-        dag: Arc<RwLock<Dag>>,
+        dag: Arc<PersistentDagStore>,
     ) -> anyhow::Result<()> {
         debug!(
             LogSchema::new(LogEvent::FetchNodes),
@@ -289,9 +287,8 @@ impl TDagFetcher for DagFetcher {
                     let certified_nodes = fetch_response.certified_nodes();
                     // TODO: support chunk response or fallback to state sync
                     {
-                        let mut dag_writer = dag.write();
                         for node in certified_nodes.into_iter().rev() {
-                            if let Err(e) = dag_writer.add_node(node) {
+                            if let Err(e) = dag.add_node(node) {
                                 error!(error = ?e, "failed to add node");
                             }
                         }
@@ -319,12 +316,12 @@ impl TDagFetcher for DagFetcher {
 }
 
 pub struct FetchRequestHandler {
-    dag: Arc<RwLock<Dag>>,
+    dag: Arc<PersistentDagStore>,
     author_to_index: HashMap<Author, usize>,
 }
 
 impl FetchRequestHandler {
-    pub fn new(dag: Arc<RwLock<Dag>>, epoch_state: Arc<EpochState>) -> Self {
+    pub fn new(dag: Arc<PersistentDagStore>, epoch_state: Arc<EpochState>) -> Self {
         Self {
             dag,
             author_to_index: epoch_state.verifier.address_to_validator_index().clone(),

21 changes: 10 additions & 11 deletions consensus/src/dag/dag_state_sync.rs
@@ -3,7 +3,7 @@
 use super::{
     adapter::TLedgerInfoProvider,
     dag_fetcher::TDagFetcher,
-    dag_store::Dag,
+    dag_store::PersistentDagStore,
     storage::DAGStorage,
     types::{CertifiedNodeMessage, RemoteFetchRequest},
     ProofNotifier,
@@ -15,7 +15,6 @@ use crate::{
 use anyhow::ensure;
 use aptos_channels::aptos_channel;
 use aptos_consensus_types::common::{Author, Round};
-use aptos_infallible::RwLock;
 use aptos_logger::{debug, error};
 use aptos_time_service::TimeService;
 use aptos_types::{
@@ -45,7 +44,7 @@ impl fmt::Display for SyncOutcome {
 pub(super) struct StateSyncTrigger {
     epoch_state: Arc<EpochState>,
     ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
-    dag_store: Arc<RwLock<Dag>>,
+    dag_store: Arc<PersistentDagStore>,
     proof_notifier: Arc<dyn ProofNotifier>,
     dag_window_size_config: Round,
 }
@@ -54,7 +53,7 @@ impl StateSyncTrigger {
     pub(super) fn new(
         epoch_state: Arc<EpochState>,
         ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
-        dag_store: Arc<RwLock<Dag>>,
+        dag_store: Arc<PersistentDagStore>,
         proof_notifier: Arc<dyn ProofNotifier>,
         dag_window_size_config: Round,
     ) -> Self {
@@ -186,9 +185,9 @@ impl DagStateSynchronizer {
     pub(crate) fn build_request(
         &self,
         node: &CertifiedNodeMessage,
-        current_dag_store: Arc<RwLock<Dag>>,
+        current_dag_store: Arc<PersistentDagStore>,
         highest_committed_anchor_round: Round,
-    ) -> (RemoteFetchRequest, Vec<Author>, Arc<RwLock<Dag>>) {
+    ) -> (RemoteFetchRequest, Vec<Author>, Arc<PersistentDagStore>) {
         let commit_li = node.ledger_info();
 
         {
@@ -212,13 +211,13 @@ impl DagStateSynchronizer {
             .commit_info()
             .round()
             .saturating_sub(self.dag_window_size_config);
-        let sync_dag_store = Arc::new(RwLock::new(Dag::new_empty(
+        let sync_dag_store = Arc::new(PersistentDagStore::new_empty(
             self.epoch_state.clone(),
             self.storage.clone(),
             self.payload_manager.clone(),
             start_round,
             self.dag_window_size_config,
-        )));
+        ));
         let bitmask = { sync_dag_store.read().bitmask(target_round) };
         let request = RemoteFetchRequest::new(
             self.epoch_state.epoch,
@@ -240,9 +239,9 @@ impl DagStateSynchronizer {
         dag_fetcher: impl TDagFetcher,
         request: RemoteFetchRequest,
         responders: Vec<Author>,
-        sync_dag_store: Arc<RwLock<Dag>>,
+        sync_dag_store: Arc<PersistentDagStore>,
         commit_li: LedgerInfoWithSignatures,
-    ) -> anyhow::Result<Dag> {
+    ) -> anyhow::Result<PersistentDagStore> {
         match dag_fetcher
             .fetch(request, responders, sync_dag_store.clone())
             .await
@@ -256,7 +255,7 @@ impl DagStateSynchronizer {
 
         self.state_computer.sync_to(commit_li).await?;
 
-        Ok(Arc::into_inner(sync_dag_store).unwrap().into_inner())
+        Ok(Arc::into_inner(sync_dag_store).unwrap())
     }
 }
 

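The final hunk in dag_state_sync.rs drops one unwrapping layer: with Arc<RwLock<Dag>>, recovering an owned DAG required peeling the Arc and then the lock (Arc::into_inner(..).unwrap().into_inner()); with Arc<PersistentDagStore>, the lock lives inside the store and Arc::into_inner alone returns the owned value. A small sketch of just that mechanic, with std::sync::RwLock stand-ins rather than the aptos types:

use std::sync::{Arc, RwLock};

struct InMemDag {
    highest_round: u64,
}

struct PersistentDagStore {
    dag: RwLock<InMemDag>,
}

fn main() {
    // Before: Arc<RwLock<Dag>> meant peeling the Arc and then the lock.
    let old_style = Arc::new(RwLock::new(InMemDag { highest_round: 5 }));
    let owned_old: InMemDag = Arc::into_inner(old_style)
        .expect("no other Arc clones remain")
        .into_inner()
        .expect("lock not poisoned"); // std RwLock; the original uses aptos_infallible::RwLock
    assert_eq!(owned_old.highest_round, 5);

    // After: the lock lives inside the store, so the last Arc holder gets the
    // whole store back with a single Arc::into_inner, matching
    // `Ok(Arc::into_inner(sync_dag_store).unwrap())` in the hunk above.
    let new_style = Arc::new(PersistentDagStore {
        dag: RwLock::new(InMemDag { highest_round: 5 }),
    });
    let owned_new = Arc::into_inner(new_style).expect("no other Arc clones remain");
    assert_eq!(owned_new.dag.read().unwrap().highest_round, 5);
}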