Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable IndexV2 #4583

Merged
merged 6 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Release channels have their own copy of this changelog:
<a name="edge-channel"></a>
## [2.2.0] - Unreleased
* Breaking:
* Blockstore Index column format change
* The Blockstore Index column format has been updated. The column format written in v2.2 is compatible with v2.1, but incompatible with v2.0 and older.
* Snapshot format change
* The snapshot format has been modified to implement SIMD-215. Since only adjacent versions are guaranteed to maintain snapshot compatibility, this means snapshots created with v2.2 are compatible with v2.1 and incompatible with v2.0 and older.
* Changes
Expand Down
32 changes: 31 additions & 1 deletion ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5372,7 +5372,7 @@ pub mod tests {
shred::{max_ticks_per_n_shreds, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
},
assert_matches::assert_matches,
bincode::serialize,
bincode::{serialize, Options},
crossbeam_channel::unbounded,
rand::{seq::SliceRandom, thread_rng},
solana_account_decoder::parse_token::UiTokenAmount,
Expand Down Expand Up @@ -5840,6 +5840,36 @@ pub mod tests {
test_insert_data_shreds_slots(true);
}

#[test]
fn test_index_fallback_deserialize() {
    // Verifies the migration path for the Index column: bytes written in the
    // legacy (V1) format must still deserialize through the current column
    // accessor, which falls back to the V1 decoder when the V2 decode fails.
    // NOTE(review): the original scrape contained GitHub review-UI text
    // embedded mid-function; it has been removed here.
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();
    let mut rng = rand::thread_rng();
    let slot = rng.gen_range(0..100);
    // Mirror the on-disk bincode configuration (fixint, no trailing bytes)
    // so the serialized buffer matches what the column reader expects.
    let bincode = bincode::DefaultOptions::new()
        .reject_trailing_bytes()
        .with_fixint_encoding();

    let data = 0..rng.gen_range(100..MAX_DATA_SHREDS_PER_SLOT as u64);
    let coding = 0..rng.gen_range(100..MAX_DATA_SHREDS_PER_SLOT as u64);
    let mut fallback = IndexFallback::new(slot);
    for (d, c) in data.clone().zip(coding.clone()) {
        fallback.data_mut().insert(d);
        fallback.coding_mut().insert(c);
    }

    // Write raw legacy-format bytes directly, bypassing the typed serializer.
    blockstore
        .index_cf
        .put_bytes(slot, &bincode.serialize(&fallback).unwrap())
        .unwrap();

    // Read back through the typed column; every inserted shred index must
    // survive the V1 -> current conversion.
    let current = blockstore.index_cf.get(slot).unwrap().unwrap();
    for (d, c) in data.zip(coding) {
        assert!(current.data().contains(d));
        assert!(current.coding().contains(c));
    }
}

/*
#[test]
pub fn test_iteration_order() {
Expand Down
2 changes: 1 addition & 1 deletion ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,7 @@ impl TypedColumn for columns::Index {
match index {
Ok(index) => Ok(index),
Err(_) => {
let index: blockstore_meta::IndexV2 = config.deserialize(data)?;
let index: blockstore_meta::IndexFallback = config.deserialize(data)?;
Ok(index.into())
}
}
Expand Down
77 changes: 56 additions & 21 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,20 @@ mod serde_compat {
}
}

/// Current Index column types: `Index` now resolves to the V2 format
/// (the format written to disk as of this change).
pub type Index = IndexV2;
pub type ShredIndex = ShredIndexV2;
/// We currently support falling back to the previous format for migration purposes.
///
/// See https://github.com/anza-xyz/agave/issues/3570.
pub type IndexFallback = IndexV1;
pub type ShredIndexFallback = ShredIndexV1;

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
/// Index recording presence/absence of shreds
pub struct Index {
pub struct IndexV1 {
pub slot: Slot,
data: ShredIndex,
coding: ShredIndex,
data: ShredIndexV1,
coding: ShredIndexV1,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
Expand All @@ -122,18 +130,18 @@ pub struct IndexV2 {
coding: ShredIndexV2,
}

impl From<IndexV2> for Index {
impl From<IndexV2> for IndexV1 {
fn from(index: IndexV2) -> Self {
Index {
IndexV1 {
slot: index.slot,
data: index.data.into(),
coding: index.coding.into(),
}
}
}

impl From<Index> for IndexV2 {
fn from(index: Index) -> Self {
impl From<IndexV1> for IndexV2 {
fn from(index: IndexV1) -> Self {
IndexV2 {
slot: index.slot,
data: index.data.into(),
Expand All @@ -143,7 +151,7 @@ impl From<Index> for IndexV2 {
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct ShredIndex {
pub struct ShredIndexV1 {
/// Map representing presence/absence of shreds
index: BTreeSet<u64>,
}
Expand Down Expand Up @@ -251,7 +259,7 @@ pub struct FrozenHashStatus {

impl Index {
pub(crate) fn new(slot: Slot) -> Self {
Index {
Self {
slot,
data: ShredIndex::default(),
coding: ShredIndex::default(),
Expand All @@ -273,11 +281,39 @@ impl Index {
}
}

#[cfg(test)]
#[allow(unused)]
impl IndexFallback {
    /// Creates an empty fallback index for `slot`; both shred sets start empty.
    pub(crate) fn new(slot: Slot) -> Self {
        Self {
            slot,
            ..Self::default()
        }
    }

    /// Shared view of the data-shred index.
    pub fn data(&self) -> &ShredIndexFallback {
        &self.data
    }

    /// Mutable view of the data-shred index.
    pub(crate) fn data_mut(&mut self) -> &mut ShredIndexFallback {
        &mut self.data
    }

    /// Shared view of the coding-shred index.
    pub fn coding(&self) -> &ShredIndexFallback {
        &self.coding
    }

    /// Mutable view of the coding-shred index.
    pub(crate) fn coding_mut(&mut self) -> &mut ShredIndexFallback {
        &mut self.coding
    }
}

/// Superseded by [`ShredIndexV2`].
///
/// TODO: Remove this once new [`ShredIndexV2`] is fully rolled out
/// and no longer relies on it for fallback.
impl ShredIndex {
#[cfg(test)]
#[allow(unused)]
impl ShredIndexV1 {
pub fn num_shreds(&self) -> usize {
self.index.len()
}
Expand All @@ -297,7 +333,6 @@ impl ShredIndex {
self.index.insert(index);
}

#[cfg(test)]
fn remove(&mut self, index: u64) {
self.index.remove(&index);
}
Expand Down Expand Up @@ -498,23 +533,23 @@ impl FromIterator<u64> for ShredIndexV2 {
}
}

impl FromIterator<u64> for ShredIndex {
impl FromIterator<u64> for ShredIndexV1 {
fn from_iter<T: IntoIterator<Item = u64>>(iter: T) -> Self {
ShredIndex {
ShredIndexV1 {
index: iter.into_iter().collect(),
}
}
}

impl From<ShredIndex> for ShredIndexV2 {
fn from(value: ShredIndex) -> Self {
impl From<ShredIndexV1> for ShredIndexV2 {
fn from(value: ShredIndexV1) -> Self {
value.index.into_iter().collect()
}
}

impl From<ShredIndexV2> for ShredIndex {
impl From<ShredIndexV2> for ShredIndexV1 {
fn from(value: ShredIndexV2) -> Self {
ShredIndex {
ShredIndexV1 {
index: value.iter().collect(),
}
}
Expand Down Expand Up @@ -922,7 +957,7 @@ mod test {
shreds in rand_range(0..MAX_DATA_SHREDS_PER_SLOT as u64),
range in rand_range(0..MAX_DATA_SHREDS_PER_SLOT as u64)
) {
let mut legacy = ShredIndex::default();
let mut legacy = ShredIndexV1::default();
let mut v2 = ShredIndexV2::default();

for i in shreds {
Expand All @@ -942,7 +977,7 @@ mod test {
);

assert_eq!(ShredIndexV2::from(legacy.clone()), v2.clone());
assert_eq!(ShredIndex::from(v2), legacy);
assert_eq!(ShredIndexV1::from(v2), legacy);
}

/// Property: [`Index`] cannot be deserialized from [`IndexV2`].
Expand All @@ -965,7 +1000,7 @@ mod test {
slot,
};
let config = bincode::DefaultOptions::new().with_fixint_encoding().reject_trailing_bytes();
let legacy = config.deserialize::<Index>(&config.serialize(&index).unwrap());
let legacy = config.deserialize::<IndexV1>(&config.serialize(&index).unwrap());
prop_assert!(legacy.is_err());
}

Expand All @@ -983,7 +1018,7 @@ mod test {
data_indices in rand_range(0..MAX_DATA_SHREDS_PER_SLOT as u64),
slot in 0..u64::MAX
) {
let index = Index {
let index = IndexV1 {
coding: coding_indices.into_iter().collect(),
data: data_indices.into_iter().collect(),
slot,
Expand Down
Loading