Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add LeaderSchedule trait #4973

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5381,7 +5381,7 @@ pub mod tests {
super::*,
crate::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule::{FixedSchedule, LeaderSchedule},
leader_schedule::{CurrentLeaderSchedule, FixedSchedule},
shred::{max_ticks_per_n_shreds, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
},
assert_matches::assert_matches,
Expand Down Expand Up @@ -10486,9 +10486,9 @@ pub mod tests {
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
let fixed_schedule = FixedSchedule {
leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(vec![
leader_keypair.pubkey()
])),
leader_schedule: Arc::new(Box::new(CurrentLeaderSchedule::new_from_schedule(vec![
leader_keypair.pubkey(),
]))),
};
leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));

Expand Down
230 changes: 48 additions & 182 deletions ledger/src/leader_schedule.rs
Original file line number Diff line number Diff line change
@@ -1,101 +1,40 @@
use {
itertools::Itertools,
rand::distributions::{Distribution, WeightedIndex},
rand_chacha::{rand_core::SeedableRng, ChaChaRng},
solana_pubkey::Pubkey,
solana_sdk::clock::Epoch,
std::{collections::HashMap, convert::identity, ops::Index, sync::Arc},
};

mod current;
pub use current::LeaderSchedule as CurrentLeaderSchedule;

// Used for testing
#[derive(Clone, Debug)]
pub struct FixedSchedule {
pub leader_schedule: Arc<LeaderSchedule>,
}

/// Stake-weighted leader schedule for one epoch.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct LeaderSchedule {
slot_leaders: Vec<Pubkey>,
// Inverted index from pubkeys to indices where they are the leader.
index: HashMap<Pubkey, Arc<Vec<usize>>>,
}

impl LeaderSchedule {
// Note: passing in zero stakers will cause a panic.
pub fn new_keyed_by_validator_identity(
epoch_staked_nodes: &HashMap<Pubkey, u64>,
epoch: Epoch,
len: u64,
repeat: u64,
) -> Self {
let keyed_stakes: Vec<_> = epoch_staked_nodes
.iter()
.map(|(pubkey, stake)| (pubkey, *stake))
.collect();
let slot_leaders = Self::stake_weighted_slot_leaders(keyed_stakes, epoch, len, repeat);
Self::new_from_schedule(slot_leaders)
}

// Note: passing in zero stakers will cause a panic.
fn stake_weighted_slot_leaders(
mut keyed_stakes: Vec<(&Pubkey, u64)>,
epoch: Epoch,
len: u64,
repeat: u64,
) -> Vec<Pubkey> {
sort_stakes(&mut keyed_stakes);
let (keys, stakes): (Vec<_>, Vec<_>) = keyed_stakes.into_iter().unzip();
let weighted_index = WeightedIndex::new(stakes).unwrap();
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
let rng = &mut ChaChaRng::from_seed(seed);
let mut current_slot_leader = Pubkey::default();
(0..len)
.map(|i| {
if i % repeat == 0 {
current_slot_leader = keys[weighted_index.sample(rng)];
}
current_slot_leader
})
.collect()
}

pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
Self {
index: Self::index_from_slot_leaders(&slot_leaders),
slot_leaders,
}
}

fn index_from_slot_leaders(slot_leaders: &[Pubkey]) -> HashMap<Pubkey, Arc<Vec<usize>>> {
slot_leaders
.iter()
.enumerate()
.map(|(i, pk)| (*pk, i))
.into_group_map()
.into_iter()
.map(|(k, v)| (k, Arc::new(v)))
.collect()
}

pub fn get_slot_leaders(&self) -> &[Pubkey] {
&self.slot_leaders
}
pub type LeaderSchedule = Box<dyn LeaderScheduleVariant>;

