Skip to content

Commit

Permalink
[Executor] Move block gas limit to state computer (#8441)
Browse files Browse the repository at this point in the history
[Executor]  moving block gas limit to state computer, refactoring and fixing tests
  • Loading branch information
danielxiangzl authored Jun 1, 2023
1 parent 82cebdf commit 40dc32c
Show file tree
Hide file tree
Showing 32 changed files with 289 additions and 408 deletions.
2 changes: 1 addition & 1 deletion api/test-context/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ impl TestContext {
let parent_id = self.executor.committed_block_id();
let result = self
.executor
.execute_block((metadata.id(), txns.clone()), parent_id)
.execute_block((metadata.id(), txns.clone()), parent_id, None)
.unwrap();
let mut compute_status = result.compute_status().clone();
assert_eq!(compute_status.len(), txns.len(), "{:?}", result);
Expand Down
2 changes: 1 addition & 1 deletion aptos-move/aptos-debugger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl AptosDebugger {
txns: Vec<Transaction>,
) -> Result<Vec<TransactionOutput>> {
let state_view = DebuggerStateView::new(self.debugger.clone(), version);
AptosVM::execute_block(txns, &state_view)
AptosVM::execute_block(txns, &state_view, None)
.map_err(|err| format_err!("Unexpected VM Error: {:?}", err))
}

Expand Down
10 changes: 5 additions & 5 deletions aptos-move/aptos-transaction-benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct ParamSweepOpt {
pub num_runs: usize,

#[clap(long)]
pub maybe_gas_limit: Option<u64>,
pub maybe_block_gas_limit: Option<u64>,
}

#[derive(Debug, Parser)]
Expand All @@ -76,7 +76,7 @@ struct ExecuteOpt {
pub no_conflict_txns: bool,

#[clap(long)]
pub maybe_gas_limit: Option<u64>,
pub maybe_block_gas_limit: Option<u64>,
}

fn param_sweep(opt: ParamSweepOpt) {
Expand All @@ -91,7 +91,7 @@ fn param_sweep(opt: ParamSweepOpt) {
let run_parallel = !opt.skip_parallel;
let run_sequential = !opt.skip_sequential;

let maybe_gas_limit = opt.maybe_gas_limit;
let maybe_block_gas_limit = opt.maybe_block_gas_limit;

assert!(
run_sequential || run_parallel,
Expand All @@ -110,7 +110,7 @@ fn param_sweep(opt: ParamSweepOpt) {
1,
concurrency_level,
false,
maybe_gas_limit,
maybe_block_gas_limit,
);
par_tps.sort();
seq_tps.sort();
Expand Down Expand Up @@ -171,7 +171,7 @@ fn execute(opt: ExecuteOpt) {
opt.num_executor_shards,
opt.concurrency_level_per_shard,
opt.no_conflict_txns,
opt.maybe_gas_limit,
opt.maybe_block_gas_limit,
);

let sum: usize = par_tps.iter().sum();
Expand Down
36 changes: 18 additions & 18 deletions aptos-move/aptos-transaction-benchmarks/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ where
self.num_transactions,
1,
AccountPickStyle::Unlimited,
None,
)
},
|state| state.execute_sequential(),
Expand All @@ -92,7 +91,6 @@ where
self.num_transactions,
1,
AccountPickStyle::Unlimited,
None,
)
},
|state| state.execute_parallel(),
Expand All @@ -113,7 +111,7 @@ where
num_executor_shards: usize,
concurrency_level_per_shard: usize,
no_conflict_txn: bool,
maybe_gas_limit: Option<u64>,
maybe_block_gas_limit: Option<u64>,
) -> (Vec<usize>, Vec<usize>) {
let mut par_tps = Vec::new();
let mut seq_tps = Vec::new();
Expand All @@ -138,7 +136,6 @@ where
num_txn,
num_executor_shards,
account_pick_style,
maybe_gas_limit,
);

for i in 0..total_runs {
Expand All @@ -149,13 +146,15 @@ where
run_seq,
no_conflict_txn,
concurrency_level_per_shard,
maybe_block_gas_limit,
);
} else {
let tps = state.execute_blockstm_benchmark(
run_par,
run_seq,
no_conflict_txn,
concurrency_level_per_shard,
maybe_block_gas_limit,
);
par_tps.push(tps.0);
seq_tps.push(tps.1);
Expand Down Expand Up @@ -188,14 +187,12 @@ where
num_transactions: usize,
num_executor_shards: usize,
account_pick_style: AccountPickStyle,
maybe_gas_limit: Option<u64>,
) -> Self {
Self::with_universe(
strategy,
universe_strategy(num_accounts, num_transactions, account_pick_style),
num_transactions,
num_executor_shards,
maybe_gas_limit,
)
}

