Skip to content

Commit

Permalink
[Quorum Store] improve commit notification logic (#11444) (#11454)
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho authored Dec 21, 2023
1 parent 2473a85 commit e7eff82
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 27 deletions.
2 changes: 1 addition & 1 deletion consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl Deref for SignedBatchInfo {
/// Reasons a signed batch info can be rejected when its signature is added to a proof.
#[derive(Debug, PartialEq)]
pub enum SignedBatchInfoError {
WrongAuthor,
WrongInfo,
/// Payload is `(batch id of the incoming signed batch info, batch id of the info already
/// held by the proof state)`, so the mismatch is visible when the error is logged.
WrongInfo((u64, u64)),
DuplicatedSignature,
InvalidAuthor,
}
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub enum LogEvent {
NetworkReceiveProposal,
NewEpoch,
NewRound,
ProofOfStoreInit,
ProofOfStoreReady,
Propose,
ReceiveBatchRetrieval,
ReceiveBlockRetrieval,
Expand Down
14 changes: 11 additions & 3 deletions consensus/src/quorum_store/batch_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
use aptos_config::config::QuorumStoreConfig;
use aptos_consensus_types::{
common::{TransactionInProgress, TransactionSummary},
proof_of_store::BatchId,
proof_of_store::{BatchId, BatchInfo},
};
use aptos_logger::prelude::*;
use aptos_mempool::QuorumStoreRequest;
Expand All @@ -28,7 +28,7 @@ use tokio::time::Interval;

/// Commands the batch generator receives on its command channel.
#[derive(Debug)]
pub enum BatchGeneratorCommand {
CommitNotification(u64),
/// Block timestamp plus the batches carried by the commit notification. The handler first
/// removes those batches from the in-progress set, then expires batches by the timestamp.
CommitNotification(u64, Vec<BatchInfo>),
/// Ids of batches whose proof of store timed out.
ProofExpiration(Vec<BatchId>),
/// NOTE(review): handler not visible here; presumably the sender acknowledges shutdown — confirm.
Shutdown(tokio::sync::oneshot::Sender<()>),
}
Expand Down Expand Up @@ -420,7 +420,7 @@ impl BatchGenerator {
}),
Some(cmd) = cmd_rx.recv() => monitor!("batch_generator_handle_command", {
match cmd {
BatchGeneratorCommand::CommitNotification(block_timestamp) => {
BatchGeneratorCommand::CommitNotification(block_timestamp, batches) => {
trace!(
"QS: got clean request from execution, block timestamp {}",
block_timestamp
Expand All @@ -431,10 +431,17 @@ impl BatchGenerator {
);
self.latest_block_timestamp = block_timestamp;

for batch_id in batches.iter().map(|b| b.batch_id()) {
if self.remove_batch_in_progress(&batch_id) {
counters::BATCH_IN_PROGRESS_COMMITTED.inc();
}
}

// Cleans up all batches that expire in timestamp <= block_timestamp. This is
// safe since clean request must occur only after execution result is certified.
for batch_id in self.batch_expirations.expire(block_timestamp) {
if self.remove_batch_in_progress(&batch_id) {
counters::BATCH_IN_PROGRESS_EXPIRED.inc();
debug!(
"QS: logical time based expiration batch w. id {} from batches_in_progress, new size {}",
batch_id,
Expand All @@ -445,6 +452,7 @@ impl BatchGenerator {
},
BatchGeneratorCommand::ProofExpiration(batch_ids) => {
for batch_id in batch_ids {
counters::BATCH_IN_PROGRESS_TIMEOUT.inc();
debug!(
"QS: received timeout for proof of store, batch id = {}",
batch_id
Expand Down
24 changes: 24 additions & 0 deletions consensus/src/quorum_store/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,30 @@ pub static EXCLUDED_TXNS_WHEN_PULL: Lazy<Histogram> = Lazy::new(|| {
.unwrap()
});

/// Incremented by the batch generator each time a batch listed in a commit notification is
/// successfully removed from its in-progress set.
///
/// Registration happens lazily on first access; the `unwrap` panics if it fails.
pub static BATCH_IN_PROGRESS_COMMITTED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"quorum_store_batch_in_progress_committed",
"Number of batches that are removed from in progress by a commit."
)
.unwrap()
});

/// Incremented by the batch generator each time a batch whose expiration is at or before the
/// committed block timestamp is successfully removed from its in-progress set.
///
/// Registration happens lazily on first access; the `unwrap` panics if it fails.
pub static BATCH_IN_PROGRESS_EXPIRED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"quorum_store_batch_in_progress_expired",
"Number of batches that are removed from in progress by a block timestamp expiration."
)
.unwrap()
});

/// Incremented by the batch generator for batch ids received in a proof-expiration command.
///
/// NOTE(review): the visible call site bumps this once per received batch id, before any
/// removal result is checked, whereas the help text says "removed" — confirm which is intended.
///
/// Registration happens lazily on first access; the `unwrap` panics if it fails.
pub static BATCH_IN_PROGRESS_TIMEOUT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"quorum_store_batch_in_progress_timeout",
"Number of batches that are removed from in progress by a proof collection timeout."
)
.unwrap()
});

pub static GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_TIME_WHEN_SAVE: Lazy<Histogram> = Lazy::new(
|| {
register_histogram!(
Expand Down
42 changes: 36 additions & 6 deletions consensus/src/quorum_store/proof_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
logging::{LogEvent, LogSchema},
monitor,
network::QuorumStoreSender,
quorum_store::{
Expand All @@ -17,7 +18,7 @@ use aptos_types::{
aggregate_signature::PartialSignatures, validator_verifier::ValidatorVerifier, PeerId,
};
use std::{
collections::{BTreeMap, HashMap},
collections::{hash_map::Entry, BTreeMap, HashMap},
sync::Arc,
time::Duration,
};
Expand All @@ -29,6 +30,7 @@ use tokio::{
/// Commands the proof coordinator handles in its command loop.
#[derive(Debug)]
pub(crate) enum ProofCoordinatorCommand {
/// Signed batch infos to add as signatures; each one that completes a proof is collected.
AppendSignature(SignedBatchInfoMsg),
/// Batches from a commit notification. For each, the proof state stored under the batch's
/// digest is removed if its batch info equals the committed batch.
CommitNotification(Vec<BatchInfo>),
/// Stop request: the handler sends a shutdown ack (presumably on this sender) and then
/// breaks out of the loop.
Shutdown(TokioOneshot::Sender<()>),
}

Expand Down Expand Up @@ -57,7 +59,10 @@ impl IncrementalProofState {
validator_verifier: &ValidatorVerifier,
) -> Result<(), SignedBatchInfoError> {
if signed_batch_info.batch_info() != &self.info {
return Err(SignedBatchInfoError::WrongInfo);
return Err(SignedBatchInfoError::WrongInfo((
signed_batch_info.batch_id().id,
self.info.batch_id().id,
)));
}

if self
Expand Down Expand Up @@ -124,6 +129,10 @@ impl IncrementalProofState {
Err(e) => unreachable!("Cannot aggregate signatures on digest err = {:?}", e),
}
}

/// Returns a reference to the `BatchInfo` held by this proof state.
fn batch_info(&self) -> &BatchInfo {
&self.info
}
}

pub(crate) struct ProofCoordinator {
Expand Down Expand Up @@ -186,6 +195,11 @@ impl ProofCoordinator {
self.digest_to_time
.entry(*signed_batch_info.digest())
.or_insert(chrono::Utc::now().naive_utc().timestamp_micros() as u64);
debug!(
LogSchema::new(LogEvent::ProofOfStoreInit),
digest = signed_batch_info.digest(),
batch_id = signed_batch_info.batch_id().id,
);
Ok(())
}

Expand Down Expand Up @@ -269,23 +283,39 @@ impl ProofCoordinator {
.expect("Failed to send shutdown ack to QuorumStore");
break;
},
ProofCoordinatorCommand::CommitNotification(batches) => {
for batch in batches {
let digest = batch.digest();
if let Entry::Occupied(existing_proof) = self.digest_to_proof.entry(*digest) {
if batch == *existing_proof.get().batch_info() {
existing_proof.remove();
}
}
}
},
ProofCoordinatorCommand::AppendSignature(signed_batch_infos) => {
let mut proofs = vec![];
for signed_batch_info in signed_batch_infos.take().into_iter() {
let peer_id = signed_batch_info.signer();
let digest = *signed_batch_info.digest();
let batch_id = signed_batch_info.batch_id();
match self.add_signature(signed_batch_info, &validator_verifier) {
Ok(result) => {
if let Some(proof) = result {
debug!("QS: received quorum of signatures, digest {}", digest);
debug!(
LogSchema::new(LogEvent::ProofOfStoreReady),
digest = digest,
batch_id = batch_id.id,
);
proofs.push(proof);
}
},
Err(e) => {
// TODO: better error messages
// Can happen if we already garbage collected
// Can happen if we already garbage collected, the commit notification is late, or the peer is misbehaving.
if peer_id == self.peer_id {
debug!("QS: could not add signature from self, err = {:?}", e);
info!("QS: could not add signature from self, digest = {}, batch_id = {}, err = {:?}", digest, batch_id, e);
} else {
debug!("QS: could not add signature from peer {}, digest = {}, batch_id = {}, err = {:?}", peer_id, digest, batch_id, e);
}
},
}
Expand Down
14 changes: 11 additions & 3 deletions consensus/src/quorum_store/quorum_store_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,25 @@ impl QuorumStoreCoordinator {
monitor!("quorum_store_coordinator_loop", {
match cmd {
CoordinatorCommand::CommitNotification(block_timestamp, batches) => {
// TODO: need a callback or not?
self.proof_coordinator_cmd_tx
.send(ProofCoordinatorCommand::CommitNotification(batches.clone()))
.await
.expect("Failed to send to ProofCoordinator");

self.proof_manager_cmd_tx
.send(ProofManagerCommand::CommitNotification(
block_timestamp,
batches,
batches.clone(),
))
.await
.expect("Failed to send to ProofManager");
// TODO: need a callback or not?

self.batch_generator_cmd_tx
.send(BatchGeneratorCommand::CommitNotification(block_timestamp))
.send(BatchGeneratorCommand::CommitNotification(
block_timestamp,
batches,
))
.await
.expect("Failed to send to BatchGenerator");
},
Expand Down
35 changes: 25 additions & 10 deletions mempool/src/core_mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use aptos_types::{
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::atomic::Ordering,
time::{Duration, Instant, SystemTime},
};

Expand Down Expand Up @@ -84,7 +85,7 @@ impl Mempool {
is_rejected = true,
label = reason_label,
);
self.log_latency(*sender, sequence_number, reason_label);
self.log_commit_rejected_latency(*sender, sequence_number, reason_label);
if let Some(ranking_score) = self.transactions.get_ranking_score(sender, sequence_number) {
counters::core_mempool_txn_ranking_score(
counters::REMOVE_LABEL,
Expand Down Expand Up @@ -119,7 +120,7 @@ impl Mempool {
}

pub(crate) fn log_txn_latency(
insertion_info: InsertionInfo,
insertion_info: &InsertionInfo,
bucket: &str,
stage: &'static str,
) {
Expand All @@ -133,8 +134,26 @@ impl Mempool {
}
}

fn log_latency(&self, account: AccountAddress, sequence_number: u64, stage: &'static str) {
if let Some((&insertion_info, bucket)) = self
/// Records that consensus pulled the transaction identified by `account` and
/// `sequence_number`.
///
/// When the transaction store still has insertion info for it, this bumps the
/// per-transaction pull counter, calls `log_txn_latency` with
/// `counters::CONSENSUS_PULLED_LABEL`, and observes the updated pull count in
/// `CORE_MEMPOOL_TXN_CONSENSUS_PULLED`. Does nothing when the lookup returns `None`.
fn log_consensus_pulled_latency(&self, account: AccountAddress, sequence_number: u64) {
    if let Some((insertion_info, bucket)) = self
        .transactions
        .get_insertion_info_and_bucket(&account, sequence_number)
    {
        // Saturate rather than wrap: a plain `fetch_add` on an `AtomicU8` rolls over to 0
        // after 256 pulls, which would make a heavily re-pulled txn look freshly pulled.
        let prev_count = insertion_info
            .consensus_pulled_counter
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
                Some(count.saturating_add(1))
            })
            // The closure always returns `Some`, so `Err` cannot occur; in either case the
            // payload is the value held before this call.
            .unwrap_or_else(|current| current);
        Self::log_txn_latency(insertion_info, bucket, counters::CONSENSUS_PULLED_LABEL);
        // Widen before adding one: `prev_count + 1` in `u8` overflows when the counter is
        // at 255 (panic with overflow checks on, silent wrap to 0 otherwise).
        counters::CORE_MEMPOOL_TXN_CONSENSUS_PULLED.observe(f64::from(prev_count) + 1.0);
    }
}

fn log_commit_rejected_latency(
&self,
account: AccountAddress,
sequence_number: u64,
stage: &'static str,
) {
if let Some((insertion_info, bucket)) = self
.transactions
.get_insertion_info_and_bucket(&account, sequence_number)
{
Expand All @@ -148,7 +167,7 @@ impl Mempool {
sequence_number: u64,
block_timestamp: Duration,
) {
if let Some((&insertion_info, bucket)) = self
if let Some((insertion_info, bucket)) = self
.transactions
.get_insertion_info_and_bucket(&account, sequence_number)
{
Expand Down Expand Up @@ -393,11 +412,7 @@ impl Mempool {
counters::mempool_service_transactions(counters::GET_BLOCK_LABEL, block.len());
counters::MEMPOOL_SERVICE_BYTES_GET_BLOCK.observe(total_bytes as f64);
for transaction in &block {
self.log_latency(
transaction.sender(),
transaction.sequence_number(),
counters::CONSENSUS_PULLED_LABEL,
);
self.log_consensus_pulled_latency(transaction.sender(), transaction.sequence_number());
}
block
}
Expand Down
5 changes: 4 additions & 1 deletion mempool/src/core_mempool/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use aptos_types::{account_address::AccountAddress, transaction::SignedTransactio
use serde::{Deserialize, Serialize};
use std::{
mem::size_of,
sync::{atomic::AtomicU8, Arc},
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -107,10 +108,11 @@ pub enum SubmittedBy {
PeerValidator,
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
/// Bookkeeping attached to a transaction held in core mempool.
///
/// Only `Debug` and `Clone` are derived: the shared atomic counter below is not `Copy` and
/// does not implement `PartialEq`/`Hash`, so users of this struct take it by reference.
#[derive(Debug, Clone)]
pub struct InsertionInfo {
/// Time of insertion; latency metrics are measured as the duration since this instant.
pub insertion_time: SystemTime,
pub submitted_by: SubmittedBy,
/// Number of times consensus has pulled this transaction, bumped on each pull.
/// Held in an `Arc`, so clones of this struct share the same counter.
pub consensus_pulled_counter: Arc<AtomicU8>,
}

impl InsertionInfo {
Expand All @@ -129,6 +131,7 @@ impl InsertionInfo {
Self {
insertion_time,
submitted_by,
consensus_pulled_counter: Arc::new(AtomicU8::new(0)),
}
}

Expand Down
6 changes: 3 additions & 3 deletions mempool/src/core_mempool/transaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ impl TransactionStore {
fn log_ready_transaction(
ranking_score: u64,
bucket: &str,
insertion_info: InsertionInfo,
insertion_info: &InsertionInfo,
broadcast_ready: bool,
) {
if let Ok(time_delta) = SystemTime::now().duration_since(insertion_info.insertion_time) {
Expand Down Expand Up @@ -445,7 +445,7 @@ impl TransactionStore {
Self::log_ready_transaction(
txn.ranking_score,
self.timeline_index.get_bucket(txn.ranking_score),
txn.insertion_info,
&txn.insertion_info,
process_broadcast_ready,
);
}
Expand Down Expand Up @@ -606,7 +606,7 @@ impl TransactionStore {
}
let bucket = self.timeline_index.get_bucket(txn.ranking_score);
Mempool::log_txn_latency(
txn.insertion_info,
&txn.insertion_info,
bucket,
BROADCAST_BATCHED_LABEL,
);
Expand Down
11 changes: 11 additions & 0 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ const RANKING_SCORE_BUCKETS: &[f64] = &[
10000.0, 14678.0, 21544.0, 31623.0, 46416.0, 68129.0, 100000.0, 146780.0, 215443.0,
];

/// Bucket boundaries for the histogram of how many times consensus pulled a single txn.
const TXN_CONSENSUS_PULLED_BUCKETS: &[f64] = &[1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 25.0, 50.0, 100.0];

static TRANSACTION_COUNT_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
exponential_buckets(
/*start=*/ 1.5, /*factor=*/ 1.5, /*count=*/ 20,
Expand Down Expand Up @@ -267,6 +269,15 @@ pub static CORE_MEMPOOL_GC_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap()
});

/// Histogram of per-transaction consensus pull counts, bucketed by
/// `TXN_CONSENSUS_PULLED_BUCKETS`. Core mempool observes the updated count each time
/// consensus pulls a transaction it still has insertion info for.
///
/// Registration happens lazily on first access; the `unwrap` panics if it fails.
pub static CORE_MEMPOOL_TXN_CONSENSUS_PULLED: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_core_mempool_txn_consensus_pulled",
"Number of times a txn was pulled from core mempool by consensus",
TXN_CONSENSUS_PULLED_BUCKETS.to_vec()
)
.unwrap()
});

/// Counter of pending network events to Mempool
pub static PENDING_MEMPOOL_NETWORK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand Down

0 comments on commit e7eff82

Please sign in to comment.