Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move block_cost_limit tracking to BankingStage in preparation for SIMD-0207 (BP #753) #786

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ fn main() {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// This is so that the signal_receiver does not go out of scope after the closure.
Expand Down
2 changes: 2 additions & 0 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&mut transaction_buffer,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
&|_| 0,
);
});

Expand Down Expand Up @@ -322,6 +323,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let chunk_len = verified.len() / CHUNKS;
Expand Down
1 change: 1 addition & 0 deletions core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz
&bank,
transaction_iter.next().unwrap(),
0,
&|_| 0,
);
assert!(summary
.execute_and_commit_transactions_output
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ impl BankingSimulator {
false,
collections::HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let (&_slot, &raw_base_event_time) = freeze_time_by_slot
Expand Down
26 changes: 24 additions & 2 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use {
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
Expand Down Expand Up @@ -365,6 +365,8 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
// callback function for compute space reservation for BundleStage
block_cost_limit_block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
Self::new_num_threads(
block_production_method,
Expand All @@ -383,6 +385,7 @@ impl BankingStage {
enable_forwarding,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_block_cost_limit_reservation_cb,
)
}

Expand All @@ -404,6 +407,7 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
match block_production_method {
BlockProductionMethod::ThreadLocalMultiIterator => {
Expand All @@ -422,6 +426,7 @@ impl BankingStage {
prioritization_fee_cache,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_reservation_cb,
)
}
BlockProductionMethod::CentralScheduler => Self::new_central_scheduler(
Expand All @@ -440,6 +445,7 @@ impl BankingStage {
enable_forwarding,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_reservation_cb,
),
}
}
Expand All @@ -460,6 +466,7 @@ impl BankingStage {
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
Expand Down Expand Up @@ -528,6 +535,7 @@ impl BankingStage {
unprocessed_transaction_storage,
blacklisted_accounts.clone(),
bundle_account_locker.clone(),
block_cost_limit_reservation_cb.clone(),
)
})
.collect();
Expand All @@ -551,6 +559,7 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
Expand Down Expand Up @@ -599,6 +608,7 @@ impl BankingStage {
),
blacklisted_accounts.clone(),
bundle_account_locker.clone(),
block_cost_limit_reservation_cb.clone(),
));
}

Expand Down Expand Up @@ -628,11 +638,12 @@ impl BankingStage {
);

worker_metrics.push(consume_worker.metrics_handle());
let cb = block_cost_limit_reservation_cb.clone();
bank_thread_hdls.push(
Builder::new()
.name(format!("solCoWorker{id:02}"))
.spawn(move || {
let _ = consume_worker.run();
let _ = consume_worker.run(cb);
})
.unwrap(),
)
Expand Down Expand Up @@ -687,6 +698,7 @@ impl BankingStage {
unprocessed_transaction_storage: UnprocessedTransactionStorage,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let consumer = Consumer::new(
Expand All @@ -708,6 +720,7 @@ impl BankingStage {
&consumer,
id,
unprocessed_transaction_storage,
block_cost_limit_reservation_cb,
)
})
.unwrap()
Expand All @@ -722,6 +735,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats,
block_cost_limit_reservation_cb: &impl Fn(&Bank) -> u64,
) {
if unprocessed_transaction_storage.should_not_process() {
return;
Expand All @@ -747,6 +761,7 @@ impl BankingStage {
unprocessed_transaction_storage,
banking_stage_stats,
slot_metrics_tracker,
block_cost_limit_reservation_cb
));
slot_metrics_tracker
.increment_consume_buffered_packets_us(consume_buffered_packets_us);
Expand Down Expand Up @@ -787,6 +802,7 @@ impl BankingStage {
consumer: &Consumer,
id: u32,
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64,
) {
let mut banking_stage_stats = BankingStageStats::new(id);
let mut tracer_packet_stats = TracerPacketStats::new(id);
Expand All @@ -806,6 +822,7 @@ impl BankingStage {
&banking_stage_stats,
&mut slot_metrics_tracker,
&mut tracer_packet_stats,
&block_cost_limit_reservation_cb
));
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_us);
Expand Down Expand Up @@ -939,6 +956,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);
drop(non_vote_sender);
drop(tpu_vote_sender);
Expand Down Expand Up @@ -997,6 +1015,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);
trace!("sending bank");
drop(non_vote_sender);
Expand Down Expand Up @@ -1084,6 +1103,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// fund another account so we can send 2 good transactions in a single batch.
Expand Down Expand Up @@ -1261,6 +1281,7 @@ mod tests {
&Arc::new(PrioritizationFeeCache::new(0u64)),
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// wait for banking_stage to eat the packets
Expand Down Expand Up @@ -1464,6 +1485,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();
Expand Down
30 changes: 20 additions & 10 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,18 @@ impl ConsumeWorker {
self.metrics.clone()
}

pub fn run(self) -> Result<(), ConsumeWorkerError> {
pub fn run(self, reservation_cb: impl Fn(&Bank) -> u64) -> Result<(), ConsumeWorkerError> {
loop {
let work = self.consume_receiver.recv()?;
self.consume_loop(work)?;
self.consume_loop(work, &reservation_cb)?;
}
}

fn consume_loop(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
fn consume_loop(
&self,
work: ConsumeWork,
reservation_cb: &impl Fn(&Bank) -> u64,
) -> Result<(), ConsumeWorkerError> {
let (maybe_consume_bank, get_bank_us) = measure_us!(self.get_consume_bank());
let Some(mut bank) = maybe_consume_bank else {
self.metrics
Expand Down Expand Up @@ -99,18 +103,24 @@ impl ConsumeWorker {
return self.retry_drain(work);
}
}
self.consume(&bank, work)?;
self.consume(&bank, work, reservation_cb)?;
}

Ok(())
}

/// Consume a single batch.
fn consume(&self, bank: &Arc<Bank>, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
fn consume(
&self,
bank: &Arc<Bank>,
work: ConsumeWork,
reservation_cb: &impl Fn(&Bank) -> u64,
) -> Result<(), ConsumeWorkerError> {
let output = self.consumer.process_and_record_aged_transactions(
bank,
&work.transactions,
&work.max_ages,
reservation_cb,
);

self.metrics.update_for_consume(&output);
Expand Down Expand Up @@ -847,7 +857,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));

let pubkey1 = Pubkey::new_unique();

Expand Down Expand Up @@ -892,7 +902,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -941,7 +951,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -993,7 +1003,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -1068,7 +1078,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down
Loading
Loading