Expand All @@ -206,7 +203,6 @@ where
universe_strategy: impl Strategy<Value = AccountUniverseGen>,
num_transactions: usize,
num_executor_shards: usize,
maybe_gas_limit: Option<u64>,
) -> Self {
let mut runner = TestRunner::default();
let universe_gen = universe_strategy
Expand All @@ -221,13 +217,9 @@ where
let universe = universe_gen.setup_gas_cost_stability(&mut executor);

let state_view = Arc::new(executor.get_state_view().clone());
let parallel_block_executor = Arc::new(ShardedBlockExecutor::new(
num_executor_shards,
None,
maybe_gas_limit,
));
let sequential_block_executor =
Arc::new(ShardedBlockExecutor::new(1, Some(1), maybe_gas_limit));
let parallel_block_executor =
Arc::new(ShardedBlockExecutor::new(num_executor_shards, None));
let sequential_block_executor = Arc::new(ShardedBlockExecutor::new(1, Some(1)));

let validator_set = ValidatorSet::fetch_config(
&FakeExecutor::from_head_genesis()
Expand Down Expand Up @@ -292,7 +284,7 @@ where
let txns = self.gen_transaction(false);
let executor = self.sequential_block_executor;
executor
.execute_block(self.state_view.clone(), txns, 1)
.execute_block(self.state_view.clone(), txns, 1, None)
.expect("VM should not fail to start");
}

Expand All @@ -303,7 +295,7 @@ where
let txns = self.gen_transaction(false);
let executor = self.parallel_block_executor.clone();
executor
.execute_block(self.state_view.clone(), txns, num_cpus::get())
.execute_block(self.state_view.clone(), txns, num_cpus::get(), None)
.expect("VM should not fail to start");
}

