From 809457f269bbca15e08cbd442e0740c625cbe0f9 Mon Sep 17 00:00:00 2001 From: George Mitenkov Date: Fri, 6 Dec 2024 21:17:09 +0000 Subject: [PATCH] [move] Benchmarking historical transactions (#15329) A tool to benchmark execution of past transactions. The user has to provide first and last versions of the interval that need to be benchmarked. The tool partitions transactions in the specified closed interval into blocks, and runs all blocks end-to-end, measuring the time. During this run, executor is shared (and so are environment and module caches). There is no commit here, only execution time. For each block, we maintain the state (read-set estimated from 1 run before the benchmarks) on top of which it should run. And so, the benchmark just runs in sequence blocks on top of their initial states (outputs are only used for comparison against the on-chain data). The tool allows one to override configs to experiment with new features, or with how the execution would look like without some features. For now, we support: - enabling features - disabling features In the future, we can add more overrides: gas schedule, modules, etc. --- Cargo.lock | 21 + Cargo.toml | 2 + .../aptos-debugger/src/aptos_debugger.rs | 73 ++-- .../src/transaction_bench_state.rs | 69 ++-- aptos-move/aptos-vm/src/aptos_vm.rs | 67 ++-- aptos-move/aptos-vm/src/block_executor/mod.rs | 2 +- aptos-move/replay-benchmark/Cargo.toml | 30 ++ aptos-move/replay-benchmark/README.md | 84 ++++ aptos-move/replay-benchmark/src/block.rs | 154 +++++++ aptos-move/replay-benchmark/src/diff.rs | 376 ++++++++++++++++++ aptos-move/replay-benchmark/src/generator.rs | 105 +++++ aptos-move/replay-benchmark/src/lib.rs | 10 + aptos-move/replay-benchmark/src/main.rs | 187 +++++++++ aptos-move/replay-benchmark/src/overrides.rs | 90 +++++ aptos-move/replay-benchmark/src/runner.rs | 127 ++++++ aptos-move/replay-benchmark/src/state_view.rs | 82 ++++ aptos-move/replay-benchmark/src/workload.rs | 51 +++ crates/aptos-logger/src/metadata.rs | 108 +++-- .../transaction_slice_metadata.rs | 9 + types/src/write_set.rs | 4 + 20 files changed, 1506 insertions(+), 145 deletions(-) create mode 100644 aptos-move/replay-benchmark/Cargo.toml create mode 100644 aptos-move/replay-benchmark/README.md create mode 100644 aptos-move/replay-benchmark/src/block.rs create mode 100644 aptos-move/replay-benchmark/src/diff.rs create mode 100644 aptos-move/replay-benchmark/src/generator.rs create mode 100644 aptos-move/replay-benchmark/src/lib.rs create mode 100644 aptos-move/replay-benchmark/src/main.rs create mode 100644 aptos-move/replay-benchmark/src/overrides.rs create mode 100644 aptos-move/replay-benchmark/src/runner.rs create mode 100644 aptos-move/replay-benchmark/src/state_view.rs create mode 100644 aptos-move/replay-benchmark/src/workload.rs diff --git a/Cargo.lock b/Cargo.lock index ca3e47878f1d8..80a48ae3ef7c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3566,6 +3566,27 @@ dependencies = [ "tokio-retry", ] +[[package]] +name = "aptos-replay-benchmark" +version = "0.1.0" +dependencies = [ + "anyhow", + "aptos-block-executor", + "aptos-logger", + "aptos-move-debugger", + "aptos-push-metrics", + "aptos-rest-client", + "aptos-types", + "aptos-vm", + "bcs 0.1.4", + "claims", + "clap 4.5.21", + "parking_lot 0.12.1", + "serde", + "tokio", + "url", +] + [[package]] name = "aptos-resource-viewer" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index cc429d5c1435d..7bb1b9b0c15e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ members = [ "aptos-move/move-examples", "aptos-move/mvhashmap", "aptos-move/package-builder", + "aptos-move/replay-benchmark", "aptos-move/script-composer", "aptos-move/vm-genesis", "aptos-node", @@ -419,6 +420,7 @@ aptos-push-metrics = { path = "crates/aptos-push-metrics" } aptos-rate-limiter = { path = "crates/aptos-rate-limiter" } aptos-release-builder = { path = "aptos-move/aptos-release-builder" } aptos-reliable-broadcast = { path = "crates/reliable-broadcast" } +aptos-replay-benchmark = { path = "aptos-move/replay-benchmark" } aptos-resource-viewer = { path = "aptos-move/aptos-resource-viewer" } aptos-rest-client = { path = "crates/aptos-rest-client" } aptos-retrier = { path = "crates/aptos-retrier" } diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index 2dc191812cc1a..a27281eceb672 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -1,12 +1,8 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::{bail, format_err, Result}; -use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, -}; +use anyhow::{bail, format_err}; +use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_gas_profiling::{GasProfiler, TransactionGasLog}; use aptos_rest_client::Client; use aptos_types::{ @@ -28,9 +24,7 @@ use aptos_validator_interface::{ AptosValidatorInterface, DBDebuggerInterface, DebuggerStateView, RestDebuggerInterface, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, - data_cache::AsMoveResolver, - AptosVM, + aptos_vm::AptosVMBlockExecutor, data_cache::AsMoveResolver, AptosVM, VMBlockExecutor, }; use aptos_vm_environment::environment::AptosEnvironment; use aptos_vm_logging::log_schema::AdapterLogSchema; @@ -47,23 +41,31 @@ impl AptosDebugger { Self { debugger } } - pub fn rest_client(rest_client: Client) -> Result { + pub fn rest_client(rest_client: Client) -> anyhow::Result { Ok(Self::new(Arc::new(RestDebuggerInterface::new(rest_client)))) } - pub fn db + Clone>(db_root_path: P) -> Result { + pub fn db + Clone>(db_root_path: P) -> anyhow::Result { Ok(Self::new(Arc::new(DBDebuggerInterface::open( db_root_path, )?))) } + pub async fn get_committed_transactions( + &self, + begin: Version, + limit: u64, + ) -> anyhow::Result<(Vec, Vec)> { + self.debugger.get_committed_transactions(begin, limit).await + } + pub fn execute_transactions_at_version( &self, version: Version, txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let sig_verified_txns: Vec = txns.into_iter().map(|x| x.into()).collect::>(); let txn_provider = DefaultTxnProvider::new(sig_verified_txns); @@ -114,7 +116,7 @@ impl AptosDebugger { &self, version: Version, txn: SignedTransaction, - ) -> Result<(VMStatus, VMOutput, TransactionGasLog)> { + ) -> anyhow::Result<(VMStatus, VMOutput, TransactionGasLog)> { let state_view = DebuggerStateView::new(self.debugger.clone(), version); let log_context = AdapterLogSchema::new(state_view.id(), 0); let txn = txn @@ -166,11 +168,8 @@ impl AptosDebugger { use_same_block_boundaries: bool, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { - let (txns, txn_infos) = self - .debugger - .get_committed_transactions(begin, limit) - .await?; + ) -> anyhow::Result> { + let (txns, txn_infos) = self.get_committed_transactions(begin, limit).await?; if use_same_block_boundaries { // when going block by block, no need to worry about epoch boundaries @@ -238,7 +237,7 @@ impl AptosDebugger { txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let results = self.execute_transactions_at_version( begin, txns, @@ -268,7 +267,7 @@ impl AptosDebugger { repeat_execution_times: u64, concurrency_levels: &[usize], mut txn_infos: Vec, - ) -> Result> { + ) -> anyhow::Result> { let mut ret = vec![]; while limit != 0 { println!( @@ -301,7 +300,7 @@ impl AptosDebugger { txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let mut ret = vec![]; let mut cur = vec![]; let mut cur_version = begin; @@ -336,7 +335,7 @@ impl AptosDebugger { &self, account: AccountAddress, seq: u64, - ) -> Result> { + ) -> anyhow::Result> { self.debugger .get_version_by_account_sequence(account, seq) .await @@ -345,7 +344,7 @@ impl AptosDebugger { pub async fn get_committed_transaction_at_version( &self, version: Version, - ) -> Result<(Transaction, TransactionInfo)> { + ) -> anyhow::Result<(Transaction, TransactionInfo)> { let (mut txns, mut info) = self.debugger.get_committed_transactions(version, 1).await?; let txn = txns.pop().expect("there must be exactly 1 txn in the vec"); @@ -434,20 +433,16 @@ fn execute_block_no_limit( state_view: &DebuggerStateView, concurrency_level: usize, ) -> Result, VMStatus> { - AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - state_view, - &AptosModuleCacheManager::new(), - BlockExecutorConfig { - local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), - onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), - }, - TransactionSliceMetadata::unknown(), - None, - ) - .map(BlockOutput::into_transaction_outputs_forced) + let executor = AptosVMBlockExecutor::new(); + executor + .execute_block_with_config( + txn_provider, + state_view, + BlockExecutorConfig { + local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + }, + TransactionSliceMetadata::unknown(), + ) + .map(BlockOutput::into_transaction_outputs_forced) } diff --git a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs index f250de5c60758..81db4cbded0c2 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs @@ -3,11 +3,7 @@ use crate::transactions; use aptos_bitvec::BitVec; -use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, -}; +use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_block_partitioner::{ v2::config::PartitionerV2Config, BlockPartitioner, PartitionerConfig, }; @@ -32,15 +28,15 @@ use aptos_types::{ }, ExecutionStatus, Transaction, TransactionOutput, TransactionStatus, }, - vm_status::VMStatus, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, + aptos_vm::AptosVMBlockExecutor, data_cache::AsMoveResolver, sharded_block_executor::{ local_executor_shard::{LocalExecutorClient, LocalExecutorService}, ShardedBlockExecutor, }, + VMBlockExecutor, }; use proptest::{collection::vec, prelude::Strategy, strategy::ValueTree, test_runner::TestRunner}; use std::{net::SocketAddr, sync::Arc, time::Instant}; @@ -217,20 +213,18 @@ where ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - self.state_view.as_ref(), - &AptosModuleCacheManager::new(), - BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), - TransactionSliceMetadata::unknown(), - None, - ) - .expect("VM should not fail to start") - .into_transaction_outputs_forced(); + + let executor = AptosVMBlockExecutor::new(); + let output = executor + .execute_block_with_config( + txn_provider, + self.state_view.as_ref(), + BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), + TransactionSliceMetadata::unknown(), + ) + .expect("Sequential block execution should succeed") + .into_transaction_outputs_forced(); + let exec_time = timer.elapsed().as_millis(); (output, block_size * 1000 / exec_time as usize) @@ -263,28 +257,25 @@ where fn execute_benchmark_parallel( &self, txn_provider: &DefaultTxnProvider, - concurrency_level_per_shard: usize, + concurrency_level: usize, maybe_block_gas_limit: Option, ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - self.state_view.as_ref(), - &AptosModuleCacheManager::new(), - BlockExecutorConfig::new_maybe_block_limit( - concurrency_level_per_shard, - maybe_block_gas_limit, - ), - TransactionSliceMetadata::unknown(), - None, - ) - .expect("VM should not fail to start") - .into_transaction_outputs_forced(); + + let executor = AptosVMBlockExecutor::new(); + let output = executor + .execute_block_with_config( + txn_provider, + self.state_view.as_ref(), + BlockExecutorConfig::new_maybe_block_limit( + concurrency_level, + maybe_block_gas_limit, + ), + TransactionSliceMetadata::unknown(), + ) + .expect("Parallel block execution should succeed") + .into_transaction_outputs_forced(); let exec_time = timer.elapsed().as_millis(); (output, block_size * 1000 / exec_time as usize) diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 1b469bcd82c9a..9e3cc84baaa4a 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -2780,35 +2780,31 @@ pub struct AptosVMBlockExecutor { module_cache_manager: AptosModuleCacheManager, } -impl VMBlockExecutor for AptosVMBlockExecutor { - fn new() -> Self { - Self { - module_cache_manager: AptosModuleCacheManager::new(), - } - } - - fn execute_block( +impl AptosVMBlockExecutor { + /// Executes transactions with the specified [BlockExecutorConfig] and returns output for each + /// one of them. + pub fn execute_block_with_config( &self, txn_provider: &DefaultTxnProvider, state_view: &(impl StateView + Sync), - onchain_config: BlockExecutorConfigFromOnchain, + config: BlockExecutorConfig, transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus> { - fail_point!("move_adapter::execute_block", |_| { + fail_point!("aptos_vm_block_executor::execute_block_with_config", |_| { Err(VMStatus::error( StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, None, )) }); + let log_context = AdapterLogSchema::new(state_view.id(), 0); + let num_txns = txn_provider.num_txns(); info!( log_context, - "Executing block, transaction count: {}", - txn_provider.num_txns() + "Executing block, transaction count: {}", num_txns ); - let count = txn_provider.num_txns(); - let ret = AptosVMBlockExecutorWrapper::execute_block::< + let result = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, DefaultTxnProvider, @@ -2816,23 +2812,42 @@ impl VMBlockExecutor for AptosVMBlockExecutor { txn_provider, state_view, &self.module_cache_manager, - BlockExecutorConfig { - local: BlockExecutorLocalConfig { - concurrency_level: AptosVM::get_concurrency_level(), - allow_fallback: true, - discard_failed_blocks: AptosVM::get_discard_failed_blocks(), - module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), - }, - onchain: onchain_config, - }, + config, transaction_slice_metadata, None, ); - if ret.is_ok() { + if result.is_ok() { // Record the histogram count for transactions per block. - BLOCK_TRANSACTION_COUNT.observe(count as f64); + BLOCK_TRANSACTION_COUNT.observe(num_txns as f64); } - ret + result + } +} + +impl VMBlockExecutor for AptosVMBlockExecutor { + fn new() -> Self { + Self { + module_cache_manager: AptosModuleCacheManager::new(), + } + } + + fn execute_block( + &self, + txn_provider: &DefaultTxnProvider, + state_view: &(impl StateView + Sync), + onchain_config: BlockExecutorConfigFromOnchain, + transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result, VMStatus> { + let config = BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level: AptosVM::get_concurrency_level(), + allow_fallback: true, + discard_failed_blocks: AptosVM::get_discard_failed_blocks(), + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + onchain: onchain_config, + }; + self.execute_block_with_config(txn_provider, state_view, config, transaction_slice_metadata) } fn execute_block_sharded>( diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index d5c55eeb957c4..168b1bb4a3719 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -481,7 +481,7 @@ impl< } /// Uses shared thread pool to execute blocks. - pub fn execute_block< + pub(crate) fn execute_block< S: StateView + Sync, L: TransactionCommitHook, TP: TxnProvider + Sync, diff --git a/aptos-move/replay-benchmark/Cargo.toml b/aptos-move/replay-benchmark/Cargo.toml new file mode 100644 index 0000000000000..e531876d494de --- /dev/null +++ b/aptos-move/replay-benchmark/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "aptos-replay-benchmark" +version = "0.1.0" +description = "A tool to replay and locally benchmark on-chain transactions." + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +aptos-block-executor = { workspace = true } +aptos-logger = { workspace = true } +aptos-move-debugger = { workspace = true } +aptos-push-metrics = { workspace = true } +aptos-rest-client = { workspace = true } +aptos-types = { workspace = true } +aptos-vm = { workspace = true } +bcs = { workspace = true } +claims = { workspace = true } +clap = { workspace = true } +parking_lot = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true } +url = { workspace = true } diff --git a/aptos-move/replay-benchmark/README.md b/aptos-move/replay-benchmark/README.md new file mode 100644 index 0000000000000..15563eb4212fb --- /dev/null +++ b/aptos-move/replay-benchmark/README.md @@ -0,0 +1,84 @@ +## A tool to replay and benchmark past Aptos transactions + + +### Benchmarking and measurements + +This tool allows to benchmark an ordered sequence of past transactions, specifying the first (`--begin-version B`) and the last (`--end-version E`) versions. +Transactions are split into blocks, to mimic on-chain behaviour, and blocks are executed one-by-one using an executor. +During the execution, the time is measured. +Each block runs based on the pre-computed state, so there is no "commit" of block execution outputs. +Similarly, signature verification is also left out. +Hence, the benchmark reports the runtime only. + +The tool supports two ways to measure the time: + + 1. Measuring total execution time for all transactions (default). + 2. Measuring execution time for each of the executed blocks, and reporting all. + To enable this, use `--measure-block-times` flag. + +In both cases, the measurement is repeated at least 3 times (this can be configured by specifying the number of repeats, `N`, using `--num-repeats N`), and the minimum, maximum, average and median times are reported (in microseconds). + +When benchmarking, a list of concurrency levels (`--concurrency-levels L1 L2 ...`) has to be provided. +Concurrency level specifies the number of threads Block-STM will use to execute a block of transactions. +Typically, you want to have the concurrency level to match the number of cores. +If multiple concurrency levels are provided, the benchmark is run for all, reporting the measurements. +This way it is possible to see how concurrency affects the runtime. + +Finally, in order to differentiate between cold and warm states, there is an option to skip measurement for the first few blocks. +By specifying `--num-block-to-skip N`, the tool will not ignore measurements when reporting for the first `N` blocks. + +### State overriding + +The benchmark runs every block on top of the corresponding on-chain state. +However, it is possible to override the state. +Currently, the only supported overrides are feature flags: + + 1. Feature flags can be forcefully enabled (`--enable-features F1 F2 ...`). + 2. Feature flags can be forcefully disabled (`--disable-features F1 F2 ...`). + +Feature flags should be spelled in capital letters, e.g., `ENABLE_LOADER_V2`. +For the full list of available features, see [here](../../types/src/on_chain_config/aptos_features.rs). + +Overriding the feature flags allows to see how having some feature on or off affects the runtime. +For example, if there is a new feature that improves the performance of MoveVM, with overrides it is possible to evaluate it on past transactions. + +### Comparison to on-chain behavior + +Overriding the state can change the execution behavior. +Hence, if any overrides are provided, the tool compares the on-chain outputs to new outputs obtained when execution on top of a modified state. +The diff of comparison is logged, and the users of the tool can evaluate if the differences are significant or not. +If the differences are not significant (e.g., only the gas usage has changed), the execution behaviour still stays the same. +Hence, the time measurements are still representative of the on-chain behavior. + +### HTTP request rate limit quotas + +Transactions are fetched from the fullnode via REST API. +Users should provide fullnode's REST API query endpoint using `--rest-endpoint E` flag. +For example, to fetch mainnet transactions, specify `--rest-endpoint https://mainnet.aptoslabs.com/v1`. + +If too many transactions are fetched and executed (preparing the benchmark pre-executes the specified transactions and reads the state from the remote), it is possible to run into HTTP request rate limits. +To learn more about the API quotas, see https://developers.aptoslabs.com/docs/api-access/quotas. + +It is possible to increase your quota by creating an API key in Aptos Build. +For that, follow instructions here: https://developers.aptoslabs.com/docs/api-access/api-keys. +Then, when using the tool the key can be specified using `--api-key K` flag. + +### Examples + +An end-to-end example for using the tool: + +```commandline +aptos-replay-benchmark --begin-version 1944524532 \ + --end-version 1944524714 \ + --rest-endpoint https://mainnet.aptoslabs.com/v1 \ + --concurrency-levels 2 4 \ + --num-repeats 10 \ + --num-blocks-to-skip 1 \ + --enable-features ENABLE_LOADER_V2 +``` + +Here, mainnet transactions from versions 1944524532 to 1944524714 are benchmarked. +There are two measurements: when Block-STM uses 2 threads, or 4 threads per block. +Each measurement is repeated 10 times, and the overall execution time is reported for each level. +Note that the reported time excludes the first block. +Additionally, `ENABLE_LOADER_V2` feature flag is forcefully enabled to see how it impacts the runtime for past transactions. diff --git a/aptos-move/replay-benchmark/src/block.rs b/aptos-move/replay-benchmark/src/block.rs new file mode 100644 index 0000000000000..f545a2b71116b --- /dev/null +++ b/aptos-move/replay-benchmark/src/block.rs @@ -0,0 +1,154 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + diff::TransactionDiff, + state_view::{ReadSet, ReadSetCapturingStateView}, + workload::Workload, +}; +use aptos_logger::error; +use aptos_types::{ + block_executor::config::{ + BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + BlockExecutorModuleCacheLocalConfig, + }, + state_store::{state_key::StateKey, state_value::StateValue, StateView}, + transaction::{TransactionOutput, Version}, +}; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use std::collections::HashMap; + +/// Block execution config used for replay benchmarking. +fn block_execution_config(concurrency_level: usize) -> BlockExecutorConfig { + BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level, + allow_fallback: true, + discard_failed_blocks: false, + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + // For replay, there is no block limit. + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + } +} + +/// Represents a single benchmarking unit: a block of transactions with the input pre-block state. +/// Also stores the comparison of outputs based on the input state to on-chain outputs (recall that +/// input state may contain an override and differ from on-chain pre-block state). +pub struct Block { + /// Stores all data needed to execute this block. + inputs: ReadSet, + /// Stores transactions to execute and benchmark. + workload: Workload, + /// Stores diff results for each transaction output. The number of diffs is always equal to the + /// number of transactions, but they may or may not be empty. + diffs: Vec, +} + +impl Block { + /// Creates a new block for benchmarking by executing transactions on top of an overridden + /// state. If there are any state overrides, transactions are first executed based on the + /// on-chain state for later comparison (otherwise, if there are no overrides diffs are empty). + /// + /// Note: transaction execution is sequential, so that multiple blocks can be constructed in + /// parallel. + pub(crate) fn new( + workload: Workload, + state_view: &(impl StateView + Sync), + state_override: HashMap, + ) -> Self { + // Execute transactions without overrides. + let state_view_without_override = + ReadSetCapturingStateView::new(state_view, HashMap::new()); + let onchain_outputs = execute_workload( + &AptosVMBlockExecutor::new(), + &workload, + &state_view_without_override, + 1, + ); + let _onchain_inputs = state_view_without_override.into_read_set(); + + // Check on-chain outputs do not modify the state we override. If so, benchmarking results + // may not be correct. + let begin = workload + .transaction_slice_metadata() + .begin_version() + .expect("Transaction metadata must be a chunk"); + for (idx, on_chain_output) in onchain_outputs.iter().enumerate() { + for (state_key, _) in on_chain_output.write_set() { + if state_override.contains_key(state_key) { + error!( + "Transaction {} writes to overridden state value for {:?}", + begin + idx as Version, + state_key + ); + } + } + } + + // Execute transactions with an override. + let state_view_with_override = ReadSetCapturingStateView::new(state_view, state_override); + let outputs = execute_workload( + &AptosVMBlockExecutor::new(), + &workload, + &state_view_with_override, + 1, + ); + let inputs = state_view_with_override.into_read_set(); + + // Compute the differences between outputs. + // TODO: We can also compute the differences between the read sets. Maybe we should add it? + let diffs = onchain_outputs + .into_iter() + .zip(outputs) + .map(|(onchain_output, output)| TransactionDiff::from_outputs(onchain_output, output)) + .collect(); + + Self { + inputs, + workload, + diffs, + } + } + + /// Prints the difference in transaction outputs when running with overrides. + pub fn print_diffs(&self) { + let begin = self + .workload + .transaction_slice_metadata() + .begin_version() + .expect("Transaction metadata is a chunk"); + for (idx, diff) in self.diffs.iter().enumerate() { + if !diff.is_empty() { + println!("Transaction {} diff:\n {}\n", begin + idx as Version, diff); + } + } + } + + /// Executes the workload for benchmarking. + pub(crate) fn run(&self, executor: &AptosVMBlockExecutor, concurrency_level: usize) { + execute_workload(executor, &self.workload, &self.inputs, concurrency_level); + } +} + +fn execute_workload( + executor: &AptosVMBlockExecutor, + workload: &Workload, + state_view: &(impl StateView + Sync), + concurrency_level: usize, +) -> Vec { + executor + .execute_block_with_config( + workload.txn_provider(), + state_view, + block_execution_config(concurrency_level), + workload.transaction_slice_metadata(), + ) + .unwrap_or_else(|err| { + panic!( + "Block execution should not fail, but returned an error: {:?}", + err + ) + }) + .into_transaction_outputs_forced() +} diff --git a/aptos-move/replay-benchmark/src/diff.rs b/aptos-move/replay-benchmark/src/diff.rs new file mode 100644 index 0000000000000..ba10c8c1c9701 --- /dev/null +++ b/aptos-move/replay-benchmark/src/diff.rs @@ -0,0 +1,376 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::{ + contract_event::ContractEvent, + state_store::state_key::StateKey, + transaction::{ExecutionStatus, TransactionOutput}, + write_set::{WriteOp, WriteSet}, +}; +use claims::assert_ok; +use std::collections::BTreeMap; + +/// Different parts of [TransactionOutput] that can be different: +/// 1. gas used, +/// 2. status (must be kept since transactions are replayed), +/// 3. events, +/// 4. writes. +/// Note that fine-grained comparison allows for some differences to be okay, e.g., using more gas +/// implies that the fee statement event, the account balance of the fee payer, and the total token +/// supply are different. +#[derive(Eq, PartialEq)] +enum Diff { + GasUsed { + left: u64, + right: u64, + }, + ExecutionStatus { + left: ExecutionStatus, + right: ExecutionStatus, + }, + Event { + left: Option, + right: Option, + }, + WriteSet { + state_key: StateKey, + left: Option, + right: Option, + }, +} + +/// Holds all differences for a pair of transaction outputs. +pub(crate) struct TransactionDiff { + diffs: Vec, +} + +impl TransactionDiff { + /// Given a pair of transaction outputs, computes its [TransactionDiff] that includes the gas + /// used, execution status, events and write sets. + // TODO: Make comparison configurable, so we can skip gas differences, etc. + pub(crate) fn from_outputs(left: TransactionOutput, right: TransactionOutput) -> Self { + let (left_write_set, left_events, left_gas_used, left_transaction_status, _) = + left.unpack(); + let (right_write_set, right_events, right_gas_used, right_transaction_status, _) = + right.unpack(); + + let mut diffs = vec![]; + + // All statuses must be kept, since we are replaying transactions. + let left_execution_status = assert_ok!(left_transaction_status.as_kept_status()); + let right_execution_status = assert_ok!(right_transaction_status.as_kept_status()); + if left_execution_status != right_execution_status { + diffs.push(Diff::ExecutionStatus { + left: left_execution_status, + right: right_execution_status, + }); + } + + if left_gas_used != right_gas_used { + diffs.push(Diff::GasUsed { + left: left_gas_used, + right: right_gas_used, + }); + } + + Self::diff_events(&mut diffs, left_events, right_events); + Self::diff_write_sets(&mut diffs, left_write_set, right_write_set); + + Self { diffs } + } + + /// Returns true if the diff is empty, and transaction outputs match. + pub(crate) fn is_empty(&self) -> bool { + self.diffs.is_empty() + } + + /// Computes the differences between a pair of event vectors, and adds them to the diff. + fn diff_events(diffs: &mut Vec, left: Vec, right: Vec) { + let event_vec_to_map = |events: Vec| { + events + .into_iter() + .map(|event| (event.type_tag().clone(), event)) + .collect::>() + }; + + let left = event_vec_to_map(left); + let mut right = event_vec_to_map(right); + + for (left_ty_tag, left_event) in left { + let maybe_right_event = right.remove(&left_ty_tag); + if maybe_right_event + .as_ref() + .is_some_and(|right_event| left_event.event_data() == right_event.event_data()) + { + continue; + } + + diffs.push(Diff::Event { + left: Some(left_event), + right: maybe_right_event, + }); + } + + for right_event in right.into_values() { + diffs.push(Diff::Event { + left: None, + right: Some(right_event), + }); + } + } + + /// Computes the differences between a pair of write sets, and adds them to the diff. + fn diff_write_sets(diffs: &mut Vec, left: WriteSet, right: WriteSet) { + let left = left.into_mut().into_inner(); + let mut right = right.into_mut().into_inner(); + + for (left_state_key, left_write_op) in left { + let maybe_right_write_op = right.remove(&left_state_key); + if maybe_right_write_op + .as_ref() + .is_some_and(|right_write_op| right_write_op == &left_write_op) + { + continue; + } + + diffs.push(Diff::WriteSet { + state_key: left_state_key, + left: Some(left_write_op), + right: maybe_right_write_op, + }); + } + + for (right_state_key, right_write_op) in right { + diffs.push(Diff::WriteSet { + state_key: right_state_key, + left: None, + right: Some(right_write_op), + }); + } + } +} + +impl std::fmt::Display for TransactionDiff { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, " >>>>> ")?; + for diff in &self.diffs { + match diff { + Diff::GasUsed { left, right } => { + writeln!(f, "[gas used] before: {}, after: {}", left, right)?; + }, + Diff::ExecutionStatus { left, right } => { + writeln!( + f, + "[execution status] before: {:?}, after: {:?}", + left, right + )?; + }, + Diff::Event { left, right } => { + let left = left.as_ref(); + let right = right.as_ref(); + + if left.is_none() { + writeln!( + f, + "[event] {} was not emitted before", + right.unwrap().type_tag().to_canonical_string() + )?; + } else if right.is_none() { + writeln!( + f, + "[event] {} is not emitted anymore", + left.unwrap().type_tag().to_canonical_string() + )?; + } else { + writeln!( + f, + "[event] {} has changed its data", + left.unwrap().type_tag().to_canonical_string() + )?; + } + }, + Diff::WriteSet { + state_key, + left, + right, + } => { + let left = left.as_ref(); + let right = right.as_ref(); + + if left.is_none() { + writeln!(f, "[write] {:?} was not written to before", state_key)?; + } else if right.is_none() { + writeln!(f, "[write] {:?} is not written to anymore", state_key)?; + } else { + writeln!(f, "[write] {:?} has changed its value", state_key)?; + } + }, + } + } + writeln!(f, " <<<<< ") + } +} + +#[cfg(test)] +mod test { + use super::*; + use aptos_types::{ + on_chain_config::CurrentTimeMicroseconds, state_store::state_value::StateValueMetadata, + write_set::WriteSetMut, + }; + + #[test] + fn test_diff_events() { + let mut diffs = vec![]; + + let events_1 = vec![ + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventA", vec![0, 1, 2]), + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventB", vec![0, 1, 2]), + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventD", vec![0, 1, 2]), + ]; + + let events_2 = vec![ + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventA", vec![0, 1, 2]), + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventC", vec![0, 1, 2]), + ContractEvent::new_v2_with_type_tag_str("0x1::event::EventD", vec![0, 1, 3]), + ]; + + let expected_diffs = vec![ + Diff::Event { + left: Some(ContractEvent::new_v2_with_type_tag_str( + "0x1::event::EventB", + vec![0, 1, 2], + )), + right: None, + }, + Diff::Event { + left: None, + right: Some(ContractEvent::new_v2_with_type_tag_str( + "0x1::event::EventC", + vec![0, 1, 2], + )), + }, + Diff::Event { + left: Some(ContractEvent::new_v2_with_type_tag_str( + "0x1::event::EventD", + vec![0, 1, 2], + )), + right: Some(ContractEvent::new_v2_with_type_tag_str( + "0x1::event::EventD", + vec![0, 1, 3], + )), + }, + ]; + + TransactionDiff::diff_events(&mut diffs, events_1, events_2); + + assert_eq!(diffs.len(), 3); + assert!(diffs.iter().all(|diff| expected_diffs.contains(diff))); + } + + #[test] + fn test_diff_write_sets() { + let mut diffs = vec![]; + + let write_set_1 = WriteSetMut::new(vec![ + // Same in 2nd write set. + ( + StateKey::raw(b"key-1"), + WriteOp::legacy_creation(vec![0, 1, 2].into()), + ), + // Does not exist in 2nd write set. + ( + StateKey::raw(b"key-2"), + WriteOp::legacy_creation(vec![0, 1, 2].into()), + ), + // Different from 2nd write-set. + ( + StateKey::raw(b"key-4"), + WriteOp::legacy_creation(vec![0, 1, 2].into()), + ), + ( + StateKey::raw(b"key-5"), + WriteOp::legacy_creation(vec![0, 1, 2].into()), + ), + ( + StateKey::raw(b"key-6"), + WriteOp::creation( + vec![0, 1, 2].into(), + StateValueMetadata::new(1, 2, &CurrentTimeMicroseconds { microseconds: 100 }), + ), + ), + ]) + .freeze() + .unwrap(); + + let write_set_2 = WriteSetMut::new(vec![ + // Same in 1st write set. + ( + StateKey::raw(b"key-1"), + WriteOp::legacy_creation(vec![0, 1, 2].into()), + ), + // Does nto exist in 1st write set. + ( + StateKey::raw(b"key-3"), + WriteOp::legacy_creation(vec![0, 1, 2].into()), + ), + // Different from 1st write-set. + ( + StateKey::raw(b"key-4"), + WriteOp::legacy_creation(vec![0, 1, 3].into()), + ), + ( + StateKey::raw(b"key-5"), + WriteOp::legacy_modification(vec![0, 1, 2].into()), + ), + ( + StateKey::raw(b"key-6"), + WriteOp::creation( + vec![0, 1, 2].into(), + StateValueMetadata::new(1, 2, &CurrentTimeMicroseconds { microseconds: 200 }), + ), + ), + ]) + .freeze() + .unwrap(); + + let expected_diffs = vec![ + Diff::WriteSet { + state_key: StateKey::raw(b"key-2"), + left: Some(WriteOp::legacy_creation(vec![0, 1, 2].into())), + right: None, + }, + Diff::WriteSet { + state_key: StateKey::raw(b"key-3"), + left: None, + right: Some(WriteOp::legacy_creation(vec![0, 1, 2].into())), + }, + Diff::WriteSet { + state_key: StateKey::raw(b"key-4"), + left: Some(WriteOp::legacy_creation(vec![0, 1, 2].into())), + right: Some(WriteOp::legacy_creation(vec![0, 1, 3].into())), + }, + Diff::WriteSet { + state_key: StateKey::raw(b"key-5"), + left: Some(WriteOp::legacy_creation(vec![0, 1, 2].into())), + right: Some(WriteOp::legacy_modification(vec![0, 1, 2].into())), + }, + Diff::WriteSet { + state_key: StateKey::raw(b"key-6"), + left: Some(WriteOp::creation( + vec![0, 1, 2].into(), + StateValueMetadata::new(1, 2, &CurrentTimeMicroseconds { microseconds: 100 }), + )), + right: Some(WriteOp::creation( + vec![0, 1, 2].into(), + StateValueMetadata::new(1, 2, &CurrentTimeMicroseconds { microseconds: 200 }), + )), + }, + ]; + + TransactionDiff::diff_write_sets(&mut diffs, write_set_1, write_set_2); + + assert_eq!(diffs.len(), 5); + assert!(diffs.iter().all(|diff| expected_diffs.contains(diff))); + } +} diff --git a/aptos-move/replay-benchmark/src/generator.rs b/aptos-move/replay-benchmark/src/generator.rs new file mode 100644 index 0000000000000..089d6a36acbfb --- /dev/null +++ b/aptos-move/replay-benchmark/src/generator.rs @@ -0,0 +1,105 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{block::Block, overrides::OverrideConfig, workload::Workload}; +use aptos_move_debugger::aptos_debugger::AptosDebugger; +use aptos_types::transaction::{Transaction, Version}; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Instant, +}; + +pub struct BenchmarkGenerator { + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, +} + +impl BenchmarkGenerator { + /// Generates a sequence of [Block] for benchmarking. + pub async fn generate_blocks( + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, + ) -> anyhow::Result> { + let generator = Arc::new(Self { + debugger, + begin_version, + end_version, + override_config, + }); + + let limit = generator.end_version - generator.begin_version + 1; + let (txns, _) = generator + .debugger + .get_committed_transactions(generator.begin_version, limit) + .await?; + let txn_blocks = generator.partition(txns); + + let num_generated = Arc::new(AtomicU64::new(0)); + let num_blocks = txn_blocks.len(); + + let mut tasks = Vec::with_capacity(num_blocks); + for (begin, txn_block) in txn_blocks { + let task = tokio::task::spawn_blocking({ + let generator = generator.clone(); + let num_generated = num_generated.clone(); + move || { + let start_time = Instant::now(); + let block = generator.generate_block(begin, txn_block); + let time = start_time.elapsed().as_secs(); + println!( + "Generated block {}/{} in {}s", + num_generated.fetch_add(1, Ordering::SeqCst) + 1, + num_blocks, + time + ); + block + } + }); + tasks.push(task); + } + + let mut blocks = Vec::with_capacity(tasks.len()); + for task in tasks { + blocks.push(task.await?); + } + + Ok(blocks) + } + + /// Generates a single [Block] for benchmarking. + fn generate_block(&self, begin: Version, txns: Vec) -> Block { + let workload = Workload::new(begin, txns); + let state_view = self.debugger.state_view_at_version(begin); + let state_override = self.override_config.get_state_override(&state_view); + Block::new(workload, &state_view, state_override) + } + + /// Partitions a sequence of transactions into blocks. + fn partition(&self, txns: Vec) -> Vec<(Version, Vec)> { + let mut begin_versions_and_blocks = Vec::with_capacity(txns.len()); + + let mut curr_begin = self.begin_version; + let mut curr_block = Vec::with_capacity(txns.len()); + + for txn in txns { + if txn.is_block_start() && !curr_block.is_empty() { + let block_size = curr_block.len(); + begin_versions_and_blocks.push((curr_begin, std::mem::take(&mut curr_block))); + curr_begin += block_size as Version; + } + curr_block.push(txn); + } + if !curr_block.is_empty() { + begin_versions_and_blocks.push((curr_begin, curr_block)); + } + + begin_versions_and_blocks + } +} diff --git a/aptos-move/replay-benchmark/src/lib.rs b/aptos-move/replay-benchmark/src/lib.rs new file mode 100644 index 0000000000000..0792bc55e5f4f --- /dev/null +++ b/aptos-move/replay-benchmark/src/lib.rs @@ -0,0 +1,10 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod block; +mod diff; +pub mod generator; +pub mod overrides; +pub mod runner; +mod state_view; +mod workload; diff --git a/aptos-move/replay-benchmark/src/main.rs b/aptos-move/replay-benchmark/src/main.rs new file mode 100644 index 0000000000000..051e0edc0e950 --- /dev/null +++ b/aptos-move/replay-benchmark/src/main.rs @@ -0,0 +1,187 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_logger::{Level, Logger}; +use aptos_move_debugger::aptos_debugger::AptosDebugger; +use aptos_push_metrics::MetricsPusher; +use aptos_replay_benchmark::{ + generator::BenchmarkGenerator, overrides::OverrideConfig, runner::BenchmarkRunner, +}; +use aptos_rest_client::{AptosBaseUrl, Client}; +use aptos_types::{on_chain_config::FeatureFlag, transaction::Version}; +use clap::Parser; +use url::Url; + +/// Minimum number of times to execute blocks of transactions and measure the time taken. +const MIN_NUM_REPEATS: usize = 3; + +#[derive(Parser)] +#[command(about)] +pub struct Command { + #[clap(long, default_value_t = Level::Error)] + log_level: Level, + + #[clap( + long, + help = "Fullnode's REST API query endpoint, e.g., https://mainnet.aptoslabs.com/v1 for \ + mainnet." + )] + rest_endpoint: String, + + #[clap( + long, + help = "Optional API key to increase HTTP request rate limit quota." + )] + api_key: Option, + + #[clap(long, help = "First transaction to include for benchmarking.")] + begin_version: Version, + + #[clap(long, help = "Last transaction to include for benchmarking.")] + end_version: Version, + + #[clap( + long, + default_value_t = 0, + help = "Number of blocks to skip for time measurement. Allows to warm-up caches." + )] + num_blocks_to_skip: usize, + + #[clap( + long, + num_args = 1.., + help = "List of space-separated concurrency levels that define how many threads Block-STM \ + is using to execute a block of transactions. For each level, the time taken to \ + execute blocks of transactions is measured and reported." + )] + concurrency_levels: Vec, + + #[clap( + long, + default_value_t = MIN_NUM_REPEATS, + help = "Number of times to execute blocks of transactions and measure the timr taken for \ + each concurrency level." + )] + num_repeats: usize, + + #[clap( + long, + help = "If true, measure time taken to execute each block separately. If false, measure \ + the overall time to execute all blocks." + )] + measure_block_times: bool, + + #[clap( + long, + num_args = 1.., + value_delimiter = ' ', + help = "List of space-separated feature flags to enable, in capital letters. For example, \ + GAS_PAYER_ENABLED or EMIT_FEE_STATEMENT. For the full list of feature flags, see \ + aptos-core/types/src/on_chain_config/aptos_features.rs." + )] + enable_features: Vec, + + #[clap( + long, + num_args = 1.., + value_delimiter = ' ', + help = "List of space-separated feature flags to disable, in capital letters. For \ + example, GAS_PAYER_ENABLED or EMIT_FEE_STATEMENT. For the full list of feature \ + flags, see aptos-core/types/src/on_chain_config/aptos_features.rs." + )] + disable_features: Vec, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let command = Command::parse(); + + let mut logger = Logger::new(); + logger.level(command.log_level); + logger.init(); + + let _mp = MetricsPusher::start(vec![]); + + // Sanity checks for provided commands. + assert!( + command.begin_version <= command.end_version, + "Transaction versions should be a valid closed interval. Instead got begin: {}, end: {}", + command.begin_version, + command.end_version, + ); + assert!( + !command.concurrency_levels.is_empty(), + "At least one concurrency level must be provided", + ); + assert!( + command.num_repeats >= MIN_NUM_REPEATS, + "Number of repeats must be at least {}", + MIN_NUM_REPEATS, + ); + assert!( + command + .enable_features + .iter() + .all(|f| !command.disable_features.contains(f)), + "Enable and disable feature flags cannot overlap", + ); + + // TODO: + // Right now we fetch transactions from debugger, but ideally we need a way to save them + // locally (with corresponding read-sets) so we can use this for CI. + let builder = Client::builder(AptosBaseUrl::Custom(Url::parse(&command.rest_endpoint)?)); + let client = if let Some(api_key) = command.api_key { + builder.api_key(&api_key)?.build() + } else { + builder.build() + }; + let debugger = AptosDebugger::rest_client(client)?; + + // TODO: + // Right now, only features can be overridden. In general, this can be allowed for anything: + // 1. Framework code, e.g., to test performance of new natives or compiler, + // 2. Gas schedule, to track the costs of charging gas or tracking limits. + // We probably should support at least these. + let override_config = OverrideConfig::new(command.enable_features, command.disable_features); + + let blocks = BenchmarkGenerator::generate_blocks( + debugger, + command.begin_version, + command.end_version, + override_config, + ) + .await?; + + // Ensure we have at least one block to benchmark. + assert!( + command.num_blocks_to_skip < blocks.len(), + "There are only {} blocks, but skipping {}", + blocks.len(), + command.num_blocks_to_skip + ); + + for block in &blocks { + block.print_diffs(); + } + + BenchmarkRunner::new( + command.concurrency_levels, + command.num_repeats, + command.measure_block_times, + command.num_blocks_to_skip, + ) + .measure_execution_time(&blocks); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn verify_tool() { + use clap::CommandFactory; + Command::command().debug_assert(); + } +} diff --git a/aptos-move/replay-benchmark/src/overrides.rs b/aptos-move/replay-benchmark/src/overrides.rs new file mode 100644 index 0000000000000..8e0270bbb898b --- /dev/null +++ b/aptos-move/replay-benchmark/src/overrides.rs @@ -0,0 +1,90 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Defines different overrides for on-chain state used for benchmarking. With overrides, past +//! transactions can be replayed on top of a modified state, and we can evaluate how it impacts +//! performance or other things. + +use aptos_logger::error; +use aptos_types::{ + on_chain_config::{FeatureFlag, Features, OnChainConfig}, + state_store::{state_key::StateKey, state_value::StateValue, StateView}, +}; +use serde::Serialize; +use std::collections::HashMap; + +/// Stores feature flags to enable/disable, essentially overriding on-chain state. +pub struct OverrideConfig { + additional_enabled_features: Vec, + additional_disabled_features: Vec, +} + +impl OverrideConfig { + pub fn new( + additional_enabled_features: Vec, + additional_disabled_features: Vec, + ) -> Self { + Self { + additional_enabled_features, + additional_disabled_features, + } + } + + pub(crate) fn get_state_override( + &self, + state_view: &impl StateView, + ) -> HashMap { + let mut state_override = HashMap::new(); + + // Enable/disable features. + let (features_state_key, features_state_value) = + config_override::(state_view, |features| { + for feature in &self.additional_enabled_features { + if features.is_enabled(*feature) { + error!("Feature {:?} is already enabled", feature); + } + features.enable(*feature); + } + for feature in &self.additional_disabled_features { + if !features.is_enabled(*feature) { + error!("Feature {:?} is already disabled", feature); + } + features.disable(*feature); + } + }); + state_override.insert(features_state_key, features_state_value); + state_override + } +} + +/// Returns the state key for on-chain config type. +fn config_state_key() -> StateKey { + StateKey::resource(T::address(), &T::struct_tag()) + .expect("Constructing state key for on-chain config must succeed") +} + +/// Fetches the config from the storage, and modifies it based on the passed function. Panics if +/// there is a storage error, config does not exist or fails to (de-)serialize. +fn config_override( + state_view: &impl StateView, + override_func: F, +) -> (StateKey, StateValue) { + let state_key = config_state_key::(); + let state_value = state_view + .get_state_value(&state_key) + .unwrap_or_else(|err| { + panic!( + "Failed to fetch on-chain config for {:?}: {:?}", + state_key, err + ) + }) + .unwrap_or_else(|| panic!("On-chain config for {:?} must always exist", state_key)); + + let mut config = T::deserialize_into_config(state_value.bytes()) + .expect("On-chain config must be deserializable"); + override_func(&mut config); + let config_bytes = bcs::to_bytes(&config).expect("On-chain config must be serializable"); + + let new_state_value = state_value.map_bytes(|_| Ok(config_bytes.into())).unwrap(); + (state_key, new_state_value) +} diff --git a/aptos-move/replay-benchmark/src/runner.rs b/aptos-move/replay-benchmark/src/runner.rs new file mode 100644 index 0000000000000..ed546ec40ff7b --- /dev/null +++ b/aptos-move/replay-benchmark/src/runner.rs @@ -0,0 +1,127 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::block::Block; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use std::time::Instant; + +/// Holds configuration for running the benchmarks and measuring the time taken. +pub struct BenchmarkRunner { + concurrency_levels: Vec, + num_repeats: usize, + measure_per_block_instead_of_overall_time: bool, + num_blocks_to_skip: usize, +} + +impl BenchmarkRunner { + pub fn new( + concurrency_levels: Vec, + num_repeats: usize, + measure_per_block_instead_of_overall_time: bool, + num_blocks_to_skip: usize, + ) -> Self { + Self { + concurrency_levels, + num_repeats, + measure_per_block_instead_of_overall_time, + num_blocks_to_skip, + } + } + + // TODO: + // This measures execution time from a cold-start. Ideally, we want to warm-up with executing + // 1-2 blocks prior to selected range, but not timing them. + pub fn measure_execution_time(&self, blocks: &[Block]) { + for concurrency_level in &self.concurrency_levels { + if self.measure_per_block_instead_of_overall_time { + self.measure_block_execution_times(blocks, *concurrency_level); + } else { + self.measure_overall_execution_time(blocks, *concurrency_level); + } + } + } + + /// Runs a sequence of blocks, measuring execution time for each block. The median is reported. + fn measure_block_execution_times(&self, blocks: &[Block], concurrency_level: usize) { + let mut times = (0..blocks.len()) + .map(|_| Vec::with_capacity(self.num_repeats)) + .collect::>(); + + for i in 0..self.num_repeats { + let executor = AptosVMBlockExecutor::new(); + for (idx, block) in blocks.iter().enumerate() { + let start_time = Instant::now(); + block.run(&executor, concurrency_level); + let time = start_time.elapsed().as_micros(); + if idx >= self.num_blocks_to_skip { + println!( + "[{}/{}] Block {} execution time is {}us", + i + 1, + self.num_repeats, + idx + 1, + time, + ); + } + times[idx].push(time); + } + } + + for (idx, mut time) in times.into_iter().enumerate() { + if idx >= self.num_blocks_to_skip { + time.sort(); + let min_time = *time.first().unwrap(); + let average_time = time.iter().sum::() as f64 / self.num_repeats as f64; + let median_time = time[self.num_repeats / 2]; + let max_time = *time.last().unwrap(); + + println!( + "Block {} execution time: min {}us, average {:.2}us, median {}us, max {}us\n", + idx + 1, + min_time, + average_time, + median_time, + max_time, + ); + } + } + } + + /// Runs the sequence of blocks, measuring end-to-end execution time. + fn measure_overall_execution_time(&self, blocks: &[Block], concurrency_level: usize) { + let mut times = Vec::with_capacity(self.num_repeats); + for i in 0..self.num_repeats { + let executor = AptosVMBlockExecutor::new(); + + // Warm-up. + for block in &blocks[..self.num_blocks_to_skip] { + block.run(&executor, concurrency_level); + } + + // Actual measurement. + let start_time = Instant::now(); + for block in &blocks[self.num_blocks_to_skip..] { + block.run(&executor, concurrency_level); + } + let time = start_time.elapsed().as_micros(); + + println!( + "[{}/{}] Overall execution time is {}us", + i + 1, + self.num_repeats, + time, + ); + times.push(time); + } + + times.sort(); + let min_time = *times.first().unwrap(); + let average_time = times.iter().sum::() as f64 / self.num_repeats as f64; + let median_time = times[self.num_repeats / 2]; + let max_time = *times.last().unwrap(); + + println!( + "Overall execution time (blocks {}-{}): min {}us, average {:.2}us, median {}us, max {}us\n", + self.num_blocks_to_skip + 1, blocks.len(), min_time, average_time, median_time, max_time, + ); + } +} diff --git a/aptos-move/replay-benchmark/src/state_view.rs b/aptos-move/replay-benchmark/src/state_view.rs new file mode 100644 index 0000000000000..0de19abc37dfd --- /dev/null +++ b/aptos-move/replay-benchmark/src/state_view.rs @@ -0,0 +1,82 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::state_store::{ + state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue, + StateView, StateViewResult, TStateView, +}; +use parking_lot::Mutex; +use std::collections::HashMap; + +/// Represents the read-set obtained when executing transactions. +pub(crate) struct ReadSet { + data: HashMap, +} + +impl TStateView for ReadSet { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> StateViewResult> { + Ok(self.data.get(state_key).cloned()) + } + + fn get_usage(&self) -> StateViewResult { + unreachable!("Should not be called when benchmarking") + } +} + +/// [StateView] implementation that records all execution reads. Captured reads can be converted +/// into a [ReadSet]. +pub(crate) struct ReadSetCapturingStateView<'s, S> { + captured_reads: Mutex>, + state_view: &'s S, +} + +impl<'s, S: StateView> ReadSetCapturingStateView<'s, S> { + pub(crate) fn new(state_view: &'s S, initial_read_set: HashMap) -> Self { + Self { + captured_reads: Mutex::new(initial_read_set), + state_view, + } + } + + pub(crate) fn into_read_set(self) -> ReadSet { + ReadSet { + data: self.captured_reads.into_inner(), + } + } +} + +impl<'s, S: StateView> TStateView for ReadSetCapturingStateView<'s, S> { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> StateViewResult> { + // Check the read-set first. + if let Some(state_value) = self.captured_reads.lock().get(state_key) { + return Ok(Some(state_value.clone())); + } + + // We do not allow failures because then benchmarking will not be correct (we miss a read). + // Plus, these failures should not happen when replaying past transactions. + let maybe_state_value = self + .state_view + .get_state_value(state_key) + .unwrap_or_else(|err| { + panic!("Failed to fetch state value for {:?}: {:?}", state_key, err) + }); + + // Capture the read on first access. + if let Some(state_value) = &maybe_state_value { + let mut captured_reads = self.captured_reads.lock(); + if !captured_reads.contains_key(state_key) { + captured_reads.insert(state_key.clone(), state_value.clone()); + } + } + + Ok(maybe_state_value) + } + + fn get_usage(&self) -> StateViewResult { + unreachable!("Should not be called when benchmarking") + } +} diff --git a/aptos-move/replay-benchmark/src/workload.rs b/aptos-move/replay-benchmark/src/workload.rs new file mode 100644 index 0000000000000..ccc9ed1ea2f73 --- /dev/null +++ b/aptos-move/replay-benchmark/src/workload.rs @@ -0,0 +1,51 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_block_executor::txn_provider::default::DefaultTxnProvider; +use aptos_types::{ + block_executor::transaction_slice_metadata::TransactionSliceMetadata, + transaction::{ + signature_verified_transaction::{ + into_signature_verified_block, SignatureVerifiedTransaction, + }, + Transaction, Version, + }, +}; + +/// A workload to benchmark. Contains signature verified transactions, and metadata specifying the +/// start and end versions of these transactions. +pub(crate) struct Workload { + /// Stores a block of transactions for execution. Always has at least one transaction. + txn_provider: DefaultTxnProvider, + /// Stores metadata for the version range of a block. It is always set to + /// [TransactionSliceMetadata::Chunk]. + transaction_slice_metadata: TransactionSliceMetadata, +} + +impl Workload { + /// Returns a new workload to execute transactions at specified version. + pub(crate) fn new(begin: Version, txns: Vec) -> Self { + assert!(!txns.is_empty()); + + let end = begin + txns.len() as Version; + let transaction_slice_metadata = TransactionSliceMetadata::chunk(begin, end); + + let signature_verified_txns = into_signature_verified_block(txns); + let txn_provider = DefaultTxnProvider::new(signature_verified_txns); + + Workload { + txn_provider, + transaction_slice_metadata, + } + } + + /// Returns the signature verified transactions in the workload. + pub(crate) fn txn_provider(&self) -> &DefaultTxnProvider { + &self.txn_provider + } + + /// Returns transaction metadata corresponding to [begin, end) versions of the workload. + pub(crate) fn transaction_slice_metadata(&self) -> TransactionSliceMetadata { + self.transaction_slice_metadata + } +} diff --git a/crates/aptos-logger/src/metadata.rs b/crates/aptos-logger/src/metadata.rs index aba5b7503ce61..6ef82530c0a06 100644 --- a/crates/aptos-logger/src/metadata.rs +++ b/crates/aptos-logger/src/metadata.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use serde::{Deserialize, Serialize}; -use std::{fmt, str::FromStr}; +use strum_macros::{Display, EnumString, FromRepr}; /// Associated metadata with every log to identify what kind of log and where it came from #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -60,68 +60,96 @@ impl Metadata { } } -static LOG_LEVEL_NAMES: &[&str] = &["ERROR", "WARN", "INFO", "DEBUG", "TRACE"]; - /// Logging levels, used for stratifying logs, and disabling less important ones for performance reasons #[repr(usize)] -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize, Deserialize)] +#[derive( + Copy, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Debug, + Hash, + Serialize, + Deserialize, + FromRepr, + EnumString, + Display, +)] #[serde(rename_all = "UPPERCASE")] pub enum Level { /// The "error" level. /// /// Designates very serious errors. + #[strum(ascii_case_insensitive)] + #[strum(to_string = "ERROR")] Error = 0, /// The "warn" level. /// /// Designates hazardous situations. - Warn, + #[strum(ascii_case_insensitive)] + #[strum(to_string = "WARN")] + Warn = 1, /// The "info" level. /// /// Designates useful information. - Info, + #[strum(ascii_case_insensitive)] + #[strum(to_string = "INFO")] + Info = 2, /// The "debug" level. /// /// Designates lower priority information. - Debug, + #[strum(ascii_case_insensitive)] + #[strum(to_string = "DEBUG")] + Debug = 3, /// The "trace" level. /// /// Designates very low priority, often extremely verbose, information. - Trace, + #[strum(ascii_case_insensitive)] + #[strum(to_string = "TRACE")] + Trace = 4, } -impl Level { - fn from_usize(idx: usize) -> Option { - let lvl = match idx { - 0 => Level::Error, - 1 => Level::Warn, - 2 => Level::Info, - 3 => Level::Debug, - 4 => Level::Trace, - _ => return None, - }; - - Some(lvl) +#[cfg(test)] +mod test { + use super::*; + use std::str::FromStr; + + #[test] + fn test_log_level_from_string() { + assert_eq!(Level::Error, Level::from_str("ERROR").unwrap()); + assert_eq!(Level::Error, Level::from_str("Error").unwrap()); + assert_eq!(Level::Error, Level::from_str("error").unwrap()); + + assert_eq!(Level::Warn, Level::from_str("WARN").unwrap()); + assert_eq!(Level::Warn, Level::from_str("wArN").unwrap()); + assert_eq!(Level::Warn, Level::from_str("warn").unwrap()); + + assert_eq!(Level::Info, Level::from_str("INFO").unwrap()); + assert_eq!(Level::Info, Level::from_str("infO").unwrap()); + assert_eq!(Level::Info, Level::from_str("info").unwrap()); + + assert_eq!(Level::Debug, Level::from_str("DEBUG").unwrap()); + assert_eq!(Level::Debug, Level::from_str("Debug").unwrap()); + assert_eq!(Level::Debug, Level::from_str("debug").unwrap()); + + assert_eq!(Level::Trace, Level::from_str("TRACE").unwrap()); + assert_eq!(Level::Trace, Level::from_str("trAce").unwrap()); + assert_eq!(Level::Trace, Level::from_str("trace").unwrap()); + + assert!(Level::from_str("ERR").is_err()); + assert!(Level::from_str("err_or").is_err()); + assert!(Level::from_str("INF").is_err()); + assert!(Level::from_str("tracey").is_err()); } -} - -/// An error given when no `Level` matches the inputted string -#[derive(Debug)] -pub struct LevelParseError; - -impl FromStr for Level { - type Err = LevelParseError; - - fn from_str(level: &str) -> Result { - LOG_LEVEL_NAMES - .iter() - .position(|name| name.eq_ignore_ascii_case(level)) - .map(|idx| Level::from_usize(idx).unwrap()) - .ok_or(LevelParseError) - } -} -impl fmt::Display for Level { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.pad(LOG_LEVEL_NAMES[*self as usize]) + #[test] + fn test_log_level_to_string() { + assert_eq!(String::from("ERROR"), Level::Error.to_string()); + assert_eq!(String::from("WARN"), Level::Warn.to_string()); + assert_eq!(String::from("INFO"), Level::Info.to_string()); + assert_eq!(String::from("DEBUG"), Level::Debug.to_string()); + assert_eq!(String::from("TRACE"), Level::Trace.to_string()); } } diff --git a/types/src/block_executor/transaction_slice_metadata.rs b/types/src/block_executor/transaction_slice_metadata.rs index 8df707552253f..381259b2da3f1 100644 --- a/types/src/block_executor/transaction_slice_metadata.rs +++ b/types/src/block_executor/transaction_slice_metadata.rs @@ -76,6 +76,15 @@ impl TransactionSliceMetadata { (Chunk { end, .. }, Chunk { begin, .. }) => begin == end, } } + + /// Returns the first transaction version for [TransactionSliceMetadata::Chunk], and [None] + /// otherwise. + pub fn begin_version(&self) -> Option { + if let TransactionSliceMetadata::Chunk { begin, .. } = self { + return Some(*begin); + } + None + } } #[cfg(test)] diff --git a/types/src/write_set.rs b/types/src/write_set.rs index e939b70d1346a..0da5b6b89dcd5 100644 --- a/types/src/write_set.rs +++ b/types/src/write_set.rs @@ -568,6 +568,10 @@ impl WriteSetMut { &mut self.write_set } + pub fn into_inner(self) -> BTreeMap { + self.write_set + } + pub fn squash(mut self, other: Self) -> Result { use btree_map::Entry::*;