pub fn num_slots(&self) -> usize {
self.slot_leaders.len()
}
pub trait LeaderScheduleVariant:
std::fmt::Debug + Send + Sync + Index<u64, Output = Pubkey>
{
fn get_slot_leaders(&self) -> &[Pubkey];
fn get_leader_slots_map(&self) -> &HashMap<Pubkey, Arc<Vec<usize>>>;

/// 'offset' is an index into the leader schedule. The function returns an
/// iterator of indices i >= offset where the given pubkey is the leader.
pub(crate) fn get_indices(
fn get_leader_upcoming_slots(
&self,
pubkey: &Pubkey,
offset: usize, // Starting index.
) -> impl Iterator<Item = usize> {
let index = self.index.get(pubkey).cloned().unwrap_or_default();
let num_slots = self.slot_leaders.len();
) -> Box<dyn Iterator<Item = usize>> {
let index = self
.get_leader_slots_map()
.get(pubkey)
.cloned()
.unwrap_or_default();
let num_slots = self.num_slots();
let size = index.len();
#[allow(clippy::reversed_empty_ranges)]
let range = if index.is_empty() {
Expand All @@ -111,18 +50,38 @@ impl LeaderSchedule {
// for LeaderSchedule, where the schedule keeps repeating endlessly.
// The '%' returns where in a cycle we are and the '/' returns how many
// times the schedule is repeated.
range.map(move |k| index[k % size] + k / size * num_slots)
Box::new(range.map(move |k| index[k % size] + k / size * num_slots))
}
}

impl Index<u64> for LeaderSchedule {
type Output = Pubkey;
fn index(&self, index: u64) -> &Pubkey {
let index = index as usize;
&self.slot_leaders[index % self.slot_leaders.len()]
fn num_slots(&self) -> usize {
self.get_slot_leaders().len()
}
}

// Note: passing in zero stakers will cause a panic.
fn stake_weighted_slot_leaders(
mut keyed_stakes: Vec<(&Pubkey, u64)>,
epoch: Epoch,
len: u64,
repeat: u64,
) -> Vec<Pubkey> {
sort_stakes(&mut keyed_stakes);
let (keys, stakes): (Vec<_>, Vec<_>) = keyed_stakes.into_iter().unzip();
let weighted_index = WeightedIndex::new(stakes).unwrap();
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
let rng = &mut ChaChaRng::from_seed(seed);
let mut current_slot_leader = Pubkey::default();
(0..len)
.map(|i| {
if i % repeat == 0 {
current_slot_leader = keys[weighted_index.sample(rng)];
}
current_slot_leader
})
.collect()
}

fn sort_stakes(stakes: &mut Vec<(&Pubkey, u64)>) {
// Sort first by stake. If stakes are the same, sort by pubkey to ensure a
// deterministic result.
Expand All @@ -141,118 +100,25 @@ fn sort_stakes(stakes: &mut Vec<(&Pubkey, u64)>) {

#[cfg(test)]
mod tests {
use {super::*, rand::Rng, std::iter::repeat_with};

#[test]
fn test_leader_schedule_index() {
let pubkey0 = solana_pubkey::new_rand();
let pubkey1 = solana_pubkey::new_rand();
let leader_schedule = LeaderSchedule::new_from_schedule(vec![pubkey0, pubkey1]);
assert_eq!(leader_schedule[0], pubkey0);
assert_eq!(leader_schedule[1], pubkey1);
assert_eq!(leader_schedule[2], pubkey0);
}

#[test]
fn test_leader_schedule_basic() {
let num_keys = 10;
let stakes: HashMap<_, _> = (0..num_keys)
.map(|i| (solana_pubkey::new_rand(), i))
.collect();

let epoch: Epoch = rand::random();
let len = num_keys * 10;
let leader_schedule =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1);
let leader_schedule2 =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1);
assert_eq!(leader_schedule.num_slots() as u64, len);
// Check that the same schedule is reproducibly generated
assert_eq!(leader_schedule, leader_schedule2);
}

#[test]
fn test_repeated_leader_schedule() {
let num_keys = 10;
let stakes: HashMap<_, _> = (0..num_keys)
.map(|i| (solana_pubkey::new_rand(), i))
.collect();

let epoch = rand::random::<Epoch>();
let len = num_keys * 10;
let repeat = 8;
let leader_schedule =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, repeat);
assert_eq!(leader_schedule.num_slots() as u64, len);
let mut leader_node = Pubkey::default();
for (i, node) in leader_schedule.slot_leaders.iter().enumerate() {
if i % repeat as usize == 0 {
leader_node = *node;
} else {
assert_eq!(leader_node, *node);
}
}
}

#[test]
fn test_repeated_leader_schedule_specific() {
let alice_pubkey = solana_pubkey::new_rand();
let bob_pubkey = solana_pubkey::new_rand();
let stakes: HashMap<_, _> = [(alice_pubkey, 2), (bob_pubkey, 1)].into_iter().collect();

let epoch = 0;
let len = 8;
// What the schedule looks like without any repeats
let leaders1 =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1).slot_leaders;

// What the schedule looks like with repeats
let leaders2 =
LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 2).slot_leaders;
assert_eq!(leaders1.len(), leaders2.len());

let leaders1_expected = vec![
alice_pubkey,
alice_pubkey,
alice_pubkey,
bob_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
];
let leaders2_expected = vec![
alice_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
alice_pubkey,
bob_pubkey,
bob_pubkey,
];

assert_eq!(leaders1, leaders1_expected);
assert_eq!(leaders2, leaders2_expected);
}
use {super::*, itertools::Itertools, rand::Rng, std::iter::repeat_with};

#[test]
fn test_get_indices() {
fn test_get_leader_upcoming_slots() {
const NUM_SLOTS: usize = 97;
let mut rng = rand::thread_rng();
let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(4).collect();
let schedule: Vec<_> = repeat_with(|| pubkeys[rng.gen_range(0..3)])
.take(19)
.collect();
let schedule = LeaderSchedule::new_from_schedule(schedule);
let schedule = CurrentLeaderSchedule::new_from_schedule(schedule);
let leaders = (0..NUM_SLOTS)
.map(|i| (schedule[i as u64], i))
.into_group_map();
for pubkey in &pubkeys {
let index = leaders.get(pubkey).cloned().unwrap_or_default();
for offset in 0..NUM_SLOTS {
let schedule: Vec<_> = schedule
.get_indices(pubkey, offset)
.get_leader_upcoming_slots(pubkey, offset)
.take_while(|s| *s < NUM_SLOTS)
.collect();
let index: Vec<_> = index.iter().copied().skip_while(|s| *s < offset).collect();
Expand Down
Loading
Loading