Expand All @@ -312,6 +304,7 @@ where
transactions: Vec<Transaction>,
block_executor: Arc<ShardedBlockExecutor<FakeDataStore>>,
concurrency_level_per_shard: usize,
maybe_block_gas_limit: Option<u64>,
) -> usize {
let block_size = transactions.len();
let timer = Instant::now();
Expand All @@ -320,6 +313,7 @@ where
self.state_view.clone(),
transactions,
concurrency_level_per_shard,
maybe_block_gas_limit,
)
.expect("VM should not fail to start");
let exec_time = timer.elapsed().as_millis();
Expand All @@ -333,6 +327,7 @@ where
run_seq: bool,
no_conflict_txns: bool,
conurrency_level_per_shard: usize,
maybe_block_gas_limit: Option<u64>,
) -> (usize, usize) {
let transactions = self.gen_transaction(no_conflict_txns);
let par_tps = if run_par {
Expand All @@ -341,6 +336,7 @@ where
transactions.clone(),
self.parallel_block_executor.clone(),
conurrency_level_per_shard,
maybe_block_gas_limit,
);
println!("Parallel execution finishes, TPS = {}", tps);
tps
Expand All @@ -349,8 +345,12 @@ where
};
let seq_tps = if run_seq {
println!("Sequential execution starts...");
let tps =
self.execute_benchmark(transactions, self.sequential_block_executor.clone(), 1);
let tps = self.execute_benchmark(
transactions,
self.sequential_block_executor.clone(),
1,
maybe_block_gas_limit,
);
println!("Sequential execution finishes, TPS = {}", tps);
tps
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ impl<'a> AptosTestAdapter<'a> {
/// Should error if the transaction ends up being discarded, or having a status other than
/// EXECUTED.
fn run_transaction(&mut self, txn: Transaction) -> Result<TransactionOutput> {
let mut outputs = AptosVM::execute_block(vec![txn], &self.storage.clone())?;
let mut outputs = AptosVM::execute_block(vec![txn], &self.storage.clone(), None)?;

assert_eq!(outputs.len(), 1);

Expand Down
2 changes: 1 addition & 1 deletion aptos-move/aptos-vm-profiling/src/bins/run_aptos_p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn main() -> Result<()> {
})
.collect();

let res = AptosVM::execute_block(txns, &state_store)?;
let res = AptosVM::execute_block(txns, &state_store, None)?;
for i in 0..NUM_TXNS {
assert!(res[i as usize].status().status().unwrap().is_success());
}
Expand Down
39 changes: 4 additions & 35 deletions aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,7 @@ impl VMExecutor for AptosVM {
fn execute_block(
transactions: Vec<Transaction>,
state_view: &(impl StateView + Sync),
maybe_block_gas_limit: Option<u64>,
) -> Result<Vec<TransactionOutput>, VMStatus> {
fail_point!("move_adapter::execute_block", |_| {
Err(VMStatus::Error(
Expand All @@ -1492,41 +1493,7 @@ impl VMExecutor for AptosVM {
transactions,
state_view,
Self::get_concurrency_level(),
None,
);
if ret.is_ok() {
// Record the histogram count for transactions per block.
BLOCK_TRANSACTION_COUNT.observe(count as f64);
}
ret
}

fn execute_block_with_gas_limit(
transactions: Vec<Transaction>,
state_view: &(impl StateView + Sync),
maybe_gas_limit: Option<u64>,
) -> std::result::Result<Vec<TransactionOutput>, VMStatus> {
fail_point!("move_adapter::execute_block", |_| {
Err(VMStatus::Error(
StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR,
None,
))
});

let log_context = AdapterLogSchema::new(state_view.id(), 0);
info!(
log_context,
"Executing block, transaction count: {}",
transactions.len()
);

let count = transactions.len();
let ret = BlockAptosVM::execute_block(
Arc::clone(&RAYON_EXEC_POOL),
transactions,
state_view,
Self::get_concurrency_level(),
maybe_gas_limit,
maybe_block_gas_limit,
);
if ret.is_ok() {
// Record the histogram count for transactions per block.
Expand All @@ -1539,6 +1506,7 @@ impl VMExecutor for AptosVM {
sharded_block_executor: &ShardedBlockExecutor<S>,
transactions: Vec<Transaction>,
state_view: Arc<S>,
maybe_block_gas_limit: Option<u64>,
) -> Result<Vec<TransactionOutput>, VMStatus> {
let log_context = AdapterLogSchema::new(state_view.id(), 0);
info!(
Expand All @@ -1552,6 +1520,7 @@ impl VMExecutor for AptosVM {
state_view,
transactions,
AptosVM::get_concurrency_level(),
maybe_block_gas_limit,
);
if ret.is_ok() {
// Record the histogram count for transactions per block.
Expand Down
4 changes: 2 additions & 2 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl BlockAptosVM {
transactions: Vec<Transaction>,
state_view: &S,
concurrency_level: usize,
maybe_gas_limit: Option<u64>,
maybe_block_gas_limit: Option<u64>,
) -> Result<Vec<TransactionOutput>, VMStatus> {
let _timer = BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS.start_timer();
// Verify the signatures of all the transactions in parallel.
Expand All @@ -162,7 +162,7 @@ impl BlockAptosVM {
let executor = BlockExecutor::<PreprocessedTransaction, AptosExecutorTask<S>, S>::new(
concurrency_level,
executor_thread_pool,
maybe_gas_limit,
maybe_block_gas_limit,
);

let ret = executor.execute_block(state_view, signature_verified_block, state_view);
Expand Down
10 changes: 2 additions & 8 deletions aptos-move/aptos-vm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,15 @@ pub trait VMExecutor: Send + Sync {
fn execute_block(
transactions: Vec<Transaction>,
state_view: &(impl StateView + Sync),
) -> Result<Vec<TransactionOutput>, VMStatus>;

/// Executes a block of transactions with per_block_gas_limit
/// and returns output for each one of them.
fn execute_block_with_gas_limit(
transactions: Vec<Transaction>,
state_view: &(impl StateView + Sync),
maybe_gas_limit: Option<u64>,
maybe_block_gas_limit: Option<u64>,
) -> Result<Vec<TransactionOutput>, VMStatus>;

/// Executes a block of transactions using a sharded block executor and returns the results.
fn execute_block_sharded<S: StateView + Sync + Send + 'static>(
sharded_block_executor: &ShardedBlockExecutor<S>,
transactions: Vec<Transaction>,
state_view: Arc<S>,
maybe_block_gas_limit: Option<u64>,
) -> Result<Vec<TransactionOutput>, VMStatus>;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub struct ExecutorShard<S: StateView + Sync + Send + 'static> {
executor_thread_pool: Arc<rayon::ThreadPool>,
command_rx: Receiver<ExecutorShardCommand<S>>,
result_tx: Sender<Result<Vec<TransactionOutput>, VMStatus>>,
maybe_gas_limit: Option<u64>,
}

impl<S: StateView + Sync + Send + 'static> ExecutorShard<S> {
Expand All @@ -30,7 +29,6 @@ impl<S: StateView + Sync + Send + 'static> ExecutorShard<S> {
num_executor_threads: usize,
command_rx: Receiver<ExecutorShardCommand<S>>,
result_tx: Sender<Result<Vec<TransactionOutput>, VMStatus>>,
maybe_gas_limit: Option<u64>,
) -> Self {
let executor_thread_pool = Arc::new(
rayon::ThreadPoolBuilder::new()
Expand All @@ -49,7 +47,6 @@ impl<S: StateView + Sync + Send + 'static> ExecutorShard<S> {
executor_thread_pool,
command_rx,
result_tx,
maybe_gas_limit,
}
}

Expand All @@ -61,6 +58,7 @@ impl<S: StateView + Sync + Send + 'static> ExecutorShard<S> {
state_view,
transactions,
concurrency_level_per_shard,
maybe_block_gas_limit,
) => {
trace!(
"Shard {} received ExecuteBlock command of block size {} ",
Expand All @@ -72,7 +70,7 @@ impl<S: StateView + Sync + Send + 'static> ExecutorShard<S> {
transactions,
state_view.as_ref(),
concurrency_level_per_shard,
self.maybe_gas_limit,
maybe_block_gas_limit,
);
drop(state_view);
self.result_tx.send(ret).unwrap();
Expand Down
Loading

0 comments on commit 40dc32c

Please sign in to comment.