diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs
index dd9f18096f333..25e982d1b897b 100644
--- a/consensus/consensus-types/src/proof_of_store.rs
+++ b/consensus/consensus-types/src/proof_of_store.rs
@@ -235,7 +235,7 @@ impl Deref for SignedBatchInfo {
 #[derive(Debug, PartialEq)]
 pub enum SignedBatchInfoError {
     WrongAuthor,
-    WrongInfo,
+    WrongInfo((u64, u64)),
     DuplicatedSignature,
     InvalidAuthor,
 }
diff --git a/consensus/src/logging.rs b/consensus/src/logging.rs
index 01abc9a95e5ed..20f92b5c992fa 100644
--- a/consensus/src/logging.rs
+++ b/consensus/src/logging.rs
@@ -22,6 +22,8 @@ pub enum LogEvent {
     NetworkReceiveProposal,
     NewEpoch,
     NewRound,
+    ProofOfStoreInit,
+    ProofOfStoreReady,
     Propose,
     ReceiveBatchRetrieval,
     ReceiveBlockRetrieval,
diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs
index 80dd29984e9d7..6ec1aad914ef4 100644
--- a/consensus/src/quorum_store/batch_generator.rs
+++ b/consensus/src/quorum_store/batch_generator.rs
@@ -13,7 +13,7 @@ use crate::{
 use aptos_config::config::QuorumStoreConfig;
 use aptos_consensus_types::{
     common::{TransactionInProgress, TransactionSummary},
-    proof_of_store::BatchId,
+    proof_of_store::{BatchId, BatchInfo},
 };
 use aptos_logger::prelude::*;
 use aptos_mempool::QuorumStoreRequest;
@@ -28,7 +28,7 @@ use tokio::time::Interval;
 
 #[derive(Debug)]
 pub enum BatchGeneratorCommand {
-    CommitNotification(u64),
+    CommitNotification(u64, Vec<BatchInfo>),
     ProofExpiration(Vec<BatchId>),
     Shutdown(tokio::sync::oneshot::Sender<()>),
 }
@@ -420,7 +420,7 @@ impl BatchGenerator {
             }),
             Some(cmd) = cmd_rx.recv() => monitor!("batch_generator_handle_command", {
                 match cmd {
-                    BatchGeneratorCommand::CommitNotification(block_timestamp) => {
+                    BatchGeneratorCommand::CommitNotification(block_timestamp, batches) => {
                         trace!(
                             "QS: got clean request from execution, block timestamp {}",
                             block_timestamp
@@ -431,10 +431,17 @@ impl BatchGenerator {
                         );
                         self.latest_block_timestamp = block_timestamp;
 
+                        for batch_id in batches.iter().map(|b| b.batch_id()) {
+                            if self.remove_batch_in_progress(&batch_id) {
+                                counters::BATCH_IN_PROGRESS_COMMITTED.inc();
+                            }
+                        }
+
                         // Cleans up all batches that expire in timestamp <= block_timestamp. This is
                         // safe since clean request must occur only after execution result is certified.
                         for batch_id in self.batch_expirations.expire(block_timestamp) {
                             if self.remove_batch_in_progress(&batch_id) {
+                                counters::BATCH_IN_PROGRESS_EXPIRED.inc();
                                 debug!(
                                     "QS: logical time based expiration batch w. id {} from batches_in_progress, new size {}",
                                     batch_id,
@@ -445,6 +452,7 @@ impl BatchGenerator {
                     },
                     BatchGeneratorCommand::ProofExpiration(batch_ids) => {
                         for batch_id in batch_ids {
+                            counters::BATCH_IN_PROGRESS_TIMEOUT.inc();
                             debug!(
                                 "QS: received timeout for proof of store, batch id = {}",
                                 batch_id
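The batch generator hunks above give each removal path out of batches_in_progress its own counter: committed, expired by block timestamp, and proof-collection timeout. A minimal sketch of that bookkeeping, with a bare u64 batch id and plain integer counters standing in for the real Aptos types:

use std::collections::HashSet;

type BatchId = u64; // stand-in for aptos_consensus_types::proof_of_store::BatchId

#[derive(Default)]
struct RemovalCounters {
    committed: u64,
    expired: u64,
    timeout: u64,
}

#[derive(Default)]
struct InProgress {
    batches: HashSet<BatchId>,
    counters: RemovalCounters,
}

impl InProgress {
    fn remove(&mut self, batch_id: &BatchId) -> bool {
        self.batches.remove(batch_id)
    }

    // Commit path: remove exactly the batches named in the commit notification.
    fn on_commit(&mut self, committed: &[BatchId]) {
        for id in committed {
            if self.remove(id) {
                self.counters.committed += 1;
            }
        }
    }

    // Expiration path: remove whatever the expiration index yields for the
    // certified block timestamp.
    fn on_block_timestamp(&mut self, expired: Vec<BatchId>) {
        for id in expired {
            if self.remove(&id) {
                self.counters.expired += 1;
            }
        }
    }

    // Timeout path: proof collection gave up on these batches. Note the patch
    // bumps this counter once per id, before the removal check.
    fn on_proof_timeout(&mut self, timed_out: Vec<BatchId>) {
        for id in timed_out {
            self.counters.timeout += 1;
            self.remove(&id);
        }
    }
}

The real handler runs on the BatchGenerator event loop and also logs each removal; only the counter bookkeeping is shown here.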
diff --git a/consensus/src/quorum_store/counters.rs b/consensus/src/quorum_store/counters.rs
index 543f758171f77..7d269e86f5c53 100644
--- a/consensus/src/quorum_store/counters.rs
+++ b/consensus/src/quorum_store/counters.rs
@@ -163,6 +163,30 @@ pub static EXCLUDED_TXNS_WHEN_PULL: Lazy<IntCounterVec> = Lazy::new(|| {
     .unwrap()
 });
 
+pub static BATCH_IN_PROGRESS_COMMITTED: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "quorum_store_batch_in_progress_committed",
+        "Number of batches that are removed from in progress by a commit."
+    )
+    .unwrap()
+});
+
+pub static BATCH_IN_PROGRESS_EXPIRED: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "quorum_store_batch_in_progress_expired",
+        "Number of batches that are removed from in progress by a block timestamp expiration."
+    )
+    .unwrap()
+});
+
+pub static BATCH_IN_PROGRESS_TIMEOUT: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "quorum_store_batch_in_progress_timeout",
+        "Number of batches that are removed from in progress by a proof collection timeout."
+    )
+    .unwrap()
+});
+
 pub static GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_TIME_WHEN_SAVE: Lazy<Histogram> = Lazy::new(
     || {
         register_histogram!(
diff --git a/consensus/src/quorum_store/proof_coordinator.rs b/consensus/src/quorum_store/proof_coordinator.rs
index 0032be57dd3ef..dd16b60fcde48 100644
--- a/consensus/src/quorum_store/proof_coordinator.rs
+++ b/consensus/src/quorum_store/proof_coordinator.rs
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::{
+    logging::{LogEvent, LogSchema},
     monitor,
     network::QuorumStoreSender,
     quorum_store::{
@@ -17,7 +18,7 @@ use aptos_types::{
     aggregate_signature::PartialSignatures, validator_verifier::ValidatorVerifier, PeerId,
 };
 use std::{
-    collections::{BTreeMap, HashMap},
+    collections::{hash_map::Entry, BTreeMap, HashMap},
     sync::Arc,
     time::Duration,
 };
@@ -29,6 +30,7 @@ use tokio::{
 #[derive(Debug)]
 pub(crate) enum ProofCoordinatorCommand {
     AppendSignature(SignedBatchInfoMsg),
+    CommitNotification(Vec<BatchInfo>),
     Shutdown(TokioOneshot::Sender<()>),
 }
 
@@ -57,7 +59,10 @@ impl IncrementalProofState {
         validator_verifier: &ValidatorVerifier,
     ) -> Result<(), SignedBatchInfoError> {
         if signed_batch_info.batch_info() != &self.info {
-            return Err(SignedBatchInfoError::WrongInfo);
+            return Err(SignedBatchInfoError::WrongInfo((
+                signed_batch_info.batch_id().id,
+                self.info.batch_id().id,
+            )));
         }
 
         if self
@@ -124,6 +129,10 @@ impl IncrementalProofState {
             Err(e) => unreachable!("Cannot aggregate signatures on digest err = {:?}", e),
         }
     }
+
+    fn batch_info(&self) -> &BatchInfo {
+        &self.info
+    }
 }
 
 pub(crate) struct ProofCoordinator {
@@ -186,6 +195,11 @@ impl ProofCoordinator {
         self.digest_to_time
            .entry(*signed_batch_info.digest())
            .or_insert(chrono::Utc::now().naive_utc().timestamp_micros() as u64);
+        debug!(
+            LogSchema::new(LogEvent::ProofOfStoreInit),
+            digest = signed_batch_info.digest(),
+            batch_id = signed_batch_info.batch_id().id,
+        );
         Ok(())
     }
 
@@ -269,23 +283,39 @@ impl ProofCoordinator {
                             .expect("Failed to send shutdown ack to QuorumStore");
                         break;
                     },
+                    ProofCoordinatorCommand::CommitNotification(batches) => {
+                        for batch in batches {
+                            let digest = batch.digest();
+                            if let Entry::Occupied(existing_proof) = self.digest_to_proof.entry(*digest) {
+                                if batch == *existing_proof.get().batch_info() {
+                                    existing_proof.remove();
+                                }
+                            }
+                        }
+                    },
                     ProofCoordinatorCommand::AppendSignature(signed_batch_infos) => {
                         let mut proofs = vec![];
                         for signed_batch_info in signed_batch_infos.take().into_iter() {
                             let peer_id = signed_batch_info.signer();
                             let digest = *signed_batch_info.digest();
+                            let batch_id = signed_batch_info.batch_id();
                             match self.add_signature(signed_batch_info, &validator_verifier) {
                                 Ok(result) => {
                                     if let Some(proof) = result {
-                                        debug!("QS: received quorum of signatures, digest {}", digest);
+                                        debug!(
+                                            LogSchema::new(LogEvent::ProofOfStoreReady),
+                                            digest = digest,
+                                            batch_id = batch_id.id,
+                                        );
                                         proofs.push(proof);
                                     }
                                 },
                                 Err(e) => {
-                                    // TODO: better error messages
-                                    // Can happen if we already garbage collected
+                                    // Can happen if we already garbage collected, the commit notification is late, or the peer is misbehaving.
                                     if peer_id == self.peer_id {
-                                        debug!("QS: could not add signature from self, err = {:?}", e);
+                                        info!("QS: could not add signature from self, digest = {}, batch_id = {}, err = {:?}", digest, batch_id, e);
+                                    } else {
+                                        debug!("QS: could not add signature from peer {}, digest = {}, batch_id = {}, err = {:?}", peer_id, digest, batch_id, e);
                                     }
                                 },
                             }
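The new CommitNotification arm drops a pending proof aggregation only when the committed BatchInfo matches the one stored under that digest, which the Entry API expresses without a double map lookup. A compilable sketch of that guard, with simplified stand-ins for HashValue and IncrementalProofState:

use std::collections::{hash_map::Entry, HashMap};

type Digest = [u8; 32]; // stand-in for aptos_crypto::HashValue

#[derive(Clone, PartialEq)]
struct BatchInfo {
    digest: Digest,
    batch_id: u64,
}

// Stand-in for IncrementalProofState: the batch being aggregated plus
// whatever signatures have been collected so far.
struct PendingProof {
    info: BatchInfo,
}

fn gc_on_commit(pending: &mut HashMap<Digest, PendingProof>, committed: Vec<BatchInfo>) {
    for batch in committed {
        // entry() locates the slot once; the Occupied variant lets us inspect
        // the stored state before deciding to remove it.
        if let Entry::Occupied(existing) = pending.entry(batch.digest) {
            if existing.get().info == batch {
                existing.remove();
            }
        }
    }
}

Comparing the full BatchInfo, not just the digest, keeps an in-flight aggregation for a different batch that happens to share the key from being dropped by mistake.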
diff --git a/consensus/src/quorum_store/quorum_store_coordinator.rs b/consensus/src/quorum_store/quorum_store_coordinator.rs
index 14758ca9d5147..089e74c499be8 100644
--- a/consensus/src/quorum_store/quorum_store_coordinator.rs
+++ b/consensus/src/quorum_store/quorum_store_coordinator.rs
@@ -54,17 +54,25 @@ impl QuorumStoreCoordinator {
             monitor!("quorum_store_coordinator_loop", {
                 match cmd {
                     CoordinatorCommand::CommitNotification(block_timestamp, batches) => {
+                        // TODO: need a callback or not?
+                        self.proof_coordinator_cmd_tx
+                            .send(ProofCoordinatorCommand::CommitNotification(batches.clone()))
+                            .await
+                            .expect("Failed to send to ProofCoordinator");
+
                         self.proof_manager_cmd_tx
                             .send(ProofManagerCommand::CommitNotification(
                                 block_timestamp,
-                                batches,
+                                batches.clone(),
                             ))
                             .await
                             .expect("Failed to send to ProofManager");
 
-                        // TODO: need a callback or not?
                         self.batch_generator_cmd_tx
-                            .send(BatchGeneratorCommand::CommitNotification(block_timestamp))
+                            .send(BatchGeneratorCommand::CommitNotification(
+                                block_timestamp,
+                                batches,
+                            ))
                            .await
                            .expect("Failed to send to BatchGenerator");
                     },
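Since all three downstream actors now consume the committed batch list, the coordinator clones the vector for the earlier sends and moves it into the last one. A sketch of that fan-out pattern over tokio mpsc channels (two placeholder consumers, assuming the tokio dependency the surrounding code already uses):

use tokio::sync::mpsc::Sender;

#[derive(Clone, Debug)]
struct BatchInfo; // placeholder

#[derive(Debug)]
enum ProofCoordinatorCmd {
    CommitNotification(Vec<BatchInfo>),
}

#[derive(Debug)]
enum BatchGeneratorCmd {
    CommitNotification(u64, Vec<BatchInfo>),
}

async fn fan_out(
    proof_coordinator_tx: &Sender<ProofCoordinatorCmd>,
    batch_generator_tx: &Sender<BatchGeneratorCmd>,
    block_timestamp: u64,
    batches: Vec<BatchInfo>,
) {
    // Clone for every consumer but the last; the final send takes ownership,
    // so exactly one allocation per extra consumer.
    proof_coordinator_tx
        .send(ProofCoordinatorCmd::CommitNotification(batches.clone()))
        .await
        .expect("receiver dropped");
    batch_generator_tx
        .send(BatchGeneratorCmd::CommitNotification(block_timestamp, batches))
        .await
        .expect("receiver dropped");
}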
diff --git a/mempool/src/core_mempool/mempool.rs b/mempool/src/core_mempool/mempool.rs
index fb22cbf6753bb..be096d77b01ba 100644
--- a/mempool/src/core_mempool/mempool.rs
+++ b/mempool/src/core_mempool/mempool.rs
@@ -26,6 +26,7 @@ use aptos_types::{
 };
 use std::{
     collections::{BTreeMap, HashMap, HashSet},
+    sync::atomic::Ordering,
     time::{Duration, Instant, SystemTime},
 };
 
@@ -84,7 +85,7 @@ impl Mempool {
                 is_rejected = true,
                 label = reason_label,
             );
-            self.log_latency(*sender, sequence_number, reason_label);
+            self.log_commit_rejected_latency(*sender, sequence_number, reason_label);
             if let Some(ranking_score) = self.transactions.get_ranking_score(sender, sequence_number) {
                 counters::core_mempool_txn_ranking_score(
                     counters::REMOVE_LABEL,
@@ -119,7 +120,7 @@ impl Mempool {
     }
 
     pub(crate) fn log_txn_latency(
-        insertion_info: InsertionInfo,
+        insertion_info: &InsertionInfo,
         bucket: &str,
         stage: &'static str,
     ) {
@@ -133,8 +134,26 @@ impl Mempool {
         }
     }
 
-    fn log_latency(&self, account: AccountAddress, sequence_number: u64, stage: &'static str) {
-        if let Some((&insertion_info, bucket)) = self
+    fn log_consensus_pulled_latency(&self, account: AccountAddress, sequence_number: u64) {
+        if let Some((insertion_info, bucket)) = self
+            .transactions
+            .get_insertion_info_and_bucket(&account, sequence_number)
+        {
+            let prev_count = insertion_info
+                .consensus_pulled_counter
+                .fetch_add(1, Ordering::Relaxed);
+            Self::log_txn_latency(insertion_info, bucket, counters::CONSENSUS_PULLED_LABEL);
+            counters::CORE_MEMPOOL_TXN_CONSENSUS_PULLED.observe((prev_count + 1) as f64);
+        }
+    }
+
+    fn log_commit_rejected_latency(
+        &self,
+        account: AccountAddress,
+        sequence_number: u64,
+        stage: &'static str,
+    ) {
+        if let Some((insertion_info, bucket)) = self
             .transactions
             .get_insertion_info_and_bucket(&account, sequence_number)
         {
@@ -148,7 +167,7 @@
         sequence_number: u64,
         block_timestamp: Duration,
     ) {
-        if let Some((&insertion_info, bucket)) = self
+        if let Some((insertion_info, bucket)) = self
             .transactions
             .get_insertion_info_and_bucket(&account, sequence_number)
         {
@@ -393,11 +412,7 @@ impl Mempool {
         counters::mempool_service_transactions(counters::GET_BLOCK_LABEL, block.len());
         counters::MEMPOOL_SERVICE_BYTES_GET_BLOCK.observe(total_bytes as f64);
         for transaction in &block {
-            self.log_latency(
-                transaction.sender(),
-                transaction.sequence_number(),
-                counters::CONSENSUS_PULLED_LABEL,
-            );
+            self.log_consensus_pulled_latency(transaction.sender(), transaction.sequence_number());
         }
         block
     }
diff --git a/mempool/src/core_mempool/transaction.rs b/mempool/src/core_mempool/transaction.rs
index d6850f5c7b129..ed741ce654100 100644
--- a/mempool/src/core_mempool/transaction.rs
+++ b/mempool/src/core_mempool/transaction.rs
@@ -8,6 +8,7 @@ use aptos_types::{account_address::AccountAddress, transaction::SignedTransaction};
 use serde::{Deserialize, Serialize};
 use std::{
     mem::size_of,
+    sync::{atomic::AtomicU8, Arc},
     time::{Duration, SystemTime},
 };
 
@@ -107,10 +108,11 @@ pub enum SubmittedBy {
     PeerValidator,
 }
 
-#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+#[derive(Debug, Clone)]
 pub struct InsertionInfo {
     pub insertion_time: SystemTime,
     pub submitted_by: SubmittedBy,
+    pub consensus_pulled_counter: Arc<AtomicU8>,
 }
 
 impl InsertionInfo {
@@ -129,6 +131,7 @@ impl InsertionInfo {
         Self {
             insertion_time,
             submitted_by,
+            consensus_pulled_counter: Arc::new(AtomicU8::new(0)),
         }
     }
 
diff --git a/mempool/src/core_mempool/transaction_store.rs b/mempool/src/core_mempool/transaction_store.rs
index cb298dc2f36b4..a53af74136ad9 100644
--- a/mempool/src/core_mempool/transaction_store.rs
+++ b/mempool/src/core_mempool/transaction_store.rs
@@ -379,7 +379,7 @@ impl TransactionStore {
     fn log_ready_transaction(
         ranking_score: u64,
         bucket: &str,
-        insertion_info: InsertionInfo,
+        insertion_info: &InsertionInfo,
         broadcast_ready: bool,
     ) {
         if let Ok(time_delta) = SystemTime::now().duration_since(insertion_info.insertion_time) {
@@ -445,7 +445,7 @@ impl TransactionStore {
                 Self::log_ready_transaction(
                     txn.ranking_score,
                     self.timeline_index.get_bucket(txn.ranking_score),
-                    txn.insertion_info,
+                    &txn.insertion_info,
                     process_broadcast_ready,
                 );
             }
@@ -606,7 +606,7 @@ impl TransactionStore {
             }
             let bucket = self.timeline_index.get_bucket(txn.ranking_score);
             Mempool::log_txn_latency(
-                txn.insertion_info,
+                &txn.insertion_info,
                 bucket,
                 BROADCAST_BATCHED_LABEL,
             );
diff --git a/mempool/src/counters.rs b/mempool/src/counters.rs
index 7602d7f0d2490..5d5f14a58da55 100644
--- a/mempool/src/counters.rs
+++ b/mempool/src/counters.rs
@@ -110,6 +110,8 @@ const RANKING_SCORE_BUCKETS: &[f64] = &[
     10000.0, 14678.0, 21544.0, 31623.0, 46416.0, 68129.0, 100000.0, 146780.0, 215443.0,
 ];
 
+const TXN_CONSENSUS_PULLED_BUCKETS: &[f64] = &[1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 25.0, 50.0, 100.0];
+
 static TRANSACTION_COUNT_BUCKETS: Lazy<Vec<f64>> = Lazy::new(|| {
     exponential_buckets(
         /*start=*/ 1.5, /*factor=*/ 1.5, /*count=*/ 20,
@@ -267,6 +269,15 @@ pub static CORE_MEMPOOL_GC_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
     .unwrap()
 });
 
+pub static CORE_MEMPOOL_TXN_CONSENSUS_PULLED: Lazy<Histogram> = Lazy::new(|| {
+    register_histogram!(
+        "aptos_core_mempool_txn_consensus_pulled",
+        "Number of times a txn was pulled from core mempool by consensus",
+        TXN_CONSENSUS_PULLED_BUCKETS.to_vec()
+    )
+    .unwrap()
+});
+
 /// Counter of pending network events to Mempool
 pub static PENDING_MEMPOOL_NETWORK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
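On the mempool side, InsertionInfo trades Copy for Clone because the new pull counter is shared mutable state: every clone of the info must bump the same AtomicU8, and fetch_add returns the previous value, so the histogram observes the count including the current pull. A self-contained sketch of that pattern (field names shortened, std only):

use std::sync::{
    atomic::{AtomicU8, Ordering},
    Arc,
};
use std::time::SystemTime;

#[derive(Debug, Clone)]
struct InsertionInfo {
    insertion_time: SystemTime,
    // Shared across clones, so a clone handed to metrics code still bumps
    // the same counter as the copy held by the transaction store. A u8 is
    // enough here: the histogram's top bucket is 100 pulls.
    consensus_pulled_counter: Arc<AtomicU8>,
}

impl InsertionInfo {
    fn new() -> Self {
        Self {
            insertion_time: SystemTime::now(),
            consensus_pulled_counter: Arc::new(AtomicU8::new(0)),
        }
    }
}

fn log_consensus_pulled(info: &InsertionInfo) -> u8 {
    // fetch_add returns the value *before* the increment, so prev + 1 is the
    // total number of pulls including this one.
    let prev = info.consensus_pulled_counter.fetch_add(1, Ordering::Relaxed);
    prev + 1
}

fn main() {
    let info = InsertionInfo::new();
    let clone = info.clone();
    assert_eq!(log_consensus_pulled(&info), 1);
    assert_eq!(log_consensus_pulled(&clone), 2); // same counter through the Arc
    let _ = info.insertion_time; // latency stages are measured from this point
}

This is also why log_txn_latency and log_ready_transaction switch to taking &InsertionInfo: the struct is no longer Copy, and the callers only need to read it.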