From 53e33ec0fbe00d0a80df8b2e38edbf5502a38bc4 Mon Sep 17 00:00:00 2001 From: buffalu <85544055+buffalu@users.noreply.github.com> Date: Tue, 28 Jan 2025 18:20:30 -0600 Subject: [PATCH] Move block_cost_limit tracking to BankingStage in preparation for SIMD-0207 (#753) --- banking-bench/src/main.rs | 1 + core/benches/banking_stage.rs | 2 + core/benches/consumer.rs | 1 + core/src/banking_simulation.rs | 1 + core/src/banking_stage.rs | 26 +- core/src/banking_stage/consume_worker.rs | 30 ++- core/src/banking_stage/consumer.rs | 60 +++-- .../forward_packet_batches_by_accounts.rs | 2 +- core/src/banking_stage/qos_service.rs | 22 +- core/src/bundle_stage.rs | 22 -- core/src/bundle_stage/bundle_consumer.rs | 105 +------- .../bundle_reserved_space_manager.rs | 237 ------------------ core/src/tpu.rs | 77 +++++- cost-model/benches/cost_tracker.rs | 4 +- cost-model/src/cost_tracker.rs | 76 ++++-- ledger-tool/src/main.rs | 2 +- ledger/src/blockstore_processor.rs | 2 +- rpc-test/tests/rpc.rs | 2 + 18 files changed, 246 insertions(+), 426 deletions(-) delete mode 100644 core/src/bundle_stage/bundle_reserved_space_manager.rs diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 9e3800f948..099e3a90ce 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -485,6 +485,7 @@ fn main() { false, HashSet::default(), BundleAccountLocker::default(), + |_| 0, ); // This is so that the signal_receiver does not go out of scope after the closure. diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 1b7f2c17c2..e9de574146 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -135,6 +135,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &mut transaction_buffer, &BankingStageStats::default(), &mut LeaderSlotMetricsTracker::new(0), + &|_| 0, ); }); @@ -322,6 +323,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { false, HashSet::default(), BundleAccountLocker::default(), + |_| 0, ); let chunk_len = verified.len() / CHUNKS; diff --git a/core/benches/consumer.rs b/core/benches/consumer.rs index 2fe9ceb614..ceeafae1f4 100644 --- a/core/benches/consumer.rs +++ b/core/benches/consumer.rs @@ -175,6 +175,7 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz &bank, transaction_iter.next().unwrap(), 0, + &|_| 0, ); assert!(summary .execute_and_commit_transactions_output diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index c4d25d9aa1..0fbd8fc2f8 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -820,6 +820,7 @@ impl BankingSimulator { false, collections::HashSet::default(), BundleAccountLocker::default(), + |_| 0, ); let (&_slot, &raw_base_event_time) = freeze_time_by_slot diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 5d75289e86..1517e53df9 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -38,7 +38,7 @@ use { solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH}, solana_poh::poh_recorder::{PohRecorder, TransactionRecorder}, solana_runtime::{ - bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, + bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, solana_sdk::{pubkey::Pubkey, timing::AtomicInterval}, @@ -365,6 +365,8 @@ impl BankingStage { enable_forwarding: bool, blacklisted_accounts: HashSet, bundle_account_locker: BundleAccountLocker, + // callback function for compute space reservation for BundleStage + block_cost_limit_block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static, ) -> Self { Self::new_num_threads( block_production_method, @@ -383,6 +385,7 @@ impl BankingStage { enable_forwarding, blacklisted_accounts, bundle_account_locker, + block_cost_limit_block_cost_limit_reservation_cb, ) } @@ -404,6 +407,7 @@ impl BankingStage { enable_forwarding: bool, blacklisted_accounts: HashSet, bundle_account_locker: BundleAccountLocker, + block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static, ) -> Self { match block_production_method { BlockProductionMethod::ThreadLocalMultiIterator => { @@ -422,6 +426,7 @@ impl BankingStage { prioritization_fee_cache, blacklisted_accounts, bundle_account_locker, + block_cost_limit_reservation_cb, ) } BlockProductionMethod::CentralScheduler => Self::new_central_scheduler( @@ -440,6 +445,7 @@ impl BankingStage { enable_forwarding, blacklisted_accounts, bundle_account_locker, + block_cost_limit_reservation_cb, ), } } @@ -460,6 +466,7 @@ impl BankingStage { prioritization_fee_cache: &Arc, blacklisted_accounts: HashSet, bundle_account_locker: BundleAccountLocker, + block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static, ) -> Self { assert!(num_threads >= MIN_TOTAL_THREADS); // Single thread to generate entries from many banks. @@ -528,6 +535,7 @@ impl BankingStage { unprocessed_transaction_storage, blacklisted_accounts.clone(), bundle_account_locker.clone(), + block_cost_limit_reservation_cb.clone(), ) }) .collect(); @@ -551,6 +559,7 @@ impl BankingStage { enable_forwarding: bool, blacklisted_accounts: HashSet, bundle_account_locker: BundleAccountLocker, + block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static, ) -> Self { assert!(num_threads >= MIN_TOTAL_THREADS); // Single thread to generate entries from many banks. @@ -599,6 +608,7 @@ impl BankingStage { ), blacklisted_accounts.clone(), bundle_account_locker.clone(), + block_cost_limit_reservation_cb.clone(), )); } @@ -628,11 +638,12 @@ impl BankingStage { ); worker_metrics.push(consume_worker.metrics_handle()); + let cb = block_cost_limit_reservation_cb.clone(); bank_thread_hdls.push( Builder::new() .name(format!("solCoWorker{id:02}")) .spawn(move || { - let _ = consume_worker.run(); + let _ = consume_worker.run(cb); }) .unwrap(), ) @@ -687,6 +698,7 @@ impl BankingStage { unprocessed_transaction_storage: UnprocessedTransactionStorage, blacklisted_accounts: HashSet, bundle_account_locker: BundleAccountLocker, + block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static, ) -> JoinHandle<()> { let mut packet_receiver = PacketReceiver::new(id, packet_receiver); let consumer = Consumer::new( @@ -708,6 +720,7 @@ impl BankingStage { &consumer, id, unprocessed_transaction_storage, + block_cost_limit_reservation_cb, ) }) .unwrap() @@ -722,6 +735,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, tracer_packet_stats: &mut TracerPacketStats, + block_cost_limit_reservation_cb: &impl Fn(&Bank) -> u64, ) { if unprocessed_transaction_storage.should_not_process() { return; @@ -747,6 +761,7 @@ impl BankingStage { unprocessed_transaction_storage, banking_stage_stats, slot_metrics_tracker, + block_cost_limit_reservation_cb )); slot_metrics_tracker .increment_consume_buffered_packets_us(consume_buffered_packets_us); @@ -787,6 +802,7 @@ impl BankingStage { consumer: &Consumer, id: u32, mut unprocessed_transaction_storage: UnprocessedTransactionStorage, + block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64, ) { let mut banking_stage_stats = BankingStageStats::new(id); let mut tracer_packet_stats = TracerPacketStats::new(id); @@ -806,6 +822,7 @@ impl BankingStage { &banking_stage_stats, &mut slot_metrics_tracker, &mut tracer_packet_stats, + &block_cost_limit_reservation_cb )); slot_metrics_tracker .increment_process_buffered_packets_us(process_buffered_packets_us); @@ -939,6 +956,7 @@ mod tests { false, HashSet::default(), BundleAccountLocker::default(), + |_| 0, ); drop(non_vote_sender); drop(tpu_vote_sender); @@ -997,6 +1015,7 @@ mod tests { false, HashSet::default(), BundleAccountLocker::default(), + |_| 0, ); trace!("sending bank"); drop(non_vote_sender); @@ -1084,6 +1103,7 @@ mod tests { false, HashSet::default(), BundleAccountLocker::default(), + |_| 0, ); // fund another account so we can send 2 good transactions in a single batch. @@ -1261,6 +1281,7 @@ mod tests { &Arc::new(PrioritizationFeeCache::new(0u64)), HashSet::default(), BundleAccountLocker::default(), + |_| 0, ); // wait for banking_stage to eat the packets @@ -1464,6 +1485,7 @@ mod tests { false, HashSet::default(), BundleAccountLocker::default(), + |_| 0, ); let keypairs = (0..100).map(|_| Keypair::new()).collect_vec(); diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index 3e54aab976..975a7d85f3 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -58,14 +58,18 @@ impl ConsumeWorker { self.metrics.clone() } - pub fn run(self) -> Result<(), ConsumeWorkerError> { + pub fn run(self, reservation_cb: impl Fn(&Bank) -> u64) -> Result<(), ConsumeWorkerError> { loop { let work = self.consume_receiver.recv()?; - self.consume_loop(work)?; + self.consume_loop(work, &reservation_cb)?; } } - fn consume_loop(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { + fn consume_loop( + &self, + work: ConsumeWork, + reservation_cb: &impl Fn(&Bank) -> u64, + ) -> Result<(), ConsumeWorkerError> { let (maybe_consume_bank, get_bank_us) = measure_us!(self.get_consume_bank()); let Some(mut bank) = maybe_consume_bank else { self.metrics @@ -99,18 +103,24 @@ impl ConsumeWorker { return self.retry_drain(work); } } - self.consume(&bank, work)?; + self.consume(&bank, work, reservation_cb)?; } Ok(()) } /// Consume a single batch. - fn consume(&self, bank: &Arc, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { + fn consume( + &self, + bank: &Arc, + work: ConsumeWork, + reservation_cb: &impl Fn(&Bank) -> u64, + ) -> Result<(), ConsumeWorkerError> { let output = self.consumer.process_and_record_aged_transactions( bank, &work.transactions, &work.max_ages, + reservation_cb, ); self.metrics.update_for_consume(&output); @@ -847,7 +857,7 @@ mod tests { consumed_receiver, .. } = &test_frame; - let worker_thread = std::thread::spawn(move || worker.run()); + let worker_thread = std::thread::spawn(move || worker.run(|_| 0)); let pubkey1 = Pubkey::new_unique(); @@ -892,7 +902,7 @@ mod tests { consumed_receiver, .. } = &test_frame; - let worker_thread = std::thread::spawn(move || worker.run()); + let worker_thread = std::thread::spawn(move || worker.run(|_| 0)); poh_recorder .write() .unwrap() @@ -941,7 +951,7 @@ mod tests { consumed_receiver, .. } = &test_frame; - let worker_thread = std::thread::spawn(move || worker.run()); + let worker_thread = std::thread::spawn(move || worker.run(|_| 0)); poh_recorder .write() .unwrap() @@ -993,7 +1003,7 @@ mod tests { consumed_receiver, .. } = &test_frame; - let worker_thread = std::thread::spawn(move || worker.run()); + let worker_thread = std::thread::spawn(move || worker.run(|_| 0)); poh_recorder .write() .unwrap() @@ -1068,7 +1078,7 @@ mod tests { consumed_receiver, .. } = &test_frame; - let worker_thread = std::thread::spawn(move || worker.run()); + let worker_thread = std::thread::spawn(move || worker.run(|_| 0)); poh_recorder .write() .unwrap() diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index d654ea61f3..e733305b0b 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -123,6 +123,7 @@ impl Consumer { unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + reservation_cb: &impl Fn(&Bank) -> u64, ) { let mut rebuffered_packet_count = 0; let mut consumed_buffered_packets_count = 0; @@ -141,6 +142,7 @@ impl Consumer { &mut consumed_buffered_packets_count, &mut rebuffered_packet_count, packets_to_process, + reservation_cb, ) }, &self.blacklisted_accounts, @@ -181,6 +183,7 @@ impl Consumer { consumed_buffered_packets_count: &mut usize, rebuffered_packet_count: &mut usize, packets_to_process: &[Arc], + reservation_cb: &impl Fn(&Bank) -> u64, ) -> Option> { if payload.reached_end_of_slot { return None; @@ -194,6 +197,7 @@ impl Consumer { &payload.sanitized_transactions, banking_stage_stats, payload.slot_metrics_tracker, + reservation_cb )); payload .slot_metrics_tracker @@ -241,10 +245,15 @@ impl Consumer { sanitized_transactions: &[SanitizedTransaction], banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + reservation_cb: &impl Fn(&Bank) -> u64, ) -> ProcessTransactionsSummary { - let (mut process_transactions_summary, process_transactions_us) = measure_us!( - self.process_transactions(bank, bank_creation_time, sanitized_transactions) - ); + let (mut process_transactions_summary, process_transactions_us) = measure_us!(self + .process_transactions( + bank, + bank_creation_time, + sanitized_transactions, + reservation_cb + )); slot_metrics_tracker.increment_process_transactions_us(process_transactions_us); banking_stage_stats .transaction_processing_elapsed @@ -295,6 +304,7 @@ impl Consumer { bank: &Arc, bank_creation_time: &Instant, transactions: &[SanitizedTransaction], + reservation_cb: &impl Fn(&Bank) -> u64, ) -> ProcessTransactionsSummary { let mut chunk_start = 0; let mut all_retryable_tx_indexes = vec![]; @@ -315,6 +325,7 @@ impl Consumer { bank, &transactions[chunk_start..chunk_end], chunk_start, + reservation_cb, ); let ProcessTransactionBatchOutput { @@ -398,6 +409,7 @@ impl Consumer { bank: &Arc, txs: &[SanitizedTransaction], chunk_offset: usize, + reservation_cb: &impl Fn(&Bank) -> u64, ) -> ProcessTransactionBatchOutput { let mut error_counters = TransactionErrorMetrics::default(); let pre_results = vec![Ok(()); txs.len()]; @@ -426,6 +438,7 @@ impl Consumer { txs, chunk_offset, check_results.into_iter(), + reservation_cb, ); // Accumulate error counters from the initial checks into final results @@ -441,6 +454,7 @@ impl Consumer { bank: &Arc, txs: &[SanitizedTransaction], max_ages: &[MaxAge], + reservation_cb: &impl Fn(&Bank) -> u64, ) -> ProcessTransactionBatchOutput { let move_precompile_verification_to_svm = bank .feature_set @@ -477,7 +491,13 @@ impl Consumer { Ok(()) }); - self.process_and_record_transactions_with_pre_results(bank, txs, 0, pre_results) + self.process_and_record_transactions_with_pre_results( + bank, + txs, + 0, + pre_results, + reservation_cb, + ) } fn process_and_record_transactions_with_pre_results( @@ -486,15 +506,16 @@ impl Consumer { txs: &[SanitizedTransaction], chunk_offset: usize, pre_results: impl Iterator>, + reservation_cb: &impl Fn(&Bank) -> u64, ) -> ProcessTransactionBatchOutput { let ( (transaction_qos_cost_results, cost_model_throttled_transactions_count), cost_model_us, ) = measure_us!(self.qos_service.select_and_accumulate_transaction_costs( bank, - &mut bank.write_cost_tracker().unwrap(), txs, - pre_results + pre_results, + reservation_cb )); // Only lock accounts for those transactions are selected for the block; @@ -954,7 +975,7 @@ mod tests { BundleAccountLocker::default(), ); let process_transactions_summary = - consumer.process_transactions(&bank, &Instant::now(), &transactions); + consumer.process_transactions(&bank, &Instant::now(), &transactions, &|_| 0); poh_recorder .read() @@ -1137,7 +1158,7 @@ mod tests { ); let process_transactions_batch_output = - consumer.process_and_record_transactions(&bank, &transactions, 0); + consumer.process_and_record_transactions(&bank, &transactions, 0, &|_| 0); let ExecuteAndCommitTransactionsOutput { transaction_counts, @@ -1191,7 +1212,7 @@ mod tests { )]); let process_transactions_batch_output = - consumer.process_and_record_transactions(&bank, &transactions, 0); + consumer.process_and_record_transactions(&bank, &transactions, 0, &|_| 0); let ExecuteAndCommitTransactionsOutput { transaction_counts, @@ -1339,7 +1360,7 @@ mod tests { ); let process_transactions_batch_output = - consumer.process_and_record_transactions(&bank, &transactions, 0); + consumer.process_and_record_transactions(&bank, &transactions, 0, &|_| 0); let ExecuteAndCommitTransactionsOutput { transaction_counts, commit_transactions_result, @@ -1454,7 +1475,7 @@ mod tests { ); let process_transactions_batch_output = - consumer.process_and_record_transactions(&bank, &transactions, 0); + consumer.process_and_record_transactions(&bank, &transactions, 0, &|_| 0); let ExecuteAndCommitTransactionsOutput { transaction_counts, @@ -1556,7 +1577,7 @@ mod tests { )]); let process_transactions_batch_output = - consumer.process_and_record_transactions(&bank, &transactions, 0); + consumer.process_and_record_transactions(&bank, &transactions, 0, &|_| 0); let ExecuteAndCommitTransactionsOutput { transaction_counts, @@ -1586,7 +1607,7 @@ mod tests { ]); let process_transactions_batch_output = - consumer.process_and_record_transactions(&bank, &transactions, 0); + consumer.process_and_record_transactions(&bank, &transactions, 0, &|_| 0); let ExecuteAndCommitTransactionsOutput { transaction_counts, @@ -1710,7 +1731,7 @@ mod tests { ); let process_transactions_batch_output = - consumer.process_and_record_transactions(&bank, &transactions, 0); + consumer.process_and_record_transactions(&bank, &transactions, 0, &|_| 0); poh_recorder .read() @@ -1922,7 +1943,7 @@ mod tests { ); let process_transactions_summary = - consumer.process_transactions(&bank, &Instant::now(), &transactions); + consumer.process_transactions(&bank, &Instant::now(), &transactions, &|_| 0); let ProcessTransactionsSummary { reached_max_poh_height, @@ -2057,7 +2078,7 @@ mod tests { BundleAccountLocker::default(), ); - let _ = consumer.process_and_record_transactions(&bank, &transactions, 0); + let _ = consumer.process_and_record_transactions(&bank, &transactions, 0, &|_| 0); drop(consumer); // drop/disconnect transaction_status_sender transaction_status_service.join().unwrap(); @@ -2209,7 +2230,8 @@ mod tests { BundleAccountLocker::default(), ); - let _ = consumer.process_and_record_transactions(&bank, &[sanitized_tx.clone()], 0); + let _ = + consumer.process_and_record_transactions(&bank, &[sanitized_tx.clone()], 0, &|_| 0); drop(consumer); // drop/disconnect transaction_status_sender transaction_status_service.join().unwrap(); @@ -2289,6 +2311,7 @@ mod tests { &mut buffered_packet_batches, &banking_stage_stats, &mut LeaderSlotMetricsTracker::new(0), + &|_| 0, ); // Check that all packets were processed without retrying @@ -2380,6 +2403,7 @@ mod tests { &mut buffered_packet_batches, &BankingStageStats::default(), &mut LeaderSlotMetricsTracker::new(0), + &|_| 0, ); assert!(buffered_packet_batches.is_empty()); poh_recorder @@ -2458,6 +2482,7 @@ mod tests { &mut buffered_packet_batches, &banking_stage_stats, &mut LeaderSlotMetricsTracker::new(0), + &|_| 0, ); // Check that all but 1 transaction was processed. And that it was rebuffered. @@ -2579,6 +2604,7 @@ mod tests { &mut buffered_packet_batches, &banking_stage_stats, &mut LeaderSlotMetricsTracker::new(0), + &|_| 0, ); // Check that all packets were processed without retrying diff --git a/core/src/banking_stage/forward_packet_batches_by_accounts.rs b/core/src/banking_stage/forward_packet_batches_by_accounts.rs index a6e372655c..1457fe36ce 100644 --- a/core/src/banking_stage/forward_packet_batches_by_accounts.rs +++ b/core/src/banking_stage/forward_packet_batches_by_accounts.rs @@ -120,7 +120,7 @@ impl ForwardPacketBatchesByAccounts { ) -> bool { let tx_cost = CostModel::calculate_cost(sanitized_transaction, feature_set); - if let Ok(updated_costs) = self.cost_tracker.try_add(&tx_cost) { + if let Ok(updated_costs) = self.cost_tracker.try_add(&tx_cost, 0) { let batch_index = self.get_batch_index_by_updated_costs(&updated_costs); if let Some(forward_batch) = self.forward_batches.get_mut(batch_index) { diff --git a/core/src/banking_stage/qos_service.rs b/core/src/banking_stage/qos_service.rs index 1e6b2b48d3..df48dd06cd 100644 --- a/core/src/banking_stage/qos_service.rs +++ b/core/src/banking_stage/qos_service.rs @@ -6,9 +6,7 @@ use { super::{committer::CommitTransactionDetails, BatchedTransactionDetails}, solana_cost_model::{ - cost_model::CostModel, - cost_tracker::{CostTracker, UpdatedCosts}, - transaction_cost::TransactionCost, + cost_model::CostModel, cost_tracker::UpdatedCosts, transaction_cost::TransactionCost, }, solana_feature_set::FeatureSet, solana_measure::measure::Measure, @@ -45,9 +43,9 @@ impl QosService { pub fn select_and_accumulate_transaction_costs<'a>( &self, bank: &Bank, - cost_tracker: &mut CostTracker, transactions: &'a [SanitizedTransaction], pre_results: impl Iterator>, + block_cost_limit_reservation_cb: &impl Fn(&Bank) -> u64, ) -> ( Vec>>, u64, @@ -58,7 +56,7 @@ impl QosService { transactions.iter(), transaction_costs.into_iter(), bank, - cost_tracker, + block_cost_limit_reservation_cb, ); self.accumulate_estimated_transaction_costs(&Self::accumulate_batched_transaction_costs( transactions_qos_cost_results.iter(), @@ -107,17 +105,19 @@ impl QosService { Item = transaction::Result>, >, bank: &Bank, - cost_tracker: &mut CostTracker, + block_cost_limit_reservation_cb: &impl Fn(&Bank) -> u64, ) -> ( Vec>>, usize, ) { let mut cost_tracking_time = Measure::start("cost_tracking_time"); + let mut cost_tracker = bank.write_cost_tracker().unwrap(); + let reservation_amount = block_cost_limit_reservation_cb(bank); let mut num_included = 0; let select_results = transactions .zip(transactions_costs) .map(|(tx, cost)| match cost { - Ok(cost) => match cost_tracker.try_add(&cost) { + Ok(cost) => match cost_tracker.try_add(&cost, reservation_amount) { Ok(UpdatedCosts { updated_block_cost, updated_costliest_account_cost, @@ -725,7 +725,7 @@ mod tests { txs.iter(), txs_costs.into_iter(), &bank, - &mut bank.write_cost_tracker().unwrap(), + &|_| 0, ); assert_eq!(num_selected, 2); @@ -783,7 +783,7 @@ mod tests { txs.iter(), txs_costs.into_iter(), &bank, - &mut bank.write_cost_tracker().unwrap(), + &|_| 0, ); assert_eq!( total_txs_cost, @@ -852,7 +852,7 @@ mod tests { txs.iter(), txs_costs.into_iter(), &bank, - &mut bank.write_cost_tracker().unwrap(), + &|_| 0, ); assert_eq!( total_txs_cost, @@ -911,7 +911,7 @@ mod tests { txs.iter(), txs_costs.into_iter(), &bank, - &mut bank.write_cost_tracker().unwrap(), + &|_| 0, ); assert_eq!( total_txs_cost, diff --git a/core/src/bundle_stage.rs b/core/src/bundle_stage.rs index 763e3a6dc6..87fc801436 100644 --- a/core/src/bundle_stage.rs +++ b/core/src/bundle_stage.rs @@ -10,7 +10,6 @@ use { bundle_stage::{ bundle_account_locker::BundleAccountLocker, bundle_consumer::BundleConsumer, bundle_packet_receiver::BundleReceiver, - bundle_reserved_space_manager::BundleReservedSpaceManager, bundle_stage_leader_metrics::BundleStageLeaderMetrics, committer::Committer, }, packet_bundle::PacketBundle, @@ -18,7 +17,6 @@ use { tip_manager::TipManager, }, crossbeam_channel::{Receiver, RecvTimeoutError}, - solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS, solana_gossip::cluster_info::ClusterInfo, solana_ledger::blockstore_processor::TransactionStatusSender, solana_measure::measure_us, @@ -41,7 +39,6 @@ pub mod bundle_account_locker; mod bundle_consumer; mod bundle_packet_deserializer; mod bundle_packet_receiver; -mod bundle_reserved_space_manager; pub(crate) mod bundle_stage_leader_metrics; mod committer; @@ -208,7 +205,6 @@ impl BundleStage { tip_manager: TipManager, bundle_account_locker: BundleAccountLocker, block_builder_fee_info: &Arc>, - preallocated_bundle_cost: u64, prioritization_fee_cache: &Arc, ) -> Self { Self::start_bundle_thread( @@ -223,7 +219,6 @@ impl BundleStage { bundle_account_locker, MAX_BUNDLE_RETRY_DURATION, block_builder_fee_info, - preallocated_bundle_cost, prioritization_fee_cache, ) } @@ -245,7 +240,6 @@ impl BundleStage { bundle_account_locker: BundleAccountLocker, max_bundle_retry_duration: Duration, block_builder_fee_info: &Arc>, - preallocated_bundle_cost: u64, prioritization_fee_cache: &Arc, ) -> Self { const BUNDLE_STAGE_ID: u32 = 10_000; @@ -263,21 +257,6 @@ impl BundleStage { let unprocessed_bundle_storage = UnprocessedTransactionStorage::new_bundle_storage(); - let reserved_ticks = poh_recorder - .read() - .unwrap() - .ticks_per_slot() - .saturating_mul(8) - .saturating_div(10); - - // The first 80% of the block, based on poh ticks, has `preallocated_bundle_cost` less compute units. - // The last 20% has has full compute so blockspace is maximized if BundleStage is idle. - let reserved_space = BundleReservedSpaceManager::new( - MAX_BLOCK_UNITS, - preallocated_bundle_cost, - reserved_ticks, - ); - let consumer = BundleConsumer::new( committer, poh_recorder.read().unwrap().new_recorder(), @@ -288,7 +267,6 @@ impl BundleStage { block_builder_fee_info.clone(), max_bundle_retry_duration, cluster_info, - reserved_space, ); let bundle_thread = Builder::new() diff --git a/core/src/bundle_stage/bundle_consumer.rs b/core/src/bundle_stage/bundle_consumer.rs index 299cfbc9e5..f53221879a 100644 --- a/core/src/bundle_stage/bundle_consumer.rs +++ b/core/src/bundle_stage/bundle_consumer.rs @@ -9,7 +9,6 @@ use { }, bundle_stage::{ bundle_account_locker::{BundleAccountLocker, LockedBundle}, - bundle_reserved_space_manager::BundleReservedSpaceManager, bundle_stage_leader_metrics::BundleStageLeaderMetrics, committer::Committer, }, @@ -74,8 +73,6 @@ pub struct BundleConsumer { max_bundle_retry_duration: Duration, cluster_info: Arc, - - reserved_space: BundleReservedSpaceManager, } impl BundleConsumer { @@ -90,7 +87,6 @@ impl BundleConsumer { block_builder_fee_info: Arc>, max_bundle_retry_duration: Duration, cluster_info: Arc, - reserved_space: BundleReservedSpaceManager, ) -> Self { let blacklisted_accounts = HashSet::from_iter([tip_manager.tip_payment_program_id()]); Self { @@ -106,7 +102,6 @@ impl BundleConsumer { block_builder_fee_info, max_bundle_retry_duration, cluster_info, - reserved_space, } } @@ -133,8 +128,6 @@ impl BundleConsumer { unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, bundle_stage_leader_metrics: &mut BundleStageLeaderMetrics, ) { - self.reserved_space.tick(&bank_start.working_bank); - let reached_end_of_slot = unprocessed_transaction_storage.process_bundles( bank_start.working_bank.clone(), bundle_stage_leader_metrics, @@ -151,7 +144,6 @@ impl BundleConsumer { &self.qos_service, &self.log_messages_bytes_limit, self.max_bundle_retry_duration, - &self.reserved_space, bundles, bank_start, bundle_stage_leader_metrics, @@ -180,7 +172,6 @@ impl BundleConsumer { qos_service: &QosService, log_messages_bytes_limit: &Option, max_bundle_retry_duration: Duration, - reserved_space: &BundleReservedSpaceManager, bundles: &[(ImmutableDeserializedBundle, SanitizedBundle)], bank_start: &BankStart, bundle_stage_leader_metrics: &mut BundleStageLeaderMetrics, @@ -217,7 +208,6 @@ impl BundleConsumer { qos_service, log_messages_bytes_limit, max_bundle_retry_duration, - reserved_space, &locked_bundle, bank_start, bundle_stage_leader_metrics, @@ -257,7 +247,6 @@ impl BundleConsumer { qos_service: &QosService, log_messages_bytes_limit: &Option, max_bundle_retry_duration: Duration, - reserved_space: &BundleReservedSpaceManager, locked_bundle: &LockedBundle, bank_start: &BankStart, bundle_stage_leader_metrics: &mut BundleStageLeaderMetrics, @@ -286,7 +275,6 @@ impl BundleConsumer { qos_service, log_messages_bytes_limit, max_bundle_retry_duration, - reserved_space, bank_start, bundle_stage_leader_metrics, ); @@ -306,7 +294,6 @@ impl BundleConsumer { qos_service, log_messages_bytes_limit, max_bundle_retry_duration, - reserved_space, locked_bundle.sanitized_bundle(), bank_start, bundle_stage_leader_metrics, @@ -327,7 +314,6 @@ impl BundleConsumer { qos_service: &QosService, log_messages_bytes_limit: &Option, max_bundle_retry_duration: Duration, - reserved_space: &BundleReservedSpaceManager, bank_start: &BankStart, bundle_stage_leader_metrics: &mut BundleStageLeaderMetrics, ) -> Result<(), BundleExecutionError> { @@ -356,7 +342,6 @@ impl BundleConsumer { qos_service, log_messages_bytes_limit, max_bundle_retry_duration, - reserved_space, locked_init_tip_programs_bundle.sanitized_bundle(), bank_start, bundle_stage_leader_metrics, @@ -409,7 +394,6 @@ impl BundleConsumer { qos_service, log_messages_bytes_limit, max_bundle_retry_duration, - reserved_space, locked_tip_crank_bundle.sanitized_bundle(), bank_start, bundle_stage_leader_metrics, @@ -438,26 +422,17 @@ impl BundleConsumer { /// Rolls back the reserved space if there's not enough blockspace for all transactions in the bundle. fn reserve_bundle_blockspace<'a>( qos_service: &QosService, - reserved_space: &BundleReservedSpaceManager, sanitized_bundle: &'a SanitizedBundle, bank: &Arc, ) -> ReserveBundleBlockspaceResult<'a> { - let mut write_cost_tracker = bank.write_cost_tracker().unwrap(); - - // set the block cost limit to the original block cost limit, run the select + accumulate - // then reset back to the expected block cost limit. this allows bundle stage to potentially - // increase block_compute_limits, allocate the space, and reset the block_cost_limits to - // the reserved space without BankingStage racing to allocate this extra reserved space - write_cost_tracker.set_block_cost_limit(reserved_space.block_cost_limit()); let (transaction_qos_cost_results, cost_model_throttled_transactions_count) = qos_service .select_and_accumulate_transaction_costs( bank, - &mut write_cost_tracker, &sanitized_bundle.transactions, std::iter::repeat(Ok(())), + // bundle stage does not respect the cost model reservation + &|_| 0, ); - write_cost_tracker.set_block_cost_limit(reserved_space.expected_block_cost_limits(bank)); - drop(write_cost_tracker); // rollback all transaction costs if it can't fit and if transaction_qos_cost_results.iter().any(|c| c.is_err()) { @@ -477,7 +452,6 @@ impl BundleConsumer { qos_service: &QosService, log_messages_bytes_limit: &Option, max_bundle_retry_duration: Duration, - reserved_space: &BundleReservedSpaceManager, sanitized_bundle: &SanitizedBundle, bank_start: &BankStart, bundle_stage_leader_metrics: &mut BundleStageLeaderMetrics, @@ -493,7 +467,6 @@ impl BundleConsumer { cost_model_elapsed_us, ) = measure_us!(Self::reserve_bundle_blockspace( qos_service, - reserved_space, sanitized_bundle, &bank_start.working_bank )?); @@ -743,7 +716,6 @@ mod tests { bundle_stage::{ bundle_account_locker::BundleAccountLocker, bundle_consumer::BundleConsumer, bundle_packet_deserializer::BundlePacketDeserializer, - bundle_reserved_space_manager::BundleReservedSpaceManager, bundle_stage_leader_metrics::BundleStageLeaderMetrics, committer::Committer, QosService, UnprocessedTransactionStorage, }, @@ -755,7 +727,7 @@ mod tests { jito_tip_distribution::sdk::derive_tip_distribution_account_address, rand::{thread_rng, RngCore}, solana_bundle::SanitizedBundle, - solana_cost_model::{block_cost_limits::MAX_BLOCK_UNITS, cost_model::CostModel}, + solana_cost_model::cost_model::CostModel, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::{ blockstore::Blockstore, genesis_utils::create_genesis_config, @@ -1042,16 +1014,6 @@ mod tests { block_builder_info, Duration::from_secs(10), cluster_info, - BundleReservedSpaceManager::new( - MAX_BLOCK_UNITS, - 3_000_000, - poh_recorder - .read() - .unwrap() - .ticks_per_slot() - .saturating_mul(8) - .saturating_div(10), - ), ); let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); @@ -1190,16 +1152,6 @@ mod tests { block_builder_info, Duration::from_secs(10), cluster_info.clone(), - BundleReservedSpaceManager::new( - MAX_BLOCK_UNITS, - 3_000_000, - poh_recorder - .read() - .unwrap() - .ticks_per_slot() - .saturating_mul(8) - .saturating_div(10), - ), ); let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); @@ -1360,12 +1312,6 @@ mod tests { let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - let reserved_ticks = bank.max_tick_height().saturating_mul(8).saturating_div(10); - - // The first 80% of the block, based on poh ticks, has `preallocated_bundle_cost` less compute units. - // The last 20% has has full compute so blockspace is maximized if BundleStage is idle. - let reserved_space = - BundleReservedSpaceManager::new(MAX_BLOCK_UNITS, 3_000_000, reserved_ticks); let mut bundle_stage_leader_metrics = BundleStageLeaderMetrics::new(1); assert_matches!( BundleConsumer::handle_tip_programs( @@ -1378,7 +1324,6 @@ mod tests { &QosService::new(1), &None, Duration::from_secs(10), - &reserved_space, &bank_start, &mut bundle_stage_leader_metrics ), @@ -1471,20 +1416,10 @@ mod tests { CostModel::calculate_cost(&sanitized_bundle.transactions[0], &bank.feature_set); let qos_service = QosService::new(1); - let reserved_ticks = bank.max_tick_height().saturating_mul(8).saturating_div(10); - - // The first 80% of the block, based on poh ticks, has `preallocated_bundle_cost` less compute units. - // The last 20% has has full compute so blockspace is maximized if BundleStage is idle. - let reserved_space = - BundleReservedSpaceManager::new(MAX_BLOCK_UNITS, 3_000_000, reserved_ticks); - - assert!(BundleConsumer::reserve_bundle_blockspace( - &qos_service, - &reserved_space, - &sanitized_bundle, - &bank - ) - .is_ok()); + assert!( + BundleConsumer::reserve_bundle_blockspace(&qos_service, &sanitized_bundle, &bank) + .is_ok() + ); assert_eq!( bank.read_cost_tracker().unwrap().block_cost(), transfer_cost.sum() @@ -1524,30 +1459,12 @@ mod tests { .set_block_cost_limit(transfer_cost.sum()); let qos_service = QosService::new(1); - let reserved_ticks = bank.max_tick_height().saturating_mul(8).saturating_div(10); - // The first 80% of the block, based on poh ticks, has `preallocated_bundle_cost` less compute units. - // The last 20% has has full compute so blockspace is maximized if BundleStage is idle. - let reserved_space = BundleReservedSpaceManager::new( - bank.read_cost_tracker().unwrap().block_cost(), - 50, - reserved_ticks, + assert!( + BundleConsumer::reserve_bundle_blockspace(&qos_service, &sanitized_bundle, &bank) + .is_err() ); - - assert!(BundleConsumer::reserve_bundle_blockspace( - &qos_service, - &reserved_space, - &sanitized_bundle, - &bank - ) - .is_err()); + // the block cost shall not be modified assert_eq!(bank.read_cost_tracker().unwrap().block_cost(), 0); - assert_eq!( - bank.read_cost_tracker().unwrap().block_cost_limit(), - bank.read_cost_tracker() - .unwrap() - .block_cost_limit() - .saturating_sub(50) - ); } } diff --git a/core/src/bundle_stage/bundle_reserved_space_manager.rs b/core/src/bundle_stage/bundle_reserved_space_manager.rs deleted file mode 100644 index 16ba0b888c..0000000000 --- a/core/src/bundle_stage/bundle_reserved_space_manager.rs +++ /dev/null @@ -1,237 +0,0 @@ -use {solana_runtime::bank::Bank, solana_sdk::clock::Slot, std::sync::Arc}; - -/// Manager responsible for reserving `bundle_reserved_cost` during the first `reserved_ticks` of a bank -/// and resetting the block cost limit to `block_cost_limit` after the reserved tick period is over -pub struct BundleReservedSpaceManager { - // the bank's cost limit - block_cost_limit: u64, - // bundles get this much reserved space for the first reserved_ticks - bundle_reserved_cost: u64, - // a reduced block_compute_limit is reserved for this many ticks, afterwards it goes back to full cost - reserved_ticks: u64, - last_slot_updated: Slot, -} - -impl BundleReservedSpaceManager { - pub fn new(block_cost_limit: u64, bundle_reserved_cost: u64, reserved_ticks: u64) -> Self { - Self { - block_cost_limit, - bundle_reserved_cost, - reserved_ticks, - last_slot_updated: u64::MAX, - } - } - - /// Call this on creation of new bank and periodically while bundle processing - /// to manage the block_cost_limits - pub fn tick(&mut self, bank: &Arc) { - if self.last_slot_updated == bank.slot() && !self.is_in_reserved_tick_period(bank) { - // new slot logic already ran, need to revert the block cost limit to original if - // ticks are past the reserved tick mark - debug!( - "slot: {} ticks: {}, resetting block_cost_limit to {}", - bank.slot(), - bank.tick_height(), - self.block_cost_limit - ); - bank.write_cost_tracker() - .unwrap() - .set_block_cost_limit(self.block_cost_limit); - } else if self.last_slot_updated != bank.slot() && self.is_in_reserved_tick_period(bank) { - // new slot, if in the first max_tick - tick_height slots reserve space - // otherwise can leave the current block limit as is - let new_block_cost_limit = self.reduced_block_cost_limit(); - debug!( - "slot: {} ticks: {}, reserving block_cost_limit with block_cost_limit of {}", - bank.slot(), - bank.tick_height(), - new_block_cost_limit - ); - bank.write_cost_tracker() - .unwrap() - .set_block_cost_limit(new_block_cost_limit); - self.last_slot_updated = bank.slot(); - } - } - - /// return true if the bank is still in the period where block_cost_limits is reduced - pub fn is_in_reserved_tick_period(&self, bank: &Bank) -> bool { - bank.tick_height() % bank.ticks_per_slot() < self.reserved_ticks - } - - /// return the block_cost_limits as determined by the tick height of the bank - pub fn expected_block_cost_limits(&self, bank: &Bank) -> u64 { - if self.is_in_reserved_tick_period(bank) { - self.reduced_block_cost_limit() - } else { - self.block_cost_limit() - } - } - - pub fn reduced_block_cost_limit(&self) -> u64 { - self.block_cost_limit - .saturating_sub(self.bundle_reserved_cost) - } - - pub fn block_cost_limit(&self) -> u64 { - self.block_cost_limit - } -} - -#[cfg(test)] -mod tests { - use { - crate::bundle_stage::bundle_reserved_space_manager::BundleReservedSpaceManager, - solana_ledger::genesis_utils::create_genesis_config, solana_runtime::bank::Bank, - solana_sdk::pubkey::Pubkey, std::sync::Arc, - }; - - #[test] - fn test_reserve_block_cost_limits_during_reserved_ticks() { - const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; - - let genesis_config_info = create_genesis_config(100); - let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); - - let block_cost_limits = bank.read_cost_tracker().unwrap().block_cost_limit(); - - let mut reserved_space = BundleReservedSpaceManager::new( - block_cost_limits, - BUNDLE_BLOCK_COST_LIMITS_RESERVATION, - 5, - ); - reserved_space.tick(&bank); - - assert_eq!( - bank.read_cost_tracker().unwrap().block_cost_limit(), - block_cost_limits - BUNDLE_BLOCK_COST_LIMITS_RESERVATION - ); - } - - #[test] - fn test_dont_reserve_block_cost_limits_after_reserved_ticks() { - const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; - - let genesis_config_info = create_genesis_config(100); - let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); - - let block_cost_limits = bank.read_cost_tracker().unwrap().block_cost_limit(); - - for _ in 0..5 { - bank.register_default_tick_for_test(); - } - - let mut reserved_space = BundleReservedSpaceManager::new( - block_cost_limits, - BUNDLE_BLOCK_COST_LIMITS_RESERVATION, - 5, - ); - reserved_space.tick(&bank); - - assert_eq!( - bank.read_cost_tracker().unwrap().block_cost_limit(), - block_cost_limits - ); - } - - #[test] - fn test_dont_reset_block_cost_limits_during_reserved_ticks() { - const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; - - let genesis_config_info = create_genesis_config(100); - let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); - - let block_cost_limits = bank.read_cost_tracker().unwrap().block_cost_limit(); - - let mut reserved_space = BundleReservedSpaceManager::new( - block_cost_limits, - BUNDLE_BLOCK_COST_LIMITS_RESERVATION, - 5, - ); - - reserved_space.tick(&bank); - bank.register_default_tick_for_test(); - reserved_space.tick(&bank); - - assert_eq!( - bank.read_cost_tracker().unwrap().block_cost_limit(), - block_cost_limits - BUNDLE_BLOCK_COST_LIMITS_RESERVATION - ); - } - - #[test] - fn test_reset_block_cost_limits_after_reserved_ticks() { - const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; - - let genesis_config_info = create_genesis_config(100); - let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); - - let block_cost_limits = bank.read_cost_tracker().unwrap().block_cost_limit(); - - let mut reserved_space = BundleReservedSpaceManager::new( - block_cost_limits, - BUNDLE_BLOCK_COST_LIMITS_RESERVATION, - 5, - ); - - reserved_space.tick(&bank); - - for _ in 0..5 { - bank.register_default_tick_for_test(); - } - reserved_space.tick(&bank); - - assert_eq!( - bank.read_cost_tracker().unwrap().block_cost_limit(), - block_cost_limits - ); - } - - #[test] - fn test_block_limits_after_first_slot() { - const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; - const RESERVED_TICKS: u64 = 5; - let genesis_config_info = create_genesis_config(100); - let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); - - for _ in 0..genesis_config_info.genesis_config.ticks_per_slot { - bank.register_default_tick_for_test(); - } - assert!(bank.is_complete()); - bank.freeze(); - assert_eq!( - bank.read_cost_tracker().unwrap().block_cost_limit(), - solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS_SIMD_0207, - ); - - let bank1 = Arc::new(Bank::new_from_parent(bank.clone(), &Pubkey::default(), 1)); - assert_eq!(bank1.slot(), 1); - assert_eq!(bank1.tick_height(), 64); - assert_eq!(bank1.max_tick_height(), 128); - - // reserve space - let block_cost_limits = bank1.read_cost_tracker().unwrap().block_cost_limit(); - let mut reserved_space = BundleReservedSpaceManager::new( - block_cost_limits, - BUNDLE_BLOCK_COST_LIMITS_RESERVATION, - RESERVED_TICKS, - ); - reserved_space.tick(&bank1); - - // wait for reservation to be over - (0..RESERVED_TICKS).for_each(|_| { - bank1.register_default_tick_for_test(); - assert_eq!( - bank1.read_cost_tracker().unwrap().block_cost_limit(), - block_cost_limits - BUNDLE_BLOCK_COST_LIMITS_RESERVATION - ); - }); - reserved_space.tick(&bank1); - - // after reservation, revert back to normal limit - assert_eq!( - bank1.read_cost_tracker().unwrap().block_cost_limit(), - solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS_SIMD_0207, - ); - } -} diff --git a/core/src/tpu.rs b/core/src/tpu.rs index a5110bd4a6..c0d550cb2a 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -38,6 +38,7 @@ use { rpc_subscriptions::RpcSubscriptions, }, solana_runtime::{ + bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, @@ -78,6 +79,20 @@ pub struct TpuSockets { pub transactions_forwards_quic: Vec, } +/// For the first `reserved_ticks` ticks of a bank, the preallocated_bundle_cost is subtracted +/// from the Bank's block cost limit. +fn calculate_block_cost_limit_reservation( + bank: &Bank, + reserved_ticks: u64, + preallocated_bundle_cost: u64, +) -> u64 { + if bank.tick_height() % bank.ticks_per_slot() < reserved_ticks { + preallocated_bundle_cost + } else { + 0 + } +} + pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, @@ -299,6 +314,15 @@ impl Tpu { let bundle_account_locker = BundleAccountLocker::default(); // The tip program can't be used in BankingStage to avoid someone from stealing tips mid-slot. + // The first 80% of the block, based on poh ticks, has `preallocated_bundle_cost` less compute units. + // The last 20% has has full compute so blockspace is maximized if BundleStage is idle. + let reserved_ticks = poh_recorder + .read() + .unwrap() + .ticks_per_slot() + .saturating_mul(8) + .saturating_div(10); + let mut blacklisted_accounts = HashSet::new(); blacklisted_accounts.insert(tip_manager.tip_payment_program_id()); let banking_stage = BankingStage::new( @@ -317,6 +341,13 @@ impl Tpu { enable_block_production_forwarding, blacklisted_accounts, bundle_account_locker.clone(), + move |bank| { + calculate_block_cost_limit_reservation( + bank, + reserved_ticks, + preallocated_bundle_cost, + ) + }, ); let bundle_stage = BundleStage::new( @@ -330,7 +361,6 @@ impl Tpu { tip_manager, bundle_account_locker, &block_builder_fee_info, - preallocated_bundle_cost, prioritization_fee_cache, ); @@ -417,3 +447,48 @@ impl Tpu { Ok(()) } } + +#[cfg(test)] +mod test { + use { + super::calculate_block_cost_limit_reservation, + solana_ledger::genesis_utils::create_genesis_config, solana_runtime::bank::Bank, + solana_sdk::pubkey::Pubkey, std::sync::Arc, + }; + + #[test] + fn test_calculate_block_cost_limit_reservation() { + const BUNDLE_BLOCK_COST_LIMITS_RESERVATION: u64 = 100; + const RESERVED_TICKS: u64 = 5; + let genesis_config_info = create_genesis_config(100); + let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); + + for _ in 0..genesis_config_info.genesis_config.ticks_per_slot { + bank.register_default_tick_for_test(); + } + assert!(bank.is_complete()); + bank.freeze(); + let bank1 = Arc::new(Bank::new_from_parent(bank.clone(), &Pubkey::default(), 1)); + + // wait for reservation to be over + (0..RESERVED_TICKS).for_each(|_| { + assert_eq!( + calculate_block_cost_limit_reservation( + &bank1, + RESERVED_TICKS, + BUNDLE_BLOCK_COST_LIMITS_RESERVATION, + ), + BUNDLE_BLOCK_COST_LIMITS_RESERVATION + ); + bank1.register_default_tick_for_test(); + }); + assert_eq!( + calculate_block_cost_limit_reservation( + &bank1, + RESERVED_TICKS, + BUNDLE_BLOCK_COST_LIMITS_RESERVATION, + ), + 0 + ); + } +} diff --git a/cost-model/benches/cost_tracker.rs b/cost-model/benches/cost_tracker.rs index a7b8b107d0..19944542b7 100644 --- a/cost-model/benches/cost_tracker.rs +++ b/cost-model/benches/cost_tracker.rs @@ -73,7 +73,7 @@ fn bench_cost_tracker_non_contentious_transaction(bencher: &mut Bencher) { bencher.iter(|| { for tx_cost in tx_costs.iter() { - if cost_tracker.try_add(tx_cost).is_err() { + if cost_tracker.try_add(tx_cost, 0).is_err() { break; } // stop when hit limits cost_tracker.update_execution_cost(tx_cost, 0, 0); // update execution cost down to zero @@ -91,7 +91,7 @@ fn bench_cost_tracker_contentious_transaction(bencher: &mut Bencher) { bencher.iter(|| { for tx_cost in tx_costs.iter() { - if cost_tracker.try_add(tx_cost).is_err() { + if cost_tracker.try_add(tx_cost, 0).is_err() { break; } // stop when hit limits cost_tracker.update_execution_cost(tx_cost, 0, 0); // update execution cost down to zero diff --git a/cost-model/src/cost_tracker.rs b/cost-model/src/cost_tracker.rs index dcf5c32358..417a5ea70b 100644 --- a/cost-model/src/cost_tracker.rs +++ b/cost-model/src/cost_tracker.rs @@ -170,8 +170,9 @@ impl CostTracker { pub fn try_add( &mut self, tx_cost: &TransactionCost, + block_cost_limit_reservation: u64, ) -> Result { - self.would_fit(tx_cost)?; + self.would_fit(tx_cost, block_cost_limit_reservation)?; let updated_costliest_account_cost = self.add_transaction_cost(tx_cost); Ok(UpdatedCosts { updated_block_cost: self.block_cost, @@ -283,6 +284,7 @@ impl CostTracker { fn would_fit( &self, tx_cost: &TransactionCost, + block_cost_limit_reservation: u64, ) -> Result<(), CostTrackerError> { let cost: u64 = tx_cost.sum(); @@ -293,7 +295,11 @@ impl CostTracker { } } - if self.block_cost.saturating_add(cost) > self.block_cost_limit { + if self.block_cost.saturating_add(cost) + > self + .block_cost_limit + .saturating_sub(block_cost_limit_reservation) + { // check against the total package cost return Err(CostTrackerError::WouldExceedBlockMaxLimit); } @@ -505,7 +511,7 @@ mod tests { // build testee to have capacity for one simple transaction let mut testee = CostTracker::new(cost, cost, cost); - assert!(testee.would_fit(&tx_cost).is_ok()); + assert!(testee.would_fit(&tx_cost, 0).is_ok()); testee.add_transaction_cost(&tx_cost); assert_eq!(cost, testee.block_cost); assert_eq!(0, testee.vote_cost); @@ -522,7 +528,7 @@ mod tests { // build testee to have capacity for one simple transaction let mut testee = CostTracker::new(cost, cost, cost); - assert!(testee.would_fit(&tx_cost).is_ok()); + assert!(testee.would_fit(&tx_cost, 0).is_ok()); testee.add_transaction_cost(&tx_cost); assert_eq!(cost, testee.block_cost); assert_eq!(cost, testee.vote_cost); @@ -544,7 +550,7 @@ mod tests { // build testee to have capacity for one simple transaction let mut testee = CostTracker::new(cost, cost, cost); - assert!(testee.would_fit(&tx_cost).is_ok()); + assert!(testee.would_fit(&tx_cost, 0).is_ok()); let old = testee.allocated_accounts_data_size; testee.add_transaction_cost(&tx_cost); assert_eq!(old + 1, testee.allocated_accounts_data_size); @@ -564,11 +570,11 @@ mod tests { // build testee to have capacity for two simple transactions, with same accounts let mut testee = CostTracker::new(cost1 + cost2, cost1 + cost2, cost1 + cost2); { - assert!(testee.would_fit(&tx_cost1).is_ok()); + assert!(testee.would_fit(&tx_cost1, 0).is_ok()); testee.add_transaction_cost(&tx_cost1); } { - assert!(testee.would_fit(&tx_cost2).is_ok()); + assert!(testee.would_fit(&tx_cost2, 0).is_ok()); testee.add_transaction_cost(&tx_cost2); } assert_eq!(cost1 + cost2, testee.block_cost); @@ -593,11 +599,11 @@ mod tests { // build testee to have capacity for two simple transactions, with same accounts let mut testee = CostTracker::new(cmp::max(cost1, cost2), cost1 + cost2, cost1 + cost2); { - assert!(testee.would_fit(&tx_cost1).is_ok()); + assert!(testee.would_fit(&tx_cost1, 0).is_ok()); testee.add_transaction_cost(&tx_cost1); } { - assert!(testee.would_fit(&tx_cost2).is_ok()); + assert!(testee.would_fit(&tx_cost2, 0).is_ok()); testee.add_transaction_cost(&tx_cost2); } assert_eq!(cost1 + cost2, testee.block_cost); @@ -621,12 +627,12 @@ mod tests { let mut testee = CostTracker::new(cmp::min(cost1, cost2), cost1 + cost2, cost1 + cost2); // should have room for first transaction { - assert!(testee.would_fit(&tx_cost1).is_ok()); + assert!(testee.would_fit(&tx_cost1, 0).is_ok()); testee.add_transaction_cost(&tx_cost1); } // but no more sapce on the same chain (same signer account) { - assert!(testee.would_fit(&tx_cost2).is_err()); + assert!(testee.would_fit(&tx_cost2, 0).is_err()); } } @@ -647,12 +653,12 @@ mod tests { CostTracker::new(cmp::max(cost1, cost2), cost1 + cost2 - 1, cost1 + cost2 - 1); // should have room for first transaction { - assert!(testee.would_fit(&tx_cost1).is_ok()); + assert!(testee.would_fit(&tx_cost1, 0).is_ok()); testee.add_transaction_cost(&tx_cost1); } // but no more room for package as whole { - assert!(testee.would_fit(&tx_cost2).is_err()); + assert!(testee.would_fit(&tx_cost2, 0).is_err()); } } @@ -672,19 +678,19 @@ mod tests { let mut testee = CostTracker::new(cmp::max(cost1, cost2), cost1 + cost2, cost1 + cost2 - 1); // should have room for first vote { - assert!(testee.would_fit(&tx_cost1).is_ok()); + assert!(testee.would_fit(&tx_cost1, 0).is_ok()); testee.add_transaction_cost(&tx_cost1); } // but no more room for package as whole { - assert!(testee.would_fit(&tx_cost2).is_err()); + assert!(testee.would_fit(&tx_cost2, 0).is_err()); } // however there is room for none-vote tx3 { let third_account = Keypair::new(); let tx3 = build_simple_transaction(&third_account); let tx_cost3 = simple_transaction_cost(&tx3, 5); - assert!(testee.would_fit(&tx_cost3).is_ok()); + assert!(testee.would_fit(&tx_cost3, 0).is_ok()); } } @@ -712,10 +718,10 @@ mod tests { // build testee that passes let testee = CostTracker::new(cmp::max(cost1, cost2), cost1 + cost2 - 1, cost1 + cost2 - 1); - assert!(testee.would_fit(&tx_cost1).is_ok()); + assert!(testee.would_fit(&tx_cost1, 0).is_ok()); // data is too big assert_eq!( - testee.would_fit(&tx_cost2), + testee.would_fit(&tx_cost2, 0), Err(CostTrackerError::WouldExceedAccountDataBlockLimit), ); } @@ -735,8 +741,8 @@ mod tests { // build testee let mut testee = CostTracker::new(cost1 + cost2, cost1 + cost2, cost1 + cost2); - assert!(testee.try_add(&tx_cost1).is_ok()); - assert!(testee.try_add(&tx_cost2).is_ok()); + assert!(testee.try_add(&tx_cost1, 0).is_ok()); + assert!(testee.try_add(&tx_cost2, 0).is_ok()); assert_eq!(cost1 + cost2, testee.block_cost); // removing a tx_cost affects block_cost @@ -744,11 +750,11 @@ mod tests { assert_eq!(cost2, testee.block_cost); // add back tx1 - assert!(testee.try_add(&tx_cost1).is_ok()); + assert!(testee.try_add(&tx_cost1, 0).is_ok()); assert_eq!(cost1 + cost2, testee.block_cost); // cannot add tx1 again, cost limit would be exceeded - assert!(testee.try_add(&tx_cost1).is_err()); + assert!(testee.try_add(&tx_cost1, 0).is_err()); } #[test] @@ -770,7 +776,7 @@ mod tests { { let transaction = WritableKeysTransaction(vec![acct1, acct2, acct3]); let tx_cost = simple_transaction_cost(&transaction, cost); - assert!(testee.try_add(&tx_cost).is_ok()); + assert!(testee.try_add(&tx_cost, 0).is_ok()); let (_costliest_account, costliest_account_cost) = testee.find_costliest_account(); assert_eq!(cost, testee.block_cost); assert_eq!(3, testee.cost_by_writable_accounts.len()); @@ -785,7 +791,7 @@ mod tests { { let transaction = WritableKeysTransaction(vec![acct2]); let tx_cost = simple_transaction_cost(&transaction, cost); - assert!(testee.try_add(&tx_cost).is_ok()); + assert!(testee.try_add(&tx_cost, 0).is_ok()); let (costliest_account, costliest_account_cost) = testee.find_costliest_account(); assert_eq!(cost * 2, testee.block_cost); assert_eq!(3, testee.cost_by_writable_accounts.len()); @@ -802,7 +808,7 @@ mod tests { { let transaction = WritableKeysTransaction(vec![acct1, acct2]); let tx_cost = simple_transaction_cost(&transaction, cost); - assert!(testee.try_add(&tx_cost).is_err()); + assert!(testee.try_add(&tx_cost, 0).is_err()); let (costliest_account, costliest_account_cost) = testee.find_costliest_account(); assert_eq!(cost * 2, testee.block_cost); assert_eq!(3, testee.cost_by_writable_accounts.len()); @@ -825,7 +831,7 @@ mod tests { let tx_cost = simple_transaction_cost(&transaction, cost); let mut expected_block_cost = tx_cost.sum(); let expected_tx_count = 1; - assert!(testee.try_add(&tx_cost).is_ok()); + assert!(testee.try_add(&tx_cost, 0).is_ok()); assert_eq!(expected_block_cost, testee.block_cost()); assert_eq!(expected_tx_count, testee.transaction_count()); testee @@ -924,7 +930,7 @@ mod tests { let test_update_cost_tracker = |execution_cost_adjust: i64, loaded_acounts_data_size_cost_adjust: i64| { let mut cost_tracker = CostTracker::default(); - assert!(cost_tracker.try_add(&tx_cost).is_ok()); + assert!(cost_tracker.try_add(&tx_cost, 0).is_ok()); let actual_programs_execution_cost = (estimated_programs_execution_cost as i64 + execution_cost_adjust) as u64; @@ -988,4 +994,20 @@ mod tests { assert_eq!(0, cost_tracker.vote_cost); assert_eq!(0, cost_tracker.allocated_accounts_data_size); } + + #[test] + fn test_cost_tracker_try_add_with_reservation() { + let mut cost_tracker = CostTracker { + block_cost_limit: 100, + ..CostTracker::default() + }; + + let transaction = WritableKeysTransaction(vec![Pubkey::new_unique()]); + let tx_cost = simple_transaction_cost(&transaction, 100); + assert_eq!( + cost_tracker.try_add(&tx_cost, 1).unwrap_err(), + CostTrackerError::WouldExceedBlockMaxLimit + ); + cost_tracker.try_add(&tx_cost, 0).unwrap(); + } } diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 5fa1c96909..5e98cc9d6f 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -488,7 +488,7 @@ fn compute_slot_cost( num_programs += transaction.message().instructions().len(); let tx_cost = CostModel::calculate_cost(&transaction, &feature_set); - let result = cost_tracker.try_add(&tx_cost); + let result = cost_tracker.try_add(&tx_cost, 0); if result.is_err() { println!( "Slot: {slot}, CostModel rejected transaction {transaction:?}, reason \ diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 7c37b59ddd..8c02817e68 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -257,7 +257,7 @@ fn check_block_cost_limits( let mut cost_tracker = bank.write_cost_tracker().unwrap(); for tx_cost in &tx_costs_with_actual_execution_units { cost_tracker - .try_add(tx_cost) + .try_add(tx_cost, 0) .map_err(TransactionError::from)?; } } diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index 67d229d43a..f3d801482e 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -5,6 +5,7 @@ use { log::*, reqwest::{self, header::CONTENT_TYPE}, serde_json::{json, Value}, + serial_test::serial, solana_account_decoder::UiAccount, solana_client::connection_cache::ConnectionCache, solana_pubsub_client::nonblocking::pubsub_client::PubsubClient, @@ -283,6 +284,7 @@ fn test_rpc_slot_updates() { } #[test] +#[serial] fn test_rpc_subscriptions() { solana_logger::setup();