Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify and Refactor pre_lock_filter #4980

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
prio_graph_scheduler::{
Batches, PrioGraphScheduler, TransactionSchedulingError, TransactionSchedulingInfo,
},
scheduler::{Scheduler, SchedulingSummary},
scheduler::{PreLockFilterAction, Scheduler, SchedulingSummary},
scheduler_error::SchedulerError,
thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError},
transaction_priority_id::TransactionPriorityId,
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
&mut self,
container: &mut S,
_pre_graph_filter: impl Fn(&[&Tx], &mut [bool]),
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let target_cu_per_thread = self.config.target_scheduled_cus / num_threads as u64;
Expand All @@ -99,7 +99,6 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
}

// Track metrics on filter.
let mut num_filtered_out: usize = 0;
let mut num_scanned: usize = 0;
let mut num_scheduled: usize = 0;
let mut num_sent: usize = 0;
Expand Down Expand Up @@ -148,10 +147,6 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
)
},
) {
Err(TransactionSchedulingError::Filtered) => {
num_filtered_out += 1;
container.remove_by_id(id.id);
}
Err(TransactionSchedulingError::UnschedulableConflicts)
| Err(TransactionSchedulingError::UnschedulableThread) => {
num_unschedulable += 1;
Expand Down Expand Up @@ -207,7 +202,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
Ok(SchedulingSummary {
num_scheduled,
num_unschedulable,
num_filtered_out,
num_filtered_out: 0,
filter_time_us: 0,
})
}
Expand Down Expand Up @@ -348,17 +343,17 @@ impl<Tx: TransactionWithMeta> GreedyScheduler<Tx> {

fn try_schedule_transaction<Tx: TransactionWithMeta>(
transaction_state: &mut TransactionState<Tx>,
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
account_locks: &mut ThreadAwareAccountLocks,
schedulable_threads: ThreadSet,
thread_selector: impl Fn(ThreadSet) -> ThreadId,
) -> Result<TransactionSchedulingInfo<Tx>, TransactionSchedulingError> {
let transaction = &transaction_state.transaction_ttl().transaction;
if !pre_lock_filter(transaction) {
return Err(TransactionSchedulingError::Filtered);
match pre_lock_filter(transaction_state) {
PreLockFilterAction::AttemptToSchedule => {}
}

// Schedule the transaction if it can be.
let transaction = &transaction_state.transaction_ttl().transaction;
let account_keys = transaction.account_keys();
let write_account_locks = account_keys
.iter()
Expand Down Expand Up @@ -527,8 +522,10 @@ mod test {
results.fill(true);
}

fn test_pre_lock_filter(_tx: &RuntimeTransaction<SanitizedTransaction>) -> bool {
true
fn test_pre_lock_filter(
_tx: &TransactionState<RuntimeTransaction<SanitizedTransaction>>,
) -> PreLockFilterAction {
PreLockFilterAction::AttemptToSchedule
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::{
in_flight_tracker::InFlightTracker,
scheduler::Scheduler,
scheduler::{PreLockFilterAction, Scheduler},
scheduler_error::SchedulerError,
thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError},
transaction_state::SanitizedTransactionTTL,
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
&mut self,
container: &mut S,
pre_graph_filter: impl Fn(&[&Tx], &mut [bool]),
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
) -> Result<SchedulingSummary, SchedulerError> {
let num_threads = self.consume_work_senders.len();
let max_cu_per_thread = self.config.max_scheduled_cus / num_threads as u64;
Expand Down Expand Up @@ -233,9 +233,6 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
);

match maybe_schedule_info {
Err(TransactionSchedulingError::Filtered) => {
container.remove_by_id(id.id);
}
Err(TransactionSchedulingError::UnschedulableConflicts)
| Err(TransactionSchedulingError::UnschedulableThread) => {
unschedulable_ids.push(id);
Expand Down Expand Up @@ -558,8 +555,6 @@ pub(crate) struct TransactionSchedulingInfo<Tx> {

/// Error type for reasons a transaction could not be scheduled.
pub(crate) enum TransactionSchedulingError {
/// Transaction was filtered out before locking.
Filtered,
/// Transaction cannot be scheduled due to conflicts, or
/// higher priority conflicting transactions are unschedulable.
UnschedulableConflicts,
Expand All @@ -569,18 +564,18 @@ pub(crate) enum TransactionSchedulingError {

fn try_schedule_transaction<Tx: TransactionWithMeta>(
transaction_state: &mut TransactionState<Tx>,
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
blocking_locks: &mut ReadWriteAccountSet,
account_locks: &mut ThreadAwareAccountLocks,
num_threads: usize,
thread_selector: impl Fn(ThreadSet) -> ThreadId,
) -> Result<TransactionSchedulingInfo<Tx>, TransactionSchedulingError> {
let transaction = &transaction_state.transaction_ttl().transaction;
if !pre_lock_filter(transaction) {
return Err(TransactionSchedulingError::Filtered);
match pre_lock_filter(transaction_state) {
PreLockFilterAction::AttemptToSchedule => {}
}

// Check if this transaction conflicts with any blocked transactions
let transaction = &transaction_state.transaction_ttl().transaction;
if !blocking_locks.check_locks(transaction) {
blocking_locks.take_locks(transaction);
return Err(TransactionSchedulingError::UnschedulableConflicts);
Expand Down Expand Up @@ -755,8 +750,10 @@ mod tests {
results.fill(true);
}

fn test_pre_lock_filter(_tx: &RuntimeTransaction<SanitizedTransaction>) -> bool {
true
fn test_pre_lock_filter(
_tx: &TransactionState<RuntimeTransaction<SanitizedTransaction>>,
) -> PreLockFilterAction {
PreLockFilterAction::AttemptToSchedule
}

#[test]
Expand Down Expand Up @@ -908,27 +905,4 @@ mod tests {

assert_eq!(collect_work(&work_receivers[1]).1, [vec![4], vec![5]]);
}

#[test]
fn test_schedule_pre_lock_filter() {
let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1);
let pubkey = Pubkey::new_unique();
let keypair = Keypair::new();
let mut container = create_container([
(&Keypair::new(), &[pubkey], 1, 1),
(&keypair, &[pubkey], 1, 2),
(&Keypair::new(), &[pubkey], 1, 3),
]);

// 2nd transaction should be filtered out and dropped before locking.
let pre_lock_filter = |tx: &RuntimeTransaction<SanitizedTransaction>| {
tx.message().fee_payer() != &keypair.pubkey()
};
let scheduling_summary = scheduler
.schedule(&mut container, test_pre_graph_filter, pre_lock_filter)
.unwrap();
assert_eq!(scheduling_summary.num_scheduled, 2);
assert_eq!(scheduling_summary.num_unschedulable, 0);
assert_eq!(collect_work(&work_receivers[0]).1, vec![vec![2], vec![0]]);
}
}
13 changes: 11 additions & 2 deletions core/src/banking_stage/transaction_scheduler/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use {
super::{scheduler_error::SchedulerError, transaction_state_container::StateContainer},
super::{
scheduler_error::SchedulerError, transaction_state::TransactionState,
transaction_state_container::StateContainer,
},
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
};

Expand All @@ -11,7 +14,7 @@ pub(crate) trait Scheduler<Tx: TransactionWithMeta> {
&mut self,
container: &mut S,
pre_graph_filter: impl Fn(&[&Tx], &mut [bool]),
pre_lock_filter: impl Fn(&Tx) -> bool,
pre_lock_filter: impl Fn(&TransactionState<Tx>) -> PreLockFilterAction,
) -> Result<SchedulingSummary, SchedulerError>;

/// Receive completed batches of transactions without blocking.
Expand All @@ -22,6 +25,12 @@ pub(crate) trait Scheduler<Tx: TransactionWithMeta> {
) -> Result<(usize, usize), SchedulerError>;
}

/// Action to be taken by pre-lock filter.
pub(crate) enum PreLockFilterAction {
/// Attempt to schedule the transaction.
AttemptToSchedule,
}

/// Metrics from scheduling transactions.
#[derive(Default, Debug, PartialEq, Eq)]
pub(crate) struct SchedulingSummary {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use {
super::{
receive_and_buffer::ReceiveAndBuffer,
scheduler::Scheduler,
scheduler::{PreLockFilterAction, Scheduler},
scheduler_error::SchedulerError,
scheduler_metrics::{
SchedulerCountMetrics, SchedulerLeaderDetectionMetrics, SchedulerTimingMetrics,
Expand Down Expand Up @@ -157,7 +157,7 @@ where
MAX_PROCESSING_AGE,
)
},
|_| true // no pre-lock filter for now
|_| PreLockFilterAction::AttemptToSchedule // no pre-lock filter for now
)?);

self.count_metrics.update(|count_metrics| {
Expand Down
Loading