Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Feb 6, 2024
1 parent 40c8f65 commit f20234c
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 86 deletions.
6 changes: 3 additions & 3 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
dag_store::PersistentDagStore,
dag_store::DagStore,
observability::counters::{NUM_NODES_PER_BLOCK, NUM_ROUNDS_PER_BLOCK},
};
use crate::{
Expand Down Expand Up @@ -91,7 +91,7 @@ pub(crate) fn compute_initial_block_and_ledger_info(

pub(super) struct OrderedNotifierAdapter {
executor_channel: UnboundedSender<OrderedBlocks>,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
parent_block_info: Arc<RwLock<BlockInfo>>,
epoch_state: Arc<EpochState>,
ledger_info_provider: Arc<RwLock<LedgerInfoProvider>>,
Expand All @@ -101,7 +101,7 @@ pub(super) struct OrderedNotifierAdapter {
impl OrderedNotifierAdapter {
pub(super) fn new(
executor_channel: UnboundedSender<OrderedBlocks>,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
epoch_state: Arc<EpochState>,
parent_block_info: BlockInfo,
ledger_info_provider: Arc<RwLock<LedgerInfoProvider>>,
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{
dag_handler::NetworkHandler,
dag_network::TDAGNetworkSender,
dag_state_sync::{DagStateSynchronizer, StateSyncTrigger},
dag_store::PersistentDagStore,
dag_store::DagStore,
health::{ChainHealthBackoff, HealthBackoff, PipelineLatencyBasedBackpressure, TChainHealth},
order_rule::OrderRule,
rb_handler::NodeBroadcastHandler,
Expand Down Expand Up @@ -70,7 +70,7 @@ use tokio_retry::strategy::ExponentialBackoff;

#[derive(Clone)]
struct BootstrapBaseState {
dag_store: Arc<PersistentDagStore>,
dag_store: Arc<DagStore>,
order_rule: OrderRule,
ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
ordered_notifier: Arc<OrderedNotifierAdapter>,
Expand Down Expand Up @@ -500,7 +500,7 @@ impl DagBootstrapper {
.saturating_sub(dag_window_size_config),
);

let dag = Arc::new(PersistentDagStore::new(
let dag = Arc::new(DagStore::new(
self.epoch_state.clone(),
self.storage.clone(),
self.payload_manager.clone(),
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{dag_store::PersistentDagStore, health::HealthBackoff};
use super::{dag_store::DagStore, health::HealthBackoff};
use crate::{
dag::{
adapter::TLedgerInfoProvider,
Expand Down Expand Up @@ -44,7 +44,7 @@ use tokio_retry::strategy::ExponentialBackoff;
pub(crate) struct DagDriver {
author: Author,
epoch_state: Arc<EpochState>,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
payload_client: Arc<dyn PayloadClient>,
reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff, DAGRpcResult>>,
time_service: TimeService,
Expand All @@ -65,7 +65,7 @@ impl DagDriver {
pub fn new(
author: Author,
epoch_state: Arc<EpochState>,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
payload_client: Arc<dyn PayloadClient>,
reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff, DAGRpcResult>>,
time_service: TimeService,
Expand Down
14 changes: 7 additions & 7 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{dag_store::PersistentDagStore, DAGRpcResult};
use super::{dag_store::DagStore, DAGRpcResult};
use crate::dag::{
dag_network::{RpcResultWithResponder, TDAGNetworkSender},
errors::FetchRequestHandleError,
Expand Down Expand Up @@ -129,7 +129,7 @@ impl LocalFetchRequest {

pub struct DagFetcherService {
inner: DagFetcher,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
request_rx: Receiver<LocalFetchRequest>,
ordered_authors: Vec<Author>,
}
Expand All @@ -138,7 +138,7 @@ impl DagFetcherService {
pub fn new(
epoch_state: Arc<EpochState>,
network: Arc<dyn TDAGNetworkSender>,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
time_service: TimeService,
config: DagFetcherConfig,
) -> (
Expand Down Expand Up @@ -224,7 +224,7 @@ pub trait TDagFetcher: Send {
&self,
remote_request: RemoteFetchRequest,
responders: Vec<Author>,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
) -> anyhow::Result<()>;
}

Expand Down Expand Up @@ -257,7 +257,7 @@ impl TDagFetcher for DagFetcher {
&self,
remote_request: RemoteFetchRequest,
responders: Vec<Author>,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
) -> anyhow::Result<()> {
debug!(
LogSchema::new(LogEvent::FetchNodes),
Expand Down Expand Up @@ -316,12 +316,12 @@ impl TDagFetcher for DagFetcher {
}

pub struct FetchRequestHandler {
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
author_to_index: HashMap<Author, usize>,
}

impl FetchRequestHandler {
pub fn new(dag: Arc<PersistentDagStore>, epoch_state: Arc<EpochState>) -> Self {
pub fn new(dag: Arc<DagStore>, epoch_state: Arc<EpochState>) -> Self {
Self {
dag,
author_to_index: epoch_state.verifier.address_to_validator_index().clone(),
Expand Down
16 changes: 8 additions & 8 deletions consensus/src/dag/dag_state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::{
adapter::TLedgerInfoProvider,
dag_fetcher::TDagFetcher,
dag_store::PersistentDagStore,
dag_store::DagStore,
storage::DAGStorage,
types::{CertifiedNodeMessage, RemoteFetchRequest},
ProofNotifier,
Expand Down Expand Up @@ -44,7 +44,7 @@ impl fmt::Display for SyncOutcome {
pub(super) struct StateSyncTrigger {
epoch_state: Arc<EpochState>,
ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
dag_store: Arc<PersistentDagStore>,
dag_store: Arc<DagStore>,
proof_notifier: Arc<dyn ProofNotifier>,
dag_window_size_config: Round,
}
Expand All @@ -53,7 +53,7 @@ impl StateSyncTrigger {
pub(super) fn new(
epoch_state: Arc<EpochState>,
ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
dag_store: Arc<PersistentDagStore>,
dag_store: Arc<DagStore>,
proof_notifier: Arc<dyn ProofNotifier>,
dag_window_size_config: Round,
) -> Self {
Expand Down Expand Up @@ -185,9 +185,9 @@ impl DagStateSynchronizer {
pub(crate) fn build_request(
&self,
node: &CertifiedNodeMessage,
current_dag_store: Arc<PersistentDagStore>,
current_dag_store: Arc<DagStore>,
highest_committed_anchor_round: Round,
) -> (RemoteFetchRequest, Vec<Author>, Arc<PersistentDagStore>) {
) -> (RemoteFetchRequest, Vec<Author>, Arc<DagStore>) {
let commit_li = node.ledger_info();

{
Expand All @@ -211,7 +211,7 @@ impl DagStateSynchronizer {
.commit_info()
.round()
.saturating_sub(self.dag_window_size_config);
let sync_dag_store = Arc::new(PersistentDagStore::new_empty(
let sync_dag_store = Arc::new(DagStore::new_empty(
self.epoch_state.clone(),
self.storage.clone(),
self.payload_manager.clone(),
Expand Down Expand Up @@ -239,9 +239,9 @@ impl DagStateSynchronizer {
dag_fetcher: impl TDagFetcher,
request: RemoteFetchRequest,
responders: Vec<Author>,
sync_dag_store: Arc<PersistentDagStore>,
sync_dag_store: Arc<DagStore>,
commit_li: LedgerInfoWithSignatures,
) -> anyhow::Result<PersistentDagStore> {
) -> anyhow::Result<DagStore> {
match dag_fetcher
.fetch(request, responders, sync_dag_store.clone())
.await
Expand Down
35 changes: 21 additions & 14 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl NodeStatus {
}
/// Data structure that stores the in-memory DAG representation, it maintains round based index.
#[derive(Clone)]
pub struct Dag {
pub struct InMemDag {
nodes_by_round: BTreeMap<Round, Vec<Option<NodeStatus>>>,
/// Map between peer id to vector index
author_to_index: HashMap<Author, usize>,
Expand All @@ -51,7 +51,7 @@ pub struct Dag {
window_size: u64,
}

impl Dag {
impl InMemDag {
pub fn new_empty(epoch_state: Arc<EpochState>, start_round: Round, window_size: u64) -> Self {
let author_to_index = epoch_state.verifier.address_to_validator_index().clone();
let nodes_by_round = BTreeMap::new();
Expand All @@ -65,19 +65,18 @@ impl Dag {
}

pub(crate) fn lowest_round(&self) -> Round {
*self
.nodes_by_round
.first_key_value()
.map(|(round, _)| round)
.unwrap_or(&self.start_round)
self.start_round
}

pub fn highest_round(&self) -> Round {
// If stale nodes exist on the BTreeMap, ignore their rounds when calculating
// the highest round.
*self
.nodes_by_round
.last_key_value()
.map(|(round, _)| round)
.unwrap_or(&self.start_round)
.max(&self.start_round)
}

/// The highest strong links round is either the highest round or the highest round - 1
Expand All @@ -95,6 +94,14 @@ impl Dag {
}

fn add_validated_node(&mut self, node: CertifiedNode) -> anyhow::Result<()> {
let round = node.round();
ensure!(
round >= self.lowest_round(),
"dag was pruned. given round: {}, lowest round: {}",
round,
self.lowest_round()
);

let node = Arc::new(node);
let round_ref = self
.get_node_ref_mut(node.round(), node.author())
Expand Down Expand Up @@ -373,13 +380,13 @@ impl Dag {
}
}

pub struct PersistentDagStore {
dag: RwLock<Dag>,
pub struct DagStore {
dag: RwLock<InMemDag>,
storage: Arc<dyn DAGStorage>,
payload_manager: Arc<dyn TPayloadManager>,
}

impl PersistentDagStore {
impl DagStore {
pub fn new(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
Expand Down Expand Up @@ -424,7 +431,7 @@ impl PersistentDagStore {
start_round: Round,
window_size: u64,
) -> Self {
let dag = Dag::new_empty(epoch_state, start_round, window_size);
let dag = InMemDag::new_empty(epoch_state, start_round, window_size);
Self {
dag: RwLock::new(dag),
storage,
Expand All @@ -433,7 +440,7 @@ impl PersistentDagStore {
}

pub fn new_for_test(
dag: Dag,
dag: InMemDag,
storage: Arc<dyn DAGStorage>,
payload_manager: Arc<dyn TPayloadManager>,
) -> Self {
Expand Down Expand Up @@ -476,8 +483,8 @@ impl PersistentDagStore {
}
}

impl Deref for PersistentDagStore {
type Target = RwLock<Dag>;
impl Deref for DagStore {
type Target = RwLock<InMemDag>;

fn deref(&self) -> &Self::Target {
&self.dag
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/order_rule.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::dag_store::PersistentDagStore;
use super::dag_store::DagStore;
use crate::dag::{
adapter::OrderedNotifier,
anchor_election::AnchorElection,
Expand All @@ -25,7 +25,7 @@ pub struct OrderRule {
epoch_state: Arc<EpochState>,
// TODO: try to share order rule, instead of this Arc.
lowest_unordered_anchor_round: Arc<RwLock<Round>>,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
anchor_election: Arc<dyn AnchorElection>,
notifier: Arc<dyn OrderedNotifier>,
dag_window_size_config: Round,
Expand All @@ -35,7 +35,7 @@ impl OrderRule {
pub fn new(
epoch_state: Arc<EpochState>,
lowest_unordered_anchor_round: Round,
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
anchor_election: Arc<dyn AnchorElection>,
notifier: Arc<dyn OrderedNotifier>,
dag_window_size_config: Round,
Expand Down
35 changes: 15 additions & 20 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::health::HealthBackoff;
use crate::{
dag::{
dag_fetcher::TFetchRequester,
dag_network::RpcHandler,
dag_store::Dag,
errors::NodeBroadcastHandleError,
observability::{
logging::{LogEvent, LogSchema},
tracing::{observe_node, NodeStage},
},
storage::DAGStorage,
types::{Node, NodeCertificate, Vote},
NodeId,
util::is_vtxn_expected,
}
};
use super::{dag_store::PersistentDagStore, health::HealthBackoff};
use super::{dag_store::DagStore, health::HealthBackoff};
use crate::{dag::{
dag_fetcher::TFetchRequester,
dag_network::RpcHandler,
errors::NodeBroadcastHandleError,
observability::{
logging::{LogEvent, LogSchema},
tracing::{observe_node, NodeStage},
},
storage::DAGStorage,
types::{Node, NodeCertificate, Vote},
NodeId,
}, util::is_vtxn_expected};
use anyhow::{bail, ensure};
use aptos_config::config::DagPayloadConfig;
use aptos_consensus_types::common::{Author, Round};
Expand All @@ -33,7 +28,7 @@ use async_trait::async_trait;
use std::{collections::BTreeMap, mem, sync::Arc};

pub(crate) struct NodeBroadcastHandler {
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
votes_by_round_peer: BTreeMap<Round, BTreeMap<Author, Vote>>,
signer: Arc<ValidatorSigner>,
epoch_state: Arc<EpochState>,
Expand All @@ -47,7 +42,7 @@ pub(crate) struct NodeBroadcastHandler {

impl NodeBroadcastHandler {
pub fn new(
dag: Arc<PersistentDagStore>,
dag: Arc<DagStore>,
signer: Arc<ValidatorSigner>,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
Expand Down
Loading

0 comments on commit f20234c

Please sign in to comment.