diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index f57c5c50cbf5d..a0cc1cffea0bc 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -12,8 +12,6 @@ use aptos_crypto::{ HashValue, }; use aptos_crypto_derive::CryptoHasher; -use aptos_executor_types::ExecutorResult; -use aptos_infallible::Mutex; use aptos_logger::prelude::*; use aptos_types::{ account_address::AccountAddress, transaction::SignedTransaction, @@ -25,10 +23,8 @@ use serde::{Deserialize, Serialize}; use std::{ collections::HashSet, fmt::{self, Write}, - sync::Arc, u64, }; -use tokio::sync::oneshot; /// The round of a block is a consensus-internal counter, which starts with 0 and increases /// monotonically. It is used for the protocol safety and liveness (please see the detailed @@ -127,40 +123,14 @@ pub struct RejectedTransactionSummary { pub reason: DiscardedVMStatus, } -#[derive(Debug)] -pub enum DataStatus { - Cached(Vec), - Requested( - Vec<( - HashValue, - oneshot::Receiver>>, - )>, - ), -} - -impl DataStatus { - pub fn extend(&mut self, other: DataStatus) { - match (self, other) { - (DataStatus::Requested(v1), DataStatus::Requested(v2)) => v1.extend(v2), - (_, _) => unreachable!(), - } - } - - pub fn take(&mut self) -> DataStatus { - std::mem::replace(self, DataStatus::Requested(vec![])) - } -} - #[derive(Deserialize, Serialize, Clone, Debug)] pub struct ProofWithData { pub proofs: Vec, - #[serde(skip)] - pub status: Arc>>, } impl PartialEq for ProofWithData { fn eq(&self, other: &Self) -> bool { - self.proofs == other.proofs && Arc::as_ptr(&self.status) == Arc::as_ptr(&other.status) + self.proofs == other.proofs } } @@ -168,10 +138,7 @@ impl Eq for ProofWithData {} impl ProofWithData { pub fn new(proofs: Vec) -> Self { - Self { - proofs, - status: Arc::new(Mutex::new(None)), - } + Self { proofs } } pub fn empty() -> Self { @@ -180,14 +147,7 @@ impl ProofWithData { #[allow(clippy::unwrap_used)] pub fn extend(&mut self, other: ProofWithData) { - let other_data_status = other.status.lock().as_mut().unwrap().take(); self.proofs.extend(other.proofs); - let mut status = self.status.lock(); - if status.is_none() { - *status = Some(other_data_status); - } else { - status.as_mut().unwrap().extend(other_data_status); - } } pub fn len(&self) -> usize { diff --git a/consensus/consensus-types/src/payload.rs b/consensus/consensus-types/src/payload.rs index d4dd7b26db86a..915082ae84693 100644 --- a/consensus/consensus-types/src/payload.rs +++ b/consensus/consensus-types/src/payload.rs @@ -3,19 +3,12 @@ use crate::proof_of_store::{BatchInfo, ProofOfStore}; use anyhow::ensure; -use aptos_executor_types::ExecutorResult; -use aptos_infallible::Mutex; use aptos_types::{transaction::SignedTransaction, PeerId}; use core::fmt; -use futures::{ - future::{BoxFuture, Shared}, - FutureExt, -}; use serde::{Deserialize, Serialize}; use std::{ fmt::Debug, ops::{Deref, DerefMut}, - sync::Arc, }; pub type OptBatches = BatchPointer; @@ -32,37 +25,9 @@ pub trait TDataInfo { fn signers(&self, ordered_authors: &[PeerId]) -> Vec; } -pub struct DataFetchFut { - pub iteration: u32, - pub responders: Vec>>>, - pub fut: Shared>>>, -} - -impl fmt::Debug for DataFetchFut { - fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Ok(()) - } -} - -impl DataFetchFut { - pub fn extend(&mut self, other: DataFetchFut) { - let self_fut = self.fut.clone(); - self.fut = async move { - let result1 = self_fut.await?; - let result2 = other.fut.await?; - let result = [result1, result2].concat(); - Ok(result) - } - .boxed() - .shared(); - } -} - #[derive(Deserialize, Serialize, Clone, Debug)] pub struct BatchPointer { pub batch_summary: Vec, - #[serde(skip)] - pub data_fut: Arc>>, } impl BatchPointer @@ -72,21 +37,11 @@ where pub fn new(metadata: Vec) -> Self { Self { batch_summary: metadata, - data_fut: Arc::new(Mutex::new(None)), } } pub fn extend(&mut self, other: BatchPointer) { - let other_data_status = other.data_fut.lock().take().expect("must be initialized"); self.batch_summary.extend(other.batch_summary); - let mut status = self.data_fut.lock(); - *status = match &mut *status { - None => Some(other_data_status), - Some(status) => { - status.extend(other_data_status); - return; - }, - }; } pub fn num_txns(&self) -> usize { @@ -115,7 +70,6 @@ where fn from(value: Vec) -> Self { Self { batch_summary: value, - data_fut: Arc::new(Mutex::new(None)), } } } @@ -123,7 +77,6 @@ where impl PartialEq for BatchPointer { fn eq(&self, other: &Self) -> bool { self.batch_summary == other.batch_summary - && Arc::as_ptr(&self.data_fut) == Arc::as_ptr(&other.data_fut) } } diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index 9e223e252a63d..5804bed94c3ed 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -371,8 +371,11 @@ impl BlockStore { ); if let Some(payload) = block.payload() { - self.payload_manager - .prefetch_payload_data(payload, block.timestamp_usecs()); + self.payload_manager.prefetch_payload_data( + payload, + block.author().expect("Payload block must have author"), + block.timestamp_usecs(), + ); } let pipelined_block = PipelinedBlock::new_ordered(block.clone()); diff --git a/consensus/src/block_storage/sync_manager.rs b/consensus/src/block_storage/sync_manager.rs index 07b20edf8a922..55738bd38f2ad 100644 --- a/consensus/src/block_storage/sync_manager.rs +++ b/consensus/src/block_storage/sync_manager.rs @@ -399,7 +399,11 @@ impl BlockStore { for (i, block) in blocks.iter().enumerate() { assert_eq!(block.id(), quorum_certs[i].certified_block().id()); if let Some(payload) = block.payload() { - payload_manager.prefetch_payload_data(payload, block.timestamp_usecs()); + payload_manager.prefetch_payload_data( + payload, + block.author().expect("payload block must have author"), + block.timestamp_usecs(), + ); } } diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs index 3fe94b0dcd128..86f28637d5252 100644 --- a/consensus/src/dag/dag_store.rs +++ b/consensus/src/dag/dag_store.rs @@ -526,8 +526,11 @@ impl DagStore { self.storage.save_certified_node(&node)?; debug!("Added node {}", node.id()); - self.payload_manager - .prefetch_payload_data(node.payload(), node.metadata().timestamp()); + self.payload_manager.prefetch_payload_data( + node.payload(), + *node.author(), + node.metadata().timestamp(), + ); self.dag.write().add_validated_node(node) } diff --git a/consensus/src/dag/tests/helpers.rs b/consensus/src/dag/tests/helpers.rs index 281ace993dc0d..4e701230f9a09 100644 --- a/consensus/src/dag/tests/helpers.rs +++ b/consensus/src/dag/tests/helpers.rs @@ -23,7 +23,7 @@ pub(super) struct MockPayloadManager {} #[async_trait] impl TPayloadManager for MockPayloadManager { - fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {} + fn prefetch_payload_data(&self, _payload: &Payload, _author: Author, _timestamp: u64) {} fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec) {} diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 14ed035334525..12948cbed477f 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -1646,8 +1646,11 @@ impl EpochManager

{ proposal_event @ VerifiedEvent::ProposalMsg(_) => { if let VerifiedEvent::ProposalMsg(p) = &proposal_event { if let Some(payload) = p.proposal().payload() { - payload_manager - .prefetch_payload_data(payload, p.proposal().timestamp_usecs()); + payload_manager.prefetch_payload_data( + payload, + p.proposer(), + p.proposal().timestamp_usecs(), + ); } pending_blocks.lock().insert_block(p.proposal().clone()); } diff --git a/consensus/src/payload_manager/co_payload_manager.rs b/consensus/src/payload_manager/co_payload_manager.rs new file mode 100644 index 0000000000000..8fd63b44bb39e --- /dev/null +++ b/consensus/src/payload_manager/co_payload_manager.rs @@ -0,0 +1,115 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + consensus_observer::{ + network::observer_message::ConsensusObserverMessage, + observer::payload_store::BlockPayloadStatus, + publisher::consensus_publisher::ConsensusPublisher, + }, + payload_manager::TPayloadManager, +}; +use aptos_bitvec::BitVec; +use aptos_consensus_types::{ + block::Block, + common::{Author, Payload, Round}, +}; +use aptos_crypto::HashValue; +use aptos_executor_types::{ExecutorError::InternalError, *}; +use aptos_infallible::Mutex; +use aptos_types::transaction::SignedTransaction; +use async_trait::async_trait; +use std::{ + collections::{btree_map::Entry, BTreeMap}, + sync::Arc, +}; + +/// Returns the transactions for the consensus observer payload manager +async fn get_transactions_for_observer( + block: &Block, + block_payloads: &Arc>>, + consensus_publisher: &Option>, +) -> ExecutorResult<(Vec, Option)> { + // The data should already be available (as consensus observer will only ever + // forward a block to the executor once the data has been received and verified). + let block_payload = match block_payloads.lock().entry((block.epoch(), block.round())) { + Entry::Occupied(mut value) => match value.get_mut() { + BlockPayloadStatus::AvailableAndVerified(block_payload) => block_payload.clone(), + BlockPayloadStatus::AvailableAndUnverified(_) => { + // This shouldn't happen (the payload should already be verified) + let error = format!( + "Payload data for block epoch {}, round {} is unverified!", + block.epoch(), + block.round() + ); + return Err(InternalError { error }); + }, + }, + Entry::Vacant(_) => { + // This shouldn't happen (the payload should already be present) + let error = format!( + "Missing payload data for block epoch {}, round {}!", + block.epoch(), + block.round() + ); + return Err(InternalError { error }); + }, + }; + + // If the payload is valid, publish it to any downstream observers + let transaction_payload = block_payload.transaction_payload(); + if let Some(consensus_publisher) = consensus_publisher { + let message = ConsensusObserverMessage::new_block_payload_message( + block.gen_block_info(HashValue::zero(), 0, None), + transaction_payload.clone(), + ); + consensus_publisher.publish_message(message); + } + + // Return the transactions and the transaction limit + Ok(( + transaction_payload.transactions(), + transaction_payload.limit(), + )) +} + +pub struct ConsensusObserverPayloadManager { + txns_pool: Arc>>, + consensus_publisher: Option>, +} + +impl ConsensusObserverPayloadManager { + pub fn new( + txns_pool: Arc>>, + consensus_publisher: Option>, + ) -> Self { + Self { + txns_pool, + consensus_publisher, + } + } +} + +#[async_trait] +impl TPayloadManager for ConsensusObserverPayloadManager { + fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec) { + // noop + } + + fn prefetch_payload_data(&self, _payload: &Payload, _author: Author, _timestamp: u64) { + // noop + } + + fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> { + unreachable!("this method isn't used in ConsensusObserver") + } + + async fn get_transactions( + &self, + block: &Block, + _block_signers: Option, + ) -> ExecutorResult<(Vec, Option)> { + return get_transactions_for_observer(block, &self.txns_pool, &self.consensus_publisher) + .await; + } +} diff --git a/consensus/src/payload_manager/direct_mempool_payload_manager.rs b/consensus/src/payload_manager/direct_mempool_payload_manager.rs new file mode 100644 index 0000000000000..0cd3cd4963244 --- /dev/null +++ b/consensus/src/payload_manager/direct_mempool_payload_manager.rs @@ -0,0 +1,53 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::payload_manager::TPayloadManager; +use aptos_bitvec::BitVec; +use aptos_consensus_types::{ + block::Block, + common::{Author, Payload}, +}; +use aptos_executor_types::*; +use aptos_types::transaction::SignedTransaction; +use async_trait::async_trait; + +/// A payload manager that directly returns the transactions in a block's payload. +pub struct DirectMempoolPayloadManager {} + +impl DirectMempoolPayloadManager { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl TPayloadManager for DirectMempoolPayloadManager { + fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec) {} + + fn prefetch_payload_data(&self, _payload: &Payload, _author: Author, _timestamp: u64) {} + + fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> { + Ok(()) + } + + async fn get_transactions( + &self, + block: &Block, + _block_signers: Option, + ) -> ExecutorResult<(Vec, Option)> { + let Some(payload) = block.payload() else { + return Ok((Vec::new(), None)); + }; + + match payload { + Payload::DirectMempool(txns) => Ok((txns.clone(), None)), + _ => unreachable!( + "DirectMempoolPayloadManager: Unacceptable payload type {}. Epoch: {}, Round: {}, Block: {}", + payload, + block.block_data().epoch(), + block.block_data().round(), + block.id() + ), + } + } +} diff --git a/consensus/src/payload_manager/mod.rs b/consensus/src/payload_manager/mod.rs new file mode 100644 index 0000000000000..269515ed16e30 --- /dev/null +++ b/consensus/src/payload_manager/mod.rs @@ -0,0 +1,44 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_bitvec::BitVec; +use aptos_consensus_types::{ + block::Block, + common::{Author, Payload}, +}; +use aptos_executor_types::*; +use aptos_types::transaction::SignedTransaction; +use async_trait::async_trait; + +mod co_payload_manager; +mod direct_mempool_payload_manager; +mod quorum_store_payload_manager; + +pub use co_payload_manager::ConsensusObserverPayloadManager; +pub use direct_mempool_payload_manager::DirectMempoolPayloadManager; +pub use quorum_store_payload_manager::{QuorumStoreCommitNotifier, QuorumStorePayloadManager}; + +/// A trait that defines the interface for a payload manager. The payload manager is responsible for +/// resolving the transactions in a block's payload. +#[async_trait] +pub trait TPayloadManager: Send + Sync { + /// Notify the payload manager that a block has been committed. This indicates that the + /// transactions in the block's payload are no longer required for consensus. + fn notify_commit(&self, block_timestamp: u64, payloads: Vec); + + /// Prefetch the data for a payload. This is used to ensure that the data for a payload is + /// available when block is executed. + fn prefetch_payload_data(&self, payload: &Payload, author: Author, timestamp: u64); + + /// Check if the transactions corresponding are available. This is specific to payload + /// manager implementations. For optimistic quorum store, we only check if optimistic + /// batches are available locally. + fn check_payload_availability(&self, block: &Block) -> Result<(), BitVec>; + + /// Get the transactions in a block's payload. This function returns a vector of transactions. + async fn get_transactions( + &self, + block: &Block, + block_voters: Option, + ) -> ExecutorResult<(Vec, Option)>; +} diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager/quorum_store_payload_manager.rs similarity index 51% rename from consensus/src/payload_manager.rs rename to consensus/src/payload_manager/quorum_store_payload_manager.rs index 86a537edb940b..dc85b7260c227 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager/quorum_store_payload_manager.rs @@ -4,99 +4,54 @@ use crate::{ consensus_observer::{ network::observer_message::{BlockTransactionPayload, ConsensusObserverMessage}, - observer::payload_store::BlockPayloadStatus, publisher::consensus_publisher::ConsensusPublisher, }, counters, + payload_manager::TPayloadManager, quorum_store::{batch_store::BatchReader, quorum_store_coordinator::CoordinatorCommand}, }; use aptos_bitvec::BitVec; use aptos_consensus_types::{ block::Block, - common::{DataStatus, Payload, ProofWithData, Round}, - payload::{BatchPointer, DataFetchFut, TDataInfo}, + common::{Author, Payload, ProofWithData}, + payload::{BatchPointer, TDataInfo}, proof_of_store::BatchInfo, }; use aptos_crypto::HashValue; -use aptos_executor_types::{ - ExecutorError::{DataNotFound, InternalError}, - *, -}; -use aptos_infallible::Mutex; +use aptos_executor_types::*; use aptos_logger::prelude::*; use aptos_types::{transaction::SignedTransaction, PeerId}; use async_trait::async_trait; -use futures::{channel::mpsc::Sender, FutureExt}; +use futures::{channel::mpsc::Sender, future::Shared}; use itertools::Itertools; -use std::{ - collections::{btree_map::Entry, BTreeMap, HashMap}, - ops::Deref, - sync::Arc, -}; -use tokio::sync::oneshot; - -/// A trait that defines the interface for a payload manager. The payload manager is responsible for -/// resolving the transactions in a block's payload. -#[async_trait] -pub trait TPayloadManager: Send + Sync { - /// Notify the payload manager that a block has been committed. This indicates that the - /// transactions in the block's payload are no longer required for consensus. - fn notify_commit(&self, block_timestamp: u64, payloads: Vec); - - /// Prefetch the data for a payload. This is used to ensure that the data for a payload is - /// available when block is executed. - fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64); +use std::{collections::HashMap, future::Future, ops::Deref, pin::Pin, sync::Arc}; - /// Check if the transactions corresponding are available. This is specific to payload - /// manager implementations. For optimistic quorum store, we only check if optimistic - /// batches are available locally. - fn check_payload_availability(&self, block: &Block) -> Result<(), BitVec>; - - /// Get the transactions in a block's payload. This function returns a vector of transactions. - async fn get_transactions( - &self, - block: &Block, - block_voters: Option, - ) -> ExecutorResult<(Vec, Option)>; +pub trait TQuorumStoreCommitNotifier: Send + Sync { + fn notify(&self, block_timestamp: u64, batches: Vec); } -/// A payload manager that directly returns the transactions in a block's payload. -pub struct DirectMempoolPayloadManager {} - -impl DirectMempoolPayloadManager { - pub fn new() -> Self { - Self {} - } +pub struct QuorumStoreCommitNotifier { + coordinator_tx: Sender, } -#[async_trait] -impl TPayloadManager for DirectMempoolPayloadManager { - fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec) {} - - fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {} - - fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> { - Ok(()) +impl QuorumStoreCommitNotifier { + pub fn new(coordinator_tx: Sender) -> Self { + Self { coordinator_tx } } +} - async fn get_transactions( - &self, - block: &Block, - _block_signers: Option, - ) -> ExecutorResult<(Vec, Option)> { - let Some(payload) = block.payload() else { - return Ok((Vec::new(), None)); - }; +impl TQuorumStoreCommitNotifier for QuorumStoreCommitNotifier { + fn notify(&self, block_timestamp: u64, batches: Vec) { + let mut tx = self.coordinator_tx.clone(); - match payload { - Payload::DirectMempool(txns) => Ok((txns.clone(), None)), - _ => unreachable!( - "DirectMempoolPayloadManager: Unacceptable payload type {}. Epoch: {}, Round: {}, Block: {}", - payload, - block.block_data().epoch(), - block.block_data().round(), - block.id() - ), + if let Err(e) = tx.try_send(CoordinatorCommand::CommitNotification( + block_timestamp, + batches, + )) { + warn!( + "CommitNotification failed. Is the epoch shutting down? error: {}", + e + ); } } } @@ -104,7 +59,7 @@ impl TPayloadManager for DirectMempoolPayloadManager { /// A payload manager that resolves the transactions in a block's payload from the quorum store. pub struct QuorumStorePayloadManager { batch_reader: Arc, - coordinator_tx: Sender, + commit_notifier: Box, maybe_consensus_publisher: Option>, ordered_authors: Vec, address_to_validator_index: HashMap, @@ -113,14 +68,14 @@ pub struct QuorumStorePayloadManager { impl QuorumStorePayloadManager { pub fn new( batch_reader: Arc, - coordinator_tx: Sender, + commit_notifier: Box, maybe_consensus_publisher: Option>, ordered_authors: Vec, address_to_validator_index: HashMap, ) -> Self { Self { batch_reader, - coordinator_tx, + commit_notifier, maybe_consensus_publisher, ordered_authors, address_to_validator_index, @@ -128,14 +83,12 @@ impl QuorumStorePayloadManager { } fn request_transactions( - batches: Vec<(BatchInfo, Arc>>)>, + batches: Vec<(BatchInfo, Vec)>, block_timestamp: u64, batch_reader: Arc, - ) -> Vec<( - HashValue, - oneshot::Receiver>>, - )> { - let mut receivers = Vec::new(); + ) -> Vec>> + Send>>>> + { + let mut futures = Vec::new(); for (batch_info, responders) in batches { trace!( "QSE: requesting batch {:?}, time = {}", @@ -143,19 +96,25 @@ impl QuorumStorePayloadManager { block_timestamp ); if block_timestamp <= batch_info.expiration() { - receivers.push(( - *batch_info.digest(), - batch_reader.get_batch( - *batch_info.digest(), - batch_info.expiration(), - responders.clone(), - ), - )); + futures.push(batch_reader.get_batch(batch_info, responders.clone())); } else { debug!("QSE: skipped expired batch {}", batch_info.digest()); } } - receivers + futures + } + + async fn request_and_wait_transactions( + batches: Vec<(BatchInfo, Vec)>, + block_timestamp: u64, + batch_reader: Arc, + ) -> ExecutorResult> { + let futures = Self::request_transactions(batches, block_timestamp, batch_reader); + let mut all_txns = Vec::new(); + for result in futures::future::join_all(futures).await { + all_txns.append(&mut result?); + } + Ok(all_txns) } } @@ -200,77 +159,53 @@ impl TPayloadManager for QuorumStorePayloadManager { }) .collect(); - let mut tx = self.coordinator_tx.clone(); - - if let Err(e) = tx.try_send(CoordinatorCommand::CommitNotification( - block_timestamp, - batches, - )) { - warn!( - "CommitNotification failed. Is the epoch shutting down? error: {}", - e - ); - } + self.commit_notifier.notify(block_timestamp, batches); } - fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64) { + fn prefetch_payload_data(&self, payload: &Payload, author: Author, timestamp: u64) { // This is deprecated. // TODO(ibalajiarun): Remove this after migrating to OptQuorumStore type let request_txns_and_update_status = move |proof_with_status: &ProofWithData, batch_reader: Arc| { - if proof_with_status.status.lock().is_some() { - return; - } - let receivers = Self::request_transactions( + Self::request_transactions( proof_with_status .proofs .iter() .map(|proof| { ( proof.info().clone(), - Arc::new(Mutex::new(proof.shuffled_signers(&self.ordered_authors))), + proof.shuffled_signers(&self.ordered_authors), ) }) .collect(), timestamp, batch_reader, ); - proof_with_status - .status - .lock() - .replace(DataStatus::Requested(receivers)); }; fn prefetch_helper( data_pointer: &BatchPointer, batch_reader: Arc, + author: Option, timestamp: u64, ordered_authors: &[PeerId], ) { - let mut data_fut = data_pointer.data_fut.lock(); - if data_fut.is_some() { - return; - } - - let (batches_and_responders, responders) = data_pointer + let batches_and_responders = data_pointer .batch_summary .iter() - .map(|proof| { - let signers = proof.signers(ordered_authors); - let responders = Arc::new(Mutex::new(signers)); - // TODO(ibalajiarun): Add block author to signers - ((proof.info().clone(), responders.clone()), responders) + .map(|data_info| { + let mut signers = data_info.signers(ordered_authors); + if let Some(author) = author { + signers.push(author); + } + (data_info.info().clone(), signers) }) - .unzip(); - let fut = - request_txns_from_quorum_store(batches_and_responders, timestamp, batch_reader) - .boxed() - .shared(); - *data_fut = Some(DataFetchFut { - fut, - iteration: 0, - responders, - }) + .collect(); + QuorumStorePayloadManager::request_transactions( + batches_and_responders, + timestamp, + batch_reader, + ); } match payload { @@ -293,12 +228,14 @@ impl TPayloadManager for QuorumStorePayloadManager { prefetch_helper( opt_qs_payload.opt_batches(), self.batch_reader.clone(), + Some(author), timestamp, &self.ordered_authors, ); prefetch_helper( opt_qs_payload.proof_with_data(), self.batch_reader.clone(), + None, timestamp, &self.ordered_authors, ) @@ -509,88 +446,6 @@ impl TPayloadManager for QuorumStorePayloadManager { } } -/// Returns the transactions for the consensus observer payload manager -async fn get_transactions_for_observer( - block: &Block, - block_payloads: &Arc>>, - consensus_publisher: &Option>, -) -> ExecutorResult<(Vec, Option)> { - // The data should already be available (as consensus observer will only ever - // forward a block to the executor once the data has been received and verified). - let block_payload = match block_payloads.lock().entry((block.epoch(), block.round())) { - Entry::Occupied(mut value) => match value.get_mut() { - BlockPayloadStatus::AvailableAndVerified(block_payload) => block_payload.clone(), - BlockPayloadStatus::AvailableAndUnverified(_) => { - // This shouldn't happen (the payload should already be verified) - let error = format!( - "Payload data for block epoch {}, round {} is unverified!", - block.epoch(), - block.round() - ); - return Err(InternalError { error }); - }, - }, - Entry::Vacant(_) => { - // This shouldn't happen (the payload should already be present) - let error = format!( - "Missing payload data for block epoch {}, round {}!", - block.epoch(), - block.round() - ); - return Err(InternalError { error }); - }, - }; - - // If the payload is valid, publish it to any downstream observers - let transaction_payload = block_payload.transaction_payload(); - if let Some(consensus_publisher) = consensus_publisher { - let message = ConsensusObserverMessage::new_block_payload_message( - block.gen_block_info(HashValue::zero(), 0, None), - transaction_payload.clone(), - ); - consensus_publisher.publish_message(message); - } - - // Return the transactions and the transaction limit - Ok(( - transaction_payload.transactions(), - transaction_payload.limit(), - )) -} - -async fn request_txns_from_quorum_store( - batches_and_responders: Vec<(BatchInfo, Arc>>)>, - timestamp: u64, - batch_reader: Arc, -) -> ExecutorResult> { - let mut vec_ret = Vec::new(); - let receivers = QuorumStorePayloadManager::request_transactions( - batches_and_responders, - timestamp, - batch_reader, - ); - for (digest, rx) in receivers { - match rx.await { - Err(e) => { - // We probably advanced epoch already. - warn!( - "Oneshot channel to get a batch was dropped with error {:?}", - e - ); - return Err(DataNotFound(digest)); - }, - Ok(Ok(data)) => { - vec_ret.push(data); - }, - Ok(Err(e)) => { - return Err(e); - }, - } - } - let ret: Vec = vec_ret.into_iter().flatten().collect(); - Ok(ret) -} - async fn process_optqs_payload( data_ptr: &BatchPointer, batch_reader: Arc, @@ -598,16 +453,6 @@ async fn process_optqs_payload( ordered_authors: &[PeerId], additional_peers_to_request: Option<&BitVec>, ) -> ExecutorResult> { - let (iteration, fut, existing_responders) = { - let data_fut_guard = data_ptr.data_fut.lock(); - let data_fut = data_fut_guard.as_ref().expect("must be initialized"); - ( - data_fut.iteration, - data_fut.fut.clone(), - data_fut.responders.clone(), - ) - }; - let mut signers = Vec::new(); if let Some(peers) = additional_peers_to_request { for i in peers.iter_ones() { @@ -616,50 +461,27 @@ async fn process_optqs_payload( } } } - - // Append the additional peers to existing responders list - // NB: this might append the same signers multiple times, but this - // should be very rare and has no negative effects. - for responders in existing_responders { - responders.lock().append(&mut signers.clone()); + if let Some(author) = block.author() { + signers.push(author); } - let result = fut.await; + let batches_and_responders = data_ptr + .batch_summary + .iter() + .map(|summary| { + let mut signers = signers.clone(); + signers.append(&mut summary.signers(ordered_authors)); - // If error, reschedule before returning the result - if result.is_err() { - let mut data_fut_guard = data_ptr.data_fut.lock(); - let data_fut = data_fut_guard.as_mut().expect("must be initialized"); - // Protection against race, check the iteration number before rescheduling. - if data_fut.iteration == iteration { - let (batches_and_responders, responders) = data_ptr - .batch_summary - .iter() - .map(|summary| { - let mut signers = signers.clone(); - signers.append(&mut summary.signers(ordered_authors)); - if let Some(author) = block.author() { - signers.push(author); - } - let responders = Arc::new(Mutex::new(signers)); + (summary.info().clone(), signers) + }) + .collect(); - ((summary.info().clone(), responders.clone()), responders) - }) - .unzip(); - *data_fut = DataFetchFut { - fut: request_txns_from_quorum_store( - batches_and_responders, - block.timestamp_usecs(), - batch_reader, - ) - .boxed() - .shared(), - iteration: iteration + 1, - responders, - }; - } - } - result + QuorumStorePayloadManager::request_and_wait_transactions( + batches_and_responders, + block.timestamp_usecs(), + batch_reader, + ) + .await } /// This is deprecated. Use `process_payload_helper` instead after migrating to @@ -670,134 +492,19 @@ async fn process_qs_payload( block: &Block, ordered_authors: &[PeerId], ) -> ExecutorResult> { - let status = proof_with_data.status.lock().take(); - match status.expect("Should have been updated before.") { - DataStatus::Cached(data) => { - counters::QUORUM_BATCH_READY_COUNT.inc(); - proof_with_data - .status - .lock() - .replace(DataStatus::Cached(data.clone())); - Ok(data) - }, - DataStatus::Requested(receivers) => { - let _timer = counters::BATCH_WAIT_DURATION.start_timer(); - let mut vec_ret = Vec::new(); - if !receivers.is_empty() { - debug!( - "QSE: waiting for data on {} receivers, block_round {}", - receivers.len(), - block.round() - ); - } - for (digest, rx) in receivers { - match rx.await { - Err(e) => { - // We probably advanced epoch already. - warn!( - "Oneshot channel to get a batch was dropped with error {:?}", - e - ); - let new_receivers = QuorumStorePayloadManager::request_transactions( - proof_with_data - .proofs - .iter() - .map(|proof| { - ( - proof.info().clone(), - Arc::new(Mutex::new( - proof.shuffled_signers(ordered_authors), - )), - ) - }) - .collect(), - block.timestamp_usecs(), - batch_reader.clone(), - ); - // Could not get all data so requested again - proof_with_data - .status - .lock() - .replace(DataStatus::Requested(new_receivers)); - return Err(DataNotFound(digest)); - }, - Ok(Ok(data)) => { - vec_ret.push(data); - }, - Ok(Err(e)) => { - let new_receivers = QuorumStorePayloadManager::request_transactions( - proof_with_data - .proofs - .iter() - .map(|proof| { - ( - proof.info().clone(), - Arc::new(Mutex::new( - proof.shuffled_signers(ordered_authors), - )), - ) - }) - .collect(), - block.timestamp_usecs(), - batch_reader.clone(), - ); - // Could not get all data so requested again - proof_with_data - .status - .lock() - .replace(DataStatus::Requested(new_receivers)); - return Err(e); - }, - } - } - let ret: Vec = vec_ret.into_iter().flatten().collect(); - // execution asks for the data twice, so data is cached here for the second time. - proof_with_data - .status - .lock() - .replace(DataStatus::Cached(ret.clone())); - Ok(ret) - }, - } -} - -pub struct ConsensusObserverPayloadManager { - txns_pool: Arc>>, - consensus_publisher: Option>, -} - -impl ConsensusObserverPayloadManager { - pub fn new( - txns_pool: Arc>>, - consensus_publisher: Option>, - ) -> Self { - Self { - txns_pool, - consensus_publisher, - } - } -} - -#[async_trait] -impl TPayloadManager for ConsensusObserverPayloadManager { - fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec) { - // noop - } - - fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) { - // noop - } - - fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> { - unreachable!("this method isn't used in ConsensusObserver") - } - - async fn get_transactions( - &self, - block: &Block, - _block_signers: Option, - ) -> ExecutorResult<(Vec, Option)> { - return get_transactions_for_observer(block, &self.txns_pool, &self.consensus_publisher) - .await; - } + QuorumStorePayloadManager::request_and_wait_transactions( + proof_with_data + .proofs + .iter() + .map(|proof| { + ( + proof.info().clone(), + proof.shuffled_signers(ordered_authors), + ) + }) + .collect(), + block.timestamp_usecs(), + batch_reader, + ) + .await } diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index 322054402e6e5..5f5c4410ad07a 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -9,7 +9,6 @@ use crate::{ types::{BatchRequest, BatchResponse, PersistedValue}, }, }; -use aptos_consensus_types::proof_of_store::BatchInfo; use aptos_crypto::HashValue; use aptos_executor_types::*; use aptos_infallible::Mutex; @@ -17,27 +16,21 @@ use aptos_logger::prelude::*; use aptos_types::{transaction::SignedTransaction, validator_verifier::ValidatorVerifier, PeerId}; use futures::{stream::FuturesUnordered, StreamExt}; use rand::Rng; -use std::{sync::Arc, time::Duration}; +use std::{collections::BTreeSet, sync::Arc, time::Duration}; use tokio::{sync::oneshot, time}; struct BatchRequesterState { - signers: Arc>>, + signers: Arc>>, next_index: usize, - ret_tx: oneshot::Sender>>, num_retries: usize, retry_limit: usize, } impl BatchRequesterState { - fn new( - signers: Arc>>, - ret_tx: oneshot::Sender>>, - retry_limit: usize, - ) -> Self { + fn new(signers: Arc>>, retry_limit: usize) -> Self { Self { signers, next_index: 0, - ret_tx, num_retries: 0, retry_limit, } @@ -68,31 +61,6 @@ impl BatchRequesterState { None } } - - fn serve_request(self, digest: HashValue, maybe_payload: Option>) { - if let Some(payload) = maybe_payload { - trace!( - "QS: batch to oneshot, digest {}, tx {:?}", - digest, - self.ret_tx - ); - if self.ret_tx.send(Ok(payload)).is_err() { - debug!( - "Receiver of requested batch not available for digest {}", - digest - ) - }; - } else if self - .ret_tx - .send(Err(ExecutorError::CouldNotGetData)) - .is_err() - { - debug!( - "Receiver of requested batch not available for unavailable digest {}", - digest - ); - } - } } pub(crate) struct BatchRequester { @@ -133,12 +101,11 @@ impl BatchRequester { &self, digest: HashValue, expiration: u64, - responders: Arc>>, - ret_tx: oneshot::Sender>>, + responders: Arc>>, mut subscriber_rx: oneshot::Receiver, - ) -> Option<(BatchInfo, Vec)> { + ) -> ExecutorResult> { let validator_verifier = self.validator_verifier.clone(); - let mut request_state = BatchRequesterState::new(responders, ret_tx, self.retry_limit); + let mut request_state = BatchRequesterState::new(responders, self.retry_limit); let network_sender = self.network_sender.clone(); let request_num_peers = self.request_num_peers; let my_peer_id = self.my_peer_id; @@ -167,11 +134,8 @@ impl BatchRequester { match response { Ok(BatchResponse::Batch(batch)) => { counters::RECEIVED_BATCH_RESPONSE_COUNT.inc(); - let digest = *batch.digest(); - let batch_info = batch.batch_info().clone(); let payload = batch.into_transactions(); - request_state.serve_request(digest, Some(payload.clone())); - return Some((batch_info, payload)); + return Ok(payload); } // Short-circuit if the chain has moved beyond expiration Ok(BatchResponse::NotFound(ledger_info)) => { @@ -182,8 +146,7 @@ impl BatchRequester { { counters::RECEIVED_BATCH_EXPIRED_COUNT.inc(); debug!("QS: batch request expired, digest:{}", digest); - request_state.serve_request(digest, None); - return None; + return Err(ExecutorError::CouldNotGetData); } } Err(e) => { @@ -196,9 +159,8 @@ impl BatchRequester { match result { Ok(persisted_value) => { counters::RECEIVED_BATCH_FROM_SUBSCRIPTION_COUNT.inc(); - let (info, maybe_payload) = persisted_value.unpack(); - request_state.serve_request(*info.digest(), maybe_payload); - return None; + let (_, maybe_payload) = persisted_value.unpack(); + return Ok(maybe_payload.expect("persisted value must exist")); } Err(err) => { debug!("channel closed: {}", err); @@ -209,8 +171,7 @@ impl BatchRequester { } counters::RECEIVED_BATCH_REQUEST_TIMEOUT_COUNT.inc(); debug!("QS: batch request timed out, digest:{}", digest); - request_state.serve_request(digest, None); - None + Err(ExecutorError::CouldNotGetData) }) } } diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index 9c594ee228cb5..e565946952fe9 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -23,8 +23,12 @@ use dashmap::{ DashMap, }; use fail::fail_point; +use futures::{future::Shared, FutureExt}; use once_cell::sync::OnceCell; use std::{ + collections::{BTreeSet, HashMap}, + future::Future, + pin::Pin, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -271,12 +275,16 @@ impl BatchStore { let cache_entry = self.db_cache.entry(digest); if let Occupied(entry) = &cache_entry { - if entry.get().expiration() >= expiration_time { - debug!( - "QS: already have the digest with higher expiration {}", - digest - ); - return Ok(false); + match entry.get().expiration().cmp(&expiration_time) { + std::cmp::Ordering::Equal => return Ok(false), + std::cmp::Ordering::Greater => { + debug!( + "QS: already have the digest with higher expiration {}", + digest + ); + return Ok(false); + }, + std::cmp::Ordering::Less => {}, } }; let value_to_be_stored = if self @@ -493,17 +501,22 @@ pub trait BatchReader: Send + Sync { fn get_batch( &self, - digest: HashValue, - expiration: u64, - signers: Arc>>, - ) -> oneshot::Receiver>>; + batch_info: BatchInfo, + signers: Vec, + ) -> Shared>> + Send>>>; fn update_certified_timestamp(&self, certified_time: u64); } +struct BatchFetchUnit { + responders: Arc>>, + fut: Shared>> + Send>>>, +} + pub struct BatchReaderImpl { batch_store: Arc, batch_requester: Arc>, + inflight_fetch_requests: Arc>>, } impl BatchReaderImpl { @@ -511,8 +524,69 @@ impl BatchReaderImpl { Self { batch_store, batch_requester: Arc::new(batch_requester), + inflight_fetch_requests: Arc::new(Mutex::new(HashMap::new())), } } + + fn get_or_fetch_batch( + &self, + batch_info: BatchInfo, + responders: Vec, + ) -> Shared>> + Send>>> { + let mut responders = responders.into_iter().collect(); + + self.inflight_fetch_requests + .lock() + .entry(*batch_info.digest()) + .and_modify(|fetch_unit| { + fetch_unit.responders.lock().append(&mut responders); + }) + .or_insert_with(|| { + let responders = Arc::new(Mutex::new(responders)); + let responders_clone = responders.clone(); + + let subscriber_rx = self.batch_store.subscribe(*batch_info.digest()); + + let inflight_requests_clone = self.inflight_fetch_requests.clone(); + let batch_store = self.batch_store.clone(); + let requester = self.batch_requester.clone(); + + let fut = async move { + let batch_digest = *batch_info.digest(); + defer!({ + inflight_requests_clone.lock().remove(&batch_digest); + }); + if let Ok(mut value) = batch_store.get_batch_from_local(&batch_digest) { + Ok(value.take_payload().expect("Must have payload")) + } else { + // Quorum store metrics + counters::MISSED_BATCHES_COUNT.inc(); + let payload = requester + .request_batch( + batch_digest, + batch_info.expiration(), + responders, + subscriber_rx, + ) + .await?; + batch_store + .persist(vec![PersistedValue::new(batch_info, Some(payload.clone()))]); + Ok(payload) + } + } + .boxed() + .shared(); + + tokio::spawn(fut.clone()); + + BatchFetchUnit { + responders: responders_clone, + fut, + } + }) + .fut + .clone() + } } impl BatchReader for BatchReaderImpl { @@ -525,37 +599,10 @@ impl BatchReader for Batch fn get_batch( &self, - digest: HashValue, - expiration: u64, - signers: Arc>>, - ) -> oneshot::Receiver>> { - let (tx, rx) = oneshot::channel(); - let batch_store = self.batch_store.clone(); - let batch_requester = self.batch_requester.clone(); - tokio::spawn(async move { - if let Ok(mut value) = batch_store.get_batch_from_local(&digest) { - if tx - .send(Ok(value.take_payload().expect("Must have payload"))) - .is_err() - { - debug!( - "Receiver of local batch not available for digest {}", - digest, - ) - }; - } else { - // Quorum store metrics - counters::MISSED_BATCHES_COUNT.inc(); - let subscriber_rx = batch_store.subscribe(digest); - if let Some((batch_info, payload)) = batch_requester - .request_batch(digest, expiration, signers, tx, subscriber_rx) - .await - { - batch_store.persist(vec![PersistedValue::new(batch_info, Some(payload))]); - } - } - }); - rx + batch_info: BatchInfo, + responders: Vec, + ) -> Shared>> + Send>>> { + self.get_or_fetch_batch(batch_info, responders) } fn update_certified_timestamp(&self, certified_time: u64) { diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index e7878e82ee336..3b026294d2508 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -8,7 +8,10 @@ use crate::{ monitor, network::{IncomingBatchRetrievalRequest, NetworkSender}, network_interface::ConsensusMsg, - payload_manager::{DirectMempoolPayloadManager, QuorumStorePayloadManager, TPayloadManager}, + payload_manager::{ + DirectMempoolPayloadManager, QuorumStoreCommitNotifier, QuorumStorePayloadManager, + TPayloadManager, + }, quorum_store::{ batch_coordinator::{BatchCoordinator, BatchCoordinatorCommand}, batch_generator::{BackPressure, BatchGenerator, BatchGeneratorCommand}, @@ -440,7 +443,7 @@ impl InnerBuilder { Arc::from(QuorumStorePayloadManager::new( batch_reader, // TODO: remove after splitting out clean requests - self.coordinator_tx.clone(), + Box::new(QuorumStoreCommitNotifier::new(self.coordinator_tx.clone())), consensus_publisher, self.verifier.get_ordered_account_addresses(), self.verifier.address_to_validator_index().clone(), diff --git a/consensus/src/quorum_store/tests/batch_requester_test.rs b/consensus/src/quorum_store/tests/batch_requester_test.rs index 148fd2278f063..578d232e15110 100644 --- a/consensus/src/quorum_store/tests/batch_requester_test.rs +++ b/consensus/src/quorum_store/tests/batch_requester_test.rs @@ -7,6 +7,7 @@ use crate::{ batch_requester::BatchRequester, types::{Batch, BatchRequest, BatchResponse}, }, + test_utils::create_vec_signed_transactions, }; use aptos_consensus_types::{ common::Author, @@ -21,6 +22,8 @@ use aptos_types::{ validator_signer::ValidatorSigner, validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier}, }; +use claims::{assert_err, assert_ok_eq}; +use maplit::btreeset; use move_core_types::account_address::AccountAddress; use std::{ sync::Arc, @@ -73,9 +76,10 @@ impl QuorumStoreSender for MockBatchRequester { #[tokio::test] async fn test_batch_request_exists() { + let txns = create_vec_signed_transactions(1); let batch = Batch::new( BatchId::new_for_test(1), - vec![], + txns.clone(), 1, 1, AccountAddress::random(), @@ -84,7 +88,6 @@ async fn test_batch_request_exists() { let batch_response = BatchResponse::Batch(batch.clone()); let validator_signer = ValidatorSigner::random(None); - let (tx, mut rx) = tokio::sync::oneshot::channel(); let batch_requester = BatchRequester::new( 1, AccountAddress::random(), @@ -102,16 +105,11 @@ async fn test_batch_request_exists() { .request_batch( *batch.digest(), batch.expiration(), - Arc::new(Mutex::new(vec![AccountAddress::random()])), - tx, + Arc::new(Mutex::new(btreeset![AccountAddress::random()])), subscriber_rx, ) .await; - assert!(result.is_some()); - if let Some((batch_info, _payload)) = result { - assert_eq!(batch_info, *batch.batch_info()); - } - assert!(rx.try_recv().is_ok()); + assert_ok_eq!(result, txns); } fn create_ledger_info_with_timestamp( @@ -179,7 +177,6 @@ async fn test_batch_request_not_exists_not_expired() { AccountAddress::random(), 0, ); - let (tx, mut rx) = tokio::sync::oneshot::channel(); let batch_response = BatchResponse::NotFound(ledger_info_with_signatures); let batch_requester = BatchRequester::new( 1, @@ -198,14 +195,12 @@ async fn test_batch_request_not_exists_not_expired() { .request_batch( *batch.digest(), batch.expiration(), - Arc::new(Mutex::new(vec![AccountAddress::random()])), - tx, + Arc::new(Mutex::new(btreeset![AccountAddress::random()])), subscriber_rx, ) .await; let request_duration = request_start.elapsed(); - assert!(result.is_none()); - assert!(rx.try_recv().is_ok()); + assert_err!(result); // Retried at least once assert!(request_duration > Duration::from_millis(retry_interval_ms as u64)); } @@ -227,7 +222,6 @@ async fn test_batch_request_not_exists_expired() { AccountAddress::random(), 0, ); - let (tx, mut rx) = tokio::sync::oneshot::channel(); let batch_response = BatchResponse::NotFound(ledger_info_with_signatures); let batch_requester = BatchRequester::new( 1, @@ -246,14 +240,12 @@ async fn test_batch_request_not_exists_expired() { .request_batch( *batch.digest(), batch.expiration(), - Arc::new(Mutex::new(vec![AccountAddress::random()])), - tx, + Arc::new(Mutex::new(btreeset![AccountAddress::random()])), subscriber_rx, ) .await; let request_duration = request_start.elapsed(); - assert!(result.is_none()); - assert!(rx.try_recv().is_ok()); + assert_err!(result); // No retry because of short-circuiting of expired batch assert!(request_duration < Duration::from_millis(retry_interval_ms as u64)); } diff --git a/consensus/src/quorum_store/tests/proof_coordinator_test.rs b/consensus/src/quorum_store/tests/proof_coordinator_test.rs index 8ce920aa8c84c..4bd4277f3efcc 100644 --- a/consensus/src/quorum_store/tests/proof_coordinator_test.rs +++ b/consensus/src/quorum_store/tests/proof_coordinator_test.rs @@ -10,15 +10,17 @@ use crate::{ }, test_utils::{create_vec_signed_transactions, mock_quorum_store_sender::MockQuorumStoreSender}, }; -use aptos_consensus_types::proof_of_store::{BatchId, SignedBatchInfo, SignedBatchInfoMsg}; +use aptos_consensus_types::proof_of_store::{ + BatchId, BatchInfo, SignedBatchInfo, SignedBatchInfoMsg, +}; use aptos_crypto::HashValue; use aptos_executor_types::ExecutorResult; -use aptos_infallible::Mutex; use aptos_types::{ transaction::SignedTransaction, validator_verifier::random_validator_verifier, PeerId, }; +use futures::future::Shared; use mini_moka::sync::Cache; -use std::sync::Arc; +use std::{future::Future, pin::Pin, sync::Arc}; use tokio::sync::mpsc::channel; pub struct MockBatchReader { @@ -32,10 +34,9 @@ impl BatchReader for MockBatchReader { fn get_batch( &self, - _digest: HashValue, - _expiration: u64, - _signers: Arc>>, - ) -> tokio::sync::oneshot::Receiver>> { + _batch_info: BatchInfo, + _signers: Vec, + ) -> Shared>> + Send>>> { unimplemented!() }