diff --git a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs index 65b3ed36531048..54b9c5b57a0b06 100644 --- a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs @@ -4,7 +4,7 @@ use { prio_graph_scheduler::{ Batches, PrioGraphScheduler, TransactionSchedulingError, TransactionSchedulingInfo, }, - scheduler::{Scheduler, SchedulingSummary}, + scheduler::{PreLockFilterAction, Scheduler, SchedulingSummary}, scheduler_error::SchedulerError, thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError}, transaction_priority_id::TransactionPriorityId, @@ -82,7 +82,7 @@ impl Scheduler for GreedyScheduler { &mut self, container: &mut S, _pre_graph_filter: impl Fn(&[&Tx], &mut [bool]), - pre_lock_filter: impl Fn(&Tx) -> bool, + pre_lock_filter: impl Fn(&TransactionState) -> PreLockFilterAction, ) -> Result { let num_threads = self.consume_work_senders.len(); let target_cu_per_thread = self.config.target_scheduled_cus / num_threads as u64; @@ -99,7 +99,6 @@ impl Scheduler for GreedyScheduler { } // Track metrics on filter. - let mut num_filtered_out: usize = 0; let mut num_scanned: usize = 0; let mut num_scheduled: usize = 0; let mut num_sent: usize = 0; @@ -148,10 +147,6 @@ impl Scheduler for GreedyScheduler { ) }, ) { - Err(TransactionSchedulingError::Filtered) => { - num_filtered_out += 1; - container.remove_by_id(id.id); - } Err(TransactionSchedulingError::UnschedulableConflicts) | Err(TransactionSchedulingError::UnschedulableThread) => { num_unschedulable += 1; @@ -207,7 +202,7 @@ impl Scheduler for GreedyScheduler { Ok(SchedulingSummary { num_scheduled, num_unschedulable, - num_filtered_out, + num_filtered_out: 0, filter_time_us: 0, }) } @@ -348,17 +343,17 @@ impl GreedyScheduler { fn try_schedule_transaction( transaction_state: &mut TransactionState, - pre_lock_filter: impl Fn(&Tx) -> bool, + pre_lock_filter: impl Fn(&TransactionState) -> PreLockFilterAction, account_locks: &mut ThreadAwareAccountLocks, schedulable_threads: ThreadSet, thread_selector: impl Fn(ThreadSet) -> ThreadId, ) -> Result, TransactionSchedulingError> { - let transaction = &transaction_state.transaction_ttl().transaction; - if !pre_lock_filter(transaction) { - return Err(TransactionSchedulingError::Filtered); + match pre_lock_filter(transaction_state) { + PreLockFilterAction::AttemptToSchedule => {} } // Schedule the transaction if it can be. + let transaction = &transaction_state.transaction_ttl().transaction; let account_keys = transaction.account_keys(); let write_account_locks = account_keys .iter() @@ -527,8 +522,10 @@ mod test { results.fill(true); } - fn test_pre_lock_filter(_tx: &RuntimeTransaction) -> bool { - true + fn test_pre_lock_filter( + _tx: &TransactionState>, + ) -> PreLockFilterAction { + PreLockFilterAction::AttemptToSchedule } #[test] diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 9d5b3a08d1189c..c2caa0957ee4ae 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -1,7 +1,7 @@ use { super::{ in_flight_tracker::InFlightTracker, - scheduler::Scheduler, + scheduler::{PreLockFilterAction, Scheduler}, scheduler_error::SchedulerError, thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError}, transaction_state::SanitizedTransactionTTL, @@ -108,7 +108,7 @@ impl Scheduler for PrioGraphScheduler { &mut self, container: &mut S, pre_graph_filter: impl Fn(&[&Tx], &mut [bool]), - pre_lock_filter: impl Fn(&Tx) -> bool, + pre_lock_filter: impl Fn(&TransactionState) -> PreLockFilterAction, ) -> Result { let num_threads = self.consume_work_senders.len(); let max_cu_per_thread = self.config.max_scheduled_cus / num_threads as u64; @@ -233,9 +233,6 @@ impl Scheduler for PrioGraphScheduler { ); match maybe_schedule_info { - Err(TransactionSchedulingError::Filtered) => { - container.remove_by_id(id.id); - } Err(TransactionSchedulingError::UnschedulableConflicts) | Err(TransactionSchedulingError::UnschedulableThread) => { unschedulable_ids.push(id); @@ -558,8 +555,6 @@ pub(crate) struct TransactionSchedulingInfo { /// Error type for reasons a transaction could not be scheduled. pub(crate) enum TransactionSchedulingError { - /// Transaction was filtered out before locking. - Filtered, /// Transaction cannot be scheduled due to conflicts, or /// higher priority conflicting transactions are unschedulable. UnschedulableConflicts, @@ -569,18 +564,18 @@ pub(crate) enum TransactionSchedulingError { fn try_schedule_transaction( transaction_state: &mut TransactionState, - pre_lock_filter: impl Fn(&Tx) -> bool, + pre_lock_filter: impl Fn(&TransactionState) -> PreLockFilterAction, blocking_locks: &mut ReadWriteAccountSet, account_locks: &mut ThreadAwareAccountLocks, num_threads: usize, thread_selector: impl Fn(ThreadSet) -> ThreadId, ) -> Result, TransactionSchedulingError> { - let transaction = &transaction_state.transaction_ttl().transaction; - if !pre_lock_filter(transaction) { - return Err(TransactionSchedulingError::Filtered); + match pre_lock_filter(transaction_state) { + PreLockFilterAction::AttemptToSchedule => {} } // Check if this transaction conflicts with any blocked transactions + let transaction = &transaction_state.transaction_ttl().transaction; if !blocking_locks.check_locks(transaction) { blocking_locks.take_locks(transaction); return Err(TransactionSchedulingError::UnschedulableConflicts); @@ -755,8 +750,10 @@ mod tests { results.fill(true); } - fn test_pre_lock_filter(_tx: &RuntimeTransaction) -> bool { - true + fn test_pre_lock_filter( + _tx: &TransactionState>, + ) -> PreLockFilterAction { + PreLockFilterAction::AttemptToSchedule } #[test] @@ -908,27 +905,4 @@ mod tests { assert_eq!(collect_work(&work_receivers[1]).1, [vec![4], vec![5]]); } - - #[test] - fn test_schedule_pre_lock_filter() { - let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); - let pubkey = Pubkey::new_unique(); - let keypair = Keypair::new(); - let mut container = create_container([ - (&Keypair::new(), &[pubkey], 1, 1), - (&keypair, &[pubkey], 1, 2), - (&Keypair::new(), &[pubkey], 1, 3), - ]); - - // 2nd transaction should be filtered out and dropped before locking. - let pre_lock_filter = |tx: &RuntimeTransaction| { - tx.message().fee_payer() != &keypair.pubkey() - }; - let scheduling_summary = scheduler - .schedule(&mut container, test_pre_graph_filter, pre_lock_filter) - .unwrap(); - assert_eq!(scheduling_summary.num_scheduled, 2); - assert_eq!(scheduling_summary.num_unschedulable, 0); - assert_eq!(collect_work(&work_receivers[0]).1, vec![vec![2], vec![0]]); - } } diff --git a/core/src/banking_stage/transaction_scheduler/scheduler.rs b/core/src/banking_stage/transaction_scheduler/scheduler.rs index 0aacb68630df2a..1e6c91fc7f2f95 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler.rs @@ -1,5 +1,8 @@ use { - super::{scheduler_error::SchedulerError, transaction_state_container::StateContainer}, + super::{ + scheduler_error::SchedulerError, transaction_state::TransactionState, + transaction_state_container::StateContainer, + }, solana_runtime_transaction::transaction_with_meta::TransactionWithMeta, }; @@ -11,7 +14,7 @@ pub(crate) trait Scheduler { &mut self, container: &mut S, pre_graph_filter: impl Fn(&[&Tx], &mut [bool]), - pre_lock_filter: impl Fn(&Tx) -> bool, + pre_lock_filter: impl Fn(&TransactionState) -> PreLockFilterAction, ) -> Result; /// Receive completed batches of transactions without blocking. @@ -22,6 +25,12 @@ pub(crate) trait Scheduler { ) -> Result<(usize, usize), SchedulerError>; } +/// Action to be taken by pre-lock filter. +pub(crate) enum PreLockFilterAction { + /// Attempt to schedule the transaction. + AttemptToSchedule, +} + /// Metrics from scheduling transactions. #[derive(Default, Debug, PartialEq, Eq)] pub(crate) struct SchedulingSummary { diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index d7e7061bb0c246..6fe1a1e2ffade3 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -4,7 +4,7 @@ use { super::{ receive_and_buffer::ReceiveAndBuffer, - scheduler::Scheduler, + scheduler::{PreLockFilterAction, Scheduler}, scheduler_error::SchedulerError, scheduler_metrics::{ SchedulerCountMetrics, SchedulerLeaderDetectionMetrics, SchedulerTimingMetrics, @@ -157,7 +157,7 @@ where MAX_PROCESSING_AGE, ) }, - |_| true // no pre-lock filter for now + |_| PreLockFilterAction::AttemptToSchedule // no pre-lock filter for now )?); self.count_metrics.update(|count_metrics| {