From cdc87fb1f467a0fb175bd7a09405313c6e127034 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 2 Oct 2024 17:01:02 +1000 Subject: [PATCH 1/6] Refactor archiving code for incremental mapping generation --- crates/sc-consensus-subspace/src/archiver.rs | 20 +- crates/subspace-archiving/src/archiver.rs | 201 ++++++++++++++++--- 2 files changed, 184 insertions(+), 37 deletions(-) diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index 14286b04d1..2b04ba87aa 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -432,8 +432,11 @@ where let encoded_block = encode_block(signed_block); - let new_archived_segment = Archiver::new(kzg, erasure_coding) - .add_block(encoded_block, block_object_mappings, false) + // There are no mappings in the genesis block, so they can be ignored + let block_outcome = + Archiver::new(kzg, erasure_coding).add_block(encoded_block, block_object_mappings, false); + let new_archived_segment = block_outcome + .archived_segments .into_iter() .next() .expect("Genesis block always results in exactly one archived segment; qed"); @@ -671,9 +674,10 @@ where encoded_block.len() as f32 / 1024.0 ); - let archived_segments = - archiver.add_block(encoded_block, block_object_mappings, false); - let new_segment_headers: Vec = archived_segments + let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false); + // TODO: if we are in full mapping generation mode, send an archived object mappings notification for the mappings RPCs + let new_segment_headers: Vec = block_outcome + .archived_segments .iter() .map(|archived_segment| archived_segment.segment_header) .collect(); @@ -985,11 +989,13 @@ where ); let mut new_segment_headers = Vec::new(); - for archived_segment in archiver.add_block( + let block_outcome = archiver.add_block( encoded_block, block_object_mappings, !sync_oracle.is_major_syncing(), - ) { + ); + // TODO: send an 
archived object mappings notification for the mappings RPCs + for archived_segment in block_outcome.archived_segments { let segment_header = archived_segment.segment_header; segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?; diff --git a/crates/subspace-archiving/src/archiver.rs b/crates/subspace-archiving/src/archiver.rs index 3d906c0bb2..8bafc7b111 100644 --- a/crates/subspace-archiving/src/archiver.rs +++ b/crates/subspace-archiving/src/archiver.rs @@ -20,12 +20,13 @@ extern crate alloc; use crate::archiver::incremental_record_commitments::{ update_record_commitments, IncrementalRecordCommitmentsState, }; -use alloc::collections::VecDeque; +use alloc::collections::{BTreeMap, VecDeque}; #[cfg(not(feature = "std"))] use alloc::vec; #[cfg(not(feature = "std"))] use alloc::vec::Vec; use core::cmp::Ordering; +use core::mem::take; use parity_scale_codec::{Compact, CompactLen, Decode, Encode, Input, Output}; #[cfg(feature = "parallel")] use rayon::prelude::*; @@ -126,6 +127,18 @@ impl Segment { let Self::V0 { items } = self; items.push(segment_item); } + + fn items(&self) -> &[SegmentItem] { + match self { + Segment::V0 { items } => items, + } + } + + fn len(&self) -> usize { + match self { + Segment::V0 { items } => items.len(), + } + } } /// Kinds of items that are contained within a segment @@ -207,6 +220,20 @@ impl NewArchivedSegment { } } +/// The outcome of adding a block to the archiver. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ArchiveBlockOutcome { + /// The new segments archived after adding the block. + /// There can be zero or more segments created after each block. + pub archived_segments: Vec, + + /// The new global object mappings after adding the block. + /// + /// There can be zero or more mappings from the block in the current segment, + /// and zero or more mappings from any block continuation from the previous segment. 
+ pub global_mapping: Vec, +} + /// Archiver instantiation error #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] #[cfg_attr(feature = "thiserror", derive(thiserror::Error))] @@ -248,6 +275,8 @@ pub struct Archiver { buffer: VecDeque, /// Intermediate record commitments that are built incrementally as above buffer fills up. incremental_record_commitments: IncrementalRecordCommitmentsState, + /// Source piece object mappings, that are built incrementally as above buffer fills up. + incremental_object_mapping: Vec, /// Erasure coding data structure erasure_coding: ErasureCoding, /// KZG instance @@ -268,6 +297,7 @@ impl Archiver { incremental_record_commitments: IncrementalRecordCommitmentsState::with_capacity( RecordedHistorySegment::NUM_RAW_RECORDS, ), + incremental_object_mapping: Vec::with_capacity(RecordedHistorySegment::NUM_RAW_RECORDS), erasure_coding, kzg, segment_index: SegmentIndex::ZERO, @@ -355,7 +385,7 @@ impl Archiver { bytes: Vec, object_mapping: BlockObjectMapping, incremental: bool, - ) -> Vec { + ) -> ArchiveBlockOutcome { // Append new block to the buffer self.buffer.push_back(SegmentItem::Block { bytes, @@ -363,17 +393,29 @@ impl Archiver { }); let mut archived_segments = Vec::new(); - - while let Some(segment) = self.produce_segment(incremental) { - archived_segments.push(self.produce_archived_segment(segment)); + let mut global_mapping = Vec::new(); + + loop { + let (maybe_segment, mapping) = self.produce_segment(incremental); + global_mapping.extend(mapping); + if let Some(segment) = maybe_segment { + archived_segments.push(self.produce_archived_segment(segment)); + } else { + break; + } } - archived_segments + ArchiveBlockOutcome { + archived_segments, + global_mapping, + } } /// Try to slice buffer contents into segments if there is enough data, producing one segment at - /// a time - fn produce_segment(&mut self, incremental: bool) -> Option { + /// a time. 
Can produce new mappings, even if there is not enough data for a full segment. + /// + /// Returns a new segment, if one was produced, and any new global object mappings. + fn produce_segment(&mut self, incremental: bool) -> (Option, Vec) { let mut segment = Segment::V0 { items: Vec::with_capacity(self.buffer.len()), }; @@ -388,6 +430,30 @@ impl Archiver { let segment_item = match self.buffer.pop_front() { Some(segment_item) => segment_item, None => { + // Segment is not full, so we need to produce incremental mappings. + let new_segment_blocks = match segment.items() { + // If there is a BlockContinuation before this Block, we need to produce + // its mappings as well, because they weren't produced by the previous + // produce_segment() or with_initial_state() call. + [.., SegmentItem::BlockContinuation { .. }, SegmentItem::Block { .. }] => { + &segment.items()[segment.len() - 2..] + } + // Otherwise, just produce mappings for the latest Block. + [.., SegmentItem::Block { .. }] => &segment.items()[segment.len() - 1..], + // We can't tell the difference between an archiver initialized with a + // block continuation, and a spill over from a previous segment. So the + // only way to handle both cases without duplication is to skip a trailing + // continuation, then produce mappings for that continuation when the next + // block is added. (Either here or when the segment is full.) + [.., SegmentItem::BlockContinuation { .. }] => &[], + // If we've just finished the previous segment, the only item in the buffer + // is a SegmentHeader, so there are no new blocks to produce mappings for. + [SegmentItem::ParentSegmentHeader(_)] => &[], + _ => unreachable!("Caller always adds a new block; qed. 
{}", segment), + }; + let new_global_mapping = + self.update_object_mapping(segment_size, new_segment_blocks); + let existing_commitments = self.incremental_record_commitments.len(); let bytes_committed_to = existing_commitments * RawRecord::SIZE; // Run incremental archiver only when there is at least two records to archive, @@ -406,7 +472,7 @@ impl Archiver { self.buffer.push_front(segment_item); } - return None; + return (None, new_global_mapping); } }; @@ -599,27 +665,63 @@ impl Archiver { // Push back shortened segment item items.push(segment_item); + segment_size -= spill_over; } else { // Above code added bytes length even though it was assumed that all continuation bytes // fit into the segment, now we need to tweak that last_archived_block.set_complete(); } + // Segment is full, so we need to produce incremental mappings. + let new_segment_blocks = match segment.items() { + // If there is a BlockContinuation before this block, we need to produce its mappings + // as well, because they weren't produced by the previous produce_segment() or + // with_initial_state() call. + [.., SegmentItem::BlockContinuation { .. }, SegmentItem::Block { .. } | SegmentItem::BlockStart { .. }] => { + &segment.items()[segment.len() - 2..] + } + // Otherwise, just produce mappings for the final block, which can be continued or + // split. + [.., SegmentItem::BlockContinuation { .. } + | SegmentItem::Block { .. } + | SegmentItem::BlockStart { .. }] => &segment.items()[segment.len() - 1..], + // No new blocks, unreachable as long as the caller added a new block + // (or we're still processing a large block continuation) + _ => { + unreachable!( + "Caller always adds a new block, or previous call adds block continuation; qed. 
{}", + segment, + ) + } + }; + let new_global_mapping = self.update_object_mapping(segment_size, new_segment_blocks); + self.last_archived_block = last_archived_block; - Some(segment) + (Some(segment), new_global_mapping) } - // Take segment as an input, apply necessary transformations and produce archived segment - fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment { + // Take new segment blocks as an input, apply necessary transformations, and update incremental + // piece object mapping. Returns any new mappings. + fn update_object_mapping( + &mut self, + segment_size: usize, + new_segment_blocks: &[SegmentItem], + ) -> Vec { + if new_segment_blocks.is_empty() { + return Vec::new(); + } + + let mut base_offset_in_segment = segment_size + - new_segment_blocks + .iter() + .map(Encode::encoded_size) + .sum::(); + // Create mappings - let object_mapping = { - let mut corrected_object_mapping = - vec![PieceObjectMapping::default(); RecordedHistorySegment::NUM_RAW_RECORDS]; - let Segment::V0 { items } = &segment; - // `+1` corresponds to enum variant encoding - let mut base_offset_in_segment = 1; - for segment_item in items { + let mut object_mapping = { + let mut corrected_object_mapping = BTreeMap::::new(); + for segment_item in new_segment_blocks { match segment_item { SegmentItem::Padding => { unreachable!( @@ -644,22 +746,22 @@ impl Archiver { + 1 + Compact::compact_len(&(bytes.len() as u32)) + block_object.offset as usize; - let raw_piece_offset = - (offset_in_segment % RawRecord::SIZE).try_into().expect( - "Offset within piece should always fit in 32-bit integer; qed", - ); - if let Some(piece_object_mapping) = corrected_object_mapping - .get_mut(offset_in_segment / RawRecord::SIZE) - { - piece_object_mapping.objects_mut().push(PieceObject { + let raw_piece_offset = u32::try_from( + offset_in_segment % RawRecord::SIZE, + ) + .expect("Offset within piece should always fit in 32-bit integer; qed"); + corrected_object_mapping + 
.entry(offset_in_segment / RawRecord::SIZE) + .or_default() + .objects_mut() + .push(PieceObject { hash: block_object.hash, offset: raw_piece_offset, }); - } } } SegmentItem::ParentSegmentHeader(_) => { - // Ignore, no objects mappings here + unreachable!("Caller never provides headers as new blocks; qed"); } } @@ -668,6 +770,45 @@ impl Archiver { corrected_object_mapping }; + // Convert to global object mappings + let piece_indexes = self.segment_index.segment_piece_indexes_source_first(); + let new_global_mapping = object_mapping + .iter() + .flat_map(move |(source_position, piece_mapping)| { + // And then through each individual object mapping in the piece + let piece_mapping = piece_mapping.objects().to_vec(); + + piece_mapping.into_iter().map(move |piece_object| { + GlobalObject::new(piece_indexes[*source_position], &piece_object) + }) + }) + .collect(); + + // Update incremental mappings + if let Some(first_source_position) = object_mapping.keys().next().copied() { + if first_source_position + 1 == self.incremental_object_mapping.len() { + // We need to merge the mappings for the first piece with existing mappings for that piece + self.incremental_object_mapping[first_source_position] + .objects_mut() + .extend(object_mapping[&first_source_position].objects()); + object_mapping.remove(&first_source_position); + } + // The rest of the mappings are for new source pieces, and can be inserted without merging + for (source_position, mapping) in object_mapping { + // Some source pieces don't have any mappings, so we give them an empty list + if source_position > self.incremental_object_mapping.len() { + self.incremental_object_mapping + .resize(source_position - 1, PieceObjectMapping::default()); + } + self.incremental_object_mapping.push(mapping); + } + } + + new_global_mapping + } + + // Take segment as an input, apply necessary transformations and produce archived segment + fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment { let mut 
pieces = { // Serialize segment into concatenation of raw records let mut raw_record_shards = Vec::::with_capacity(RecordedHistorySegment::SIZE); @@ -836,7 +977,7 @@ impl Archiver { NewArchivedSegment { segment_header, pieces: pieces.to_shared(), - object_mapping, + object_mapping: take(&mut self.incremental_object_mapping), } } } From 9418fd35912116065913e78b8b9b76ccca545c9a Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 4 Oct 2024 14:30:12 +1000 Subject: [PATCH 2/6] Remove NewArchivedSegment.object_mapping --- crates/subspace-archiving/src/archiver.rs | 163 ++++++---------------- 1 file changed, 43 insertions(+), 120 deletions(-) diff --git a/crates/subspace-archiving/src/archiver.rs b/crates/subspace-archiving/src/archiver.rs index 8bafc7b111..9a41415d07 100644 --- a/crates/subspace-archiving/src/archiver.rs +++ b/crates/subspace-archiving/src/archiver.rs @@ -20,20 +20,17 @@ extern crate alloc; use crate::archiver::incremental_record_commitments::{ update_record_commitments, IncrementalRecordCommitmentsState, }; -use alloc::collections::{BTreeMap, VecDeque}; +use alloc::collections::VecDeque; #[cfg(not(feature = "std"))] use alloc::vec; #[cfg(not(feature = "std"))] use alloc::vec::Vec; use core::cmp::Ordering; -use core::mem::take; use parity_scale_codec::{Compact, CompactLen, Decode, Encode, Input, Output}; #[cfg(feature = "parallel")] use rayon::prelude::*; use subspace_core_primitives::hashes::{blake3_254_hash_to_scalar, Blake3Hash}; -use subspace_core_primitives::objects::{ - BlockObject, BlockObjectMapping, GlobalObject, PieceObject, PieceObjectMapping, -}; +use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping, GlobalObject}; use subspace_core_primitives::pieces::RawRecord; use subspace_core_primitives::segments::{ ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment, @@ -182,42 +179,14 @@ pub enum SegmentItem { ParentSegmentHeader(SegmentHeader), } -/// Newly archived segment as a combination of 
segment header hash, segment index and corresponding -/// archived history segment containing pieces +/// Newly archived segment as a combination of segment header and corresponding archived history +/// segment containing pieces #[derive(Debug, Clone, Eq, PartialEq)] pub struct NewArchivedSegment { /// Segment header pub segment_header: SegmentHeader, /// Segment of archived history containing pieces pub pieces: ArchivedHistorySegment, - /// Mappings for objects stored in corresponding pieces. - /// - /// NOTE: Only half (source pieces) will have corresponding mapping item in this `Vec`. - pub object_mapping: Vec, -} - -impl NewArchivedSegment { - /// Returns all the object mappings in this archived segment as a lazy iterator. - pub fn global_object_mappings(&self) -> impl Iterator + 'static { - // Save memory by only returning the necessary parts of NewArchivedSegment - let object_mapping = self.object_mapping.clone(); - let piece_indexes = self - .segment_header - .segment_index() - .segment_piece_indexes_source_first(); - - // Iterate through the object mapping vector for each piece - object_mapping.into_iter().zip(piece_indexes).flat_map( - move |(piece_mappings, piece_index)| { - // And then through each individual object mapping in the piece - let piece_mappings = piece_mappings.objects().to_vec(); - - piece_mappings - .into_iter() - .map(move |piece_object| GlobalObject::new(piece_index, &piece_object)) - }, - ) - } } /// The outcome of adding a block to the archiver. @@ -275,8 +244,6 @@ pub struct Archiver { buffer: VecDeque, /// Intermediate record commitments that are built incrementally as above buffer fills up. incremental_record_commitments: IncrementalRecordCommitmentsState, - /// Source piece object mappings, that are built incrementally as above buffer fills up. 
- incremental_object_mapping: Vec, /// Erasure coding data structure erasure_coding: ErasureCoding, /// KZG instance @@ -297,7 +264,6 @@ impl Archiver { incremental_record_commitments: IncrementalRecordCommitmentsState::with_capacity( RecordedHistorySegment::NUM_RAW_RECORDS, ), - incremental_object_mapping: Vec::with_capacity(RecordedHistorySegment::NUM_RAW_RECORDS), erasure_coding, kzg, segment_index: SegmentIndex::ZERO, @@ -701,8 +667,7 @@ impl Archiver { (Some(segment), new_global_mapping) } - // Take new segment blocks as an input, apply necessary transformations, and update incremental - // piece object mapping. Returns any new mappings. + // Take new segment blocks as an input, apply necessary transformations, and returns any new mappings. fn update_object_mapping( &mut self, segment_size: usize, @@ -717,94 +682,53 @@ impl Archiver { .iter() .map(Encode::encoded_size) .sum::(); + let piece_indexes = self.segment_index.segment_piece_indexes_source_first(); // Create mappings - let mut object_mapping = { - let mut corrected_object_mapping = BTreeMap::::new(); - for segment_item in new_segment_blocks { - match segment_item { - SegmentItem::Padding => { - unreachable!( - "Segment during archiving never contains SegmentItem::Padding; qed" - ); - } - SegmentItem::Block { - bytes, - object_mapping, - } - | SegmentItem::BlockStart { - bytes, - object_mapping, - } - | SegmentItem::BlockContinuation { - bytes, - object_mapping, - } => { - for block_object in object_mapping.objects() { - // `+1` corresponds to `SegmentItem::X {}` enum variant encoding - let offset_in_segment = base_offset_in_segment - + 1 - + Compact::compact_len(&(bytes.len() as u32)) - + block_object.offset as usize; - let raw_piece_offset = u32::try_from( - offset_in_segment % RawRecord::SIZE, - ) + let mut global_object_mapping = Vec::new(); + for segment_item in new_segment_blocks { + match segment_item { + SegmentItem::Padding => { + unreachable!( + "Segment during archiving never contains 
SegmentItem::Padding; qed" + ); + } + SegmentItem::ParentSegmentHeader(_) => { + unreachable!("Caller never provides headers as new blocks; qed"); + } + SegmentItem::Block { + bytes, + object_mapping, + } + | SegmentItem::BlockStart { + bytes, + object_mapping, + } + | SegmentItem::BlockContinuation { + bytes, + object_mapping, + } => { + for block_object in object_mapping.objects() { + // `+1` corresponds to `SegmentItem::X {}` enum variant encoding + let offset_in_segment = base_offset_in_segment + + 1 + + Compact::compact_len(&(bytes.len() as u32)) + + block_object.offset as usize; + let raw_piece_offset = u32::try_from(offset_in_segment % RawRecord::SIZE) .expect("Offset within piece should always fit in 32-bit integer; qed"); - corrected_object_mapping - .entry(offset_in_segment / RawRecord::SIZE) - .or_default() - .objects_mut() - .push(PieceObject { - hash: block_object.hash, - offset: raw_piece_offset, - }); - } - } - SegmentItem::ParentSegmentHeader(_) => { - unreachable!("Caller never provides headers as new blocks; qed"); + global_object_mapping.push(GlobalObject { + hash: block_object.hash, + piece_index: piece_indexes[offset_in_segment / RawRecord::SIZE], + offset: raw_piece_offset, + }); } } - - base_offset_in_segment += segment_item.encoded_size(); } - corrected_object_mapping - }; - // Convert to global object mappings - let piece_indexes = self.segment_index.segment_piece_indexes_source_first(); - let new_global_mapping = object_mapping - .iter() - .flat_map(move |(source_position, piece_mapping)| { - // And then through each individual object mapping in the piece - let piece_mapping = piece_mapping.objects().to_vec(); - - piece_mapping.into_iter().map(move |piece_object| { - GlobalObject::new(piece_indexes[*source_position], &piece_object) - }) - }) - .collect(); - - // Update incremental mappings - if let Some(first_source_position) = object_mapping.keys().next().copied() { - if first_source_position + 1 == self.incremental_object_mapping.len() { 
- // We need to merge the mappings for the first piece with existing mappings for that piece - self.incremental_object_mapping[first_source_position] - .objects_mut() - .extend(object_mapping[&first_source_position].objects()); - object_mapping.remove(&first_source_position); - } - // The rest of the mappings are for new source pieces, and can be inserted without merging - for (source_position, mapping) in object_mapping { - // Some source pieces don't have any mappings, so we give them an empty list - if source_position > self.incremental_object_mapping.len() { - self.incremental_object_mapping - .resize(source_position - 1, PieceObjectMapping::default()); - } - self.incremental_object_mapping.push(mapping); - } + base_offset_in_segment += segment_item.encoded_size(); } - new_global_mapping + global_object_mapping } // Take segment as an input, apply necessary transformations and produce archived segment @@ -977,7 +901,6 @@ impl Archiver { NewArchivedSegment { segment_header, pieces: pieces.to_shared(), - object_mapping: take(&mut self.incremental_object_mapping), } } } From 7ed00aeb71bc8d053b089c7c460fdd5faecf8fd4 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 8 Oct 2024 10:51:09 +1000 Subject: [PATCH 3/6] Improve debugging for segment types --- Cargo.lock | 1 + crates/subspace-archiving/Cargo.toml | 1 + crates/subspace-archiving/src/archiver.rs | 97 ++++++++++++++++++- .../subspace-core-primitives/src/segments.rs | 10 +- 4 files changed, 107 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a5227689b..2fa0047384 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12393,6 +12393,7 @@ name = "subspace-archiving" version = "0.1.0" dependencies = [ "criterion", + "hex", "parity-scale-codec", "rand", "rayon", diff --git a/crates/subspace-archiving/Cargo.toml b/crates/subspace-archiving/Cargo.toml index 726250fd5d..e0e4949265 100644 --- a/crates/subspace-archiving/Cargo.toml +++ b/crates/subspace-archiving/Cargo.toml @@ -17,6 +17,7 @@ include = [ 
bench = false [dependencies] +hex = { version = "0.4.3", default-features = false, features = ["alloc"] } parity-scale-codec = { version = "3.6.12", default-features = false, features = ["derive"] } rayon = { version = "1.10.0", optional = true } serde = { version = "1.0.110", optional = true, features = ["derive"] } diff --git a/crates/subspace-archiving/src/archiver.rs b/crates/subspace-archiving/src/archiver.rs index 9a41415d07..ee5f5fde2e 100644 --- a/crates/subspace-archiving/src/archiver.rs +++ b/crates/subspace-archiving/src/archiver.rs @@ -26,6 +26,7 @@ use alloc::vec; #[cfg(not(feature = "std"))] use alloc::vec::Vec; use core::cmp::Ordering; +use core::fmt; use parity_scale_codec::{Compact, CompactLen, Decode, Encode, Input, Output}; #[cfg(feature = "parallel")] use rayon::prelude::*; @@ -61,6 +62,20 @@ pub enum Segment { }, } +impl fmt::Display for Segment { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Segment::V0 { items } => { + write!(f, "Segment::V0 {{ items: [")?; + for item in items { + write!(f, "{}, ", item)?; + } + write!(f, "] }}") + } + } + } +} + impl Encode for Segment { fn size_hint(&self) -> usize { RecordedHistorySegment::SIZE @@ -139,7 +154,7 @@ impl Segment { } /// Kinds of items that are contained within a segment -#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)] +#[derive(Clone, Eq, PartialEq, Encode, Decode)] pub enum SegmentItem { /// Special dummy enum variant only used as an implementation detail for padding purposes #[codec(index = 0)] @@ -179,6 +194,86 @@ pub enum SegmentItem { ParentSegmentHeader(SegmentHeader), } +impl fmt::Debug for SegmentItem { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SegmentItem::Padding => write!(f, "SegmentItem::Padding"), + SegmentItem::Block { + bytes, + object_mapping, + } => write!( + f, + "SegmentItem::Block({}, {:?})", + hex::encode(bytes), + object_mapping + ), + SegmentItem::BlockStart { + bytes, + object_mapping, + } => 
write!( + f, + "SegmentItem::BlockStart({}, {:?})", + hex::encode(bytes), + object_mapping + ), + SegmentItem::BlockContinuation { + bytes, + object_mapping, + } => { + write!( + f, + "SegmentItem::BlockContinuation({}, {:?})", + hex::encode(bytes), + object_mapping + ) + } + SegmentItem::ParentSegmentHeader(header) => { + write!(f, "SegmentItem::ParentSegmentHeader({:?})", header) + } + } + } +} + +impl fmt::Display for SegmentItem { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SegmentItem::Padding => write!(f, "SegmentItem::Padding"), + SegmentItem::Block { + bytes, + object_mapping, + } => write!( + f, + "SegmentItem::Block({} bytes, {} mappings)", + bytes.len(), + object_mapping.objects().len() + ), + SegmentItem::BlockStart { + bytes, + object_mapping, + } => write!( + f, + "SegmentItem::BlockStart({} bytes, {} mappings)", + bytes.len(), + object_mapping.objects().len() + ), + SegmentItem::BlockContinuation { + bytes, + object_mapping, + } => { + write!( + f, + "SegmentItem::BlockContinuation({} bytes, {} mappings)", + bytes.len(), + object_mapping.objects().len() + ) + } + SegmentItem::ParentSegmentHeader(header) => { + write!(f, "SegmentItem::ParentSegmentHeader({:?})", header) + } + } + } +} + /// Newly archived segment as a combination of segment header and corresponding archived history /// segment containing pieces #[derive(Debug, Clone, Eq, PartialEq)] diff --git a/crates/subspace-core-primitives/src/segments.rs b/crates/subspace-core-primitives/src/segments.rs index 3eb6c1a1a3..bfd4ad699e 100644 --- a/crates/subspace-core-primitives/src/segments.rs +++ b/crates/subspace-core-primitives/src/segments.rs @@ -9,6 +9,7 @@ use crate::BlockNumber; #[cfg(not(feature = "std"))] use alloc::boxed::Box; use core::array::TryFromSliceError; +use core::fmt; use core::iter::Step; use core::num::NonZeroU64; use derive_more::{ @@ -130,7 +131,6 @@ impl SegmentIndex { /// Segment commitment contained within segment header. 
#[derive( - Debug, Copy, Clone, Eq, @@ -151,6 +151,14 @@ pub struct SegmentCommitment( #[cfg_attr(feature = "serde", serde(with = "hex"))] [u8; SegmentCommitment::SIZE], ); +impl fmt::Debug for SegmentCommitment { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("SegmentCommitment") + .field(&hex::encode(self.0)) + .finish() + } +} + impl Default for SegmentCommitment { #[inline] fn default() -> Self { From a63b5b1840a108ecba2c9319b663a8301410e875 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 4 Oct 2024 16:02:25 +1000 Subject: [PATCH 4/6] Switch RPCs to incremental object mapping generation --- crates/sc-consensus-subspace-rpc/src/lib.rs | 30 ++++++------- crates/sc-consensus-subspace/src/archiver.rs | 45 ++++++++++++++++++-- crates/sc-consensus-subspace/src/lib.rs | 15 ++++++- crates/subspace-archiving/src/archiver.rs | 4 +- crates/subspace-service/src/lib.rs | 11 ++++- crates/subspace-service/src/rpc.rs | 9 +++- 6 files changed, 90 insertions(+), 24 deletions(-) diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index 45c9cdea97..b239d5326a 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -28,7 +28,8 @@ use jsonrpsee::{Extensions, PendingSubscriptionSink}; use parking_lot::Mutex; use sc_client_api::{AuxStore, BlockBackend}; use sc_consensus_subspace::archiver::{ - recreate_genesis_segment, ArchivedSegmentNotification, SegmentHeadersStore, + recreate_genesis_segment, ArchivedMappingNotification, ArchivedSegmentNotification, + SegmentHeadersStore, }; use sc_consensus_subspace::notification::SubspaceNotificationStream; use sc_consensus_subspace::slot_worker::{ @@ -234,6 +235,9 @@ where pub new_slot_notification_stream: SubspaceNotificationStream, /// Reward signing notification stream pub reward_signing_notification_stream: SubspaceNotificationStream, + /// Archived mapping notification stream + pub 
archived_mapping_notification_stream: + SubspaceNotificationStream, /// Archived segment notification stream pub archived_segment_notification_stream: SubspaceNotificationStream, @@ -259,6 +263,7 @@ where subscription_executor: SubscriptionTaskExecutor, new_slot_notification_stream: SubspaceNotificationStream, reward_signing_notification_stream: SubspaceNotificationStream, + archived_mapping_notification_stream: SubspaceNotificationStream, archived_segment_notification_stream: SubspaceNotificationStream, #[allow(clippy::type_complexity)] solution_response_senders: Arc>>>>, @@ -316,6 +321,7 @@ where subscription_executor: config.subscription_executor, new_slot_notification_stream: config.new_slot_notification_stream, reward_signing_notification_stream: config.reward_signing_notification_stream, + archived_mapping_notification_stream: config.archived_mapping_notification_stream, archived_segment_notification_stream: config.archived_segment_notification_stream, solution_response_senders: Arc::new(Mutex::new(LruMap::new(ByLength::new( solution_response_senders_capacity, @@ -844,17 +850,11 @@ where fn subscribe_archived_object_mappings(&self, pending: PendingSubscriptionSink) { // TODO: deny unsafe subscriptions? - // The genesis segment isn't included in this stream. In other methods we recreate is as the first segment, - // but there aren't any mappings in it, so we don't need to recreate it as part of this subscription. 
- let mapping_stream = self - .archived_segment_notification_stream + .archived_mapping_notification_stream .subscribe() - .flat_map(|archived_segment_notification| { - let objects = archived_segment_notification - .archived_segment - .global_object_mappings(); - + .flat_map(|archived_mapping_notification| { + let objects = archived_mapping_notification.object_mapping; stream::iter(objects) }) .ready_chunks(OBJECT_MAPPING_BATCH_SIZE) @@ -904,12 +904,12 @@ where // The genesis segment isn't included in this stream, see // `subscribe_archived_object_mappings` for details. let mapping_stream = self - .archived_segment_notification_stream + .archived_mapping_notification_stream .subscribe() - .flat_map(move |archived_segment_notification| { - let objects = archived_segment_notification - .archived_segment - .global_object_mappings() + .flat_map(move |archived_mapping_notification| { + let objects = archived_mapping_notification + .object_mapping + .into_iter() .filter(|object| hashes.remove(&object.hash)) .collect::>(); diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index 2b04ba87aa..6cb965aeeb 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -76,7 +76,7 @@ use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use std::time::Duration; use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; -use subspace_core_primitives::objects::BlockObjectMapping; +use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject}; use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex}; use subspace_core_primitives::{BlockNumber, PublicKey}; use subspace_erasure_coding::ErasureCoding; @@ -345,6 +345,17 @@ pub struct ArchivedSegmentNotification { pub acknowledgement_sender: TracingUnboundedSender<()>, } +/// Notification with incrementally generated object mappings for a block (and any previous block 
+/// continuation) +#[derive(Debug, Clone)] +pub struct ArchivedMappingNotification { + /// Incremental archived object mappings for a block (and any previous block continuation). + /// + /// The archived data won't be available in pieces until the entire segment is full and archived. + pub object_mapping: Vec, + // TODO: add an acknowledgement_sender for backpressure if needed +} + fn find_last_archived_block( client: &Client, segment_headers_store: &SegmentHeadersStore, @@ -675,7 +686,14 @@ where ); let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false); - // TODO: if we are in full mapping generation mode, send an archived object mappings notification for the mappings RPCs + // RPC clients only want these mappings in full mapping mode + // TODO: turn this into a command-line argument named `--full-mapping` + if cfg!(feature = "full-archive") { + send_archived_mapping_notification( + &subspace_link.archived_mapping_notification_sender.clone(), + block_outcome.object_mapping, + ); + } let new_segment_headers: Vec = block_outcome .archived_segments .iter() @@ -757,6 +775,9 @@ fn finalize_block( /// processing, which is necessary for ensuring that when the next block is imported, inherents will /// contain segment header of newly archived block (must happen exactly in the next block). /// +/// When a block with object mappings is archived, notification ([`SubspaceLink::archived_mapping_notification_stream`]) +/// will be sent. If any mappings spill over to the next segment, they will be sent when the next block is archived. +/// /// Once segment header is archived, notification ([`SubspaceLink::archived_segment_notification_stream`]) /// will be sent and archiver will be paused until all receivers have provided an acknowledgement /// for it. 
@@ -815,6 +836,8 @@ where } = archiver; let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block; + let archived_mapping_notification_sender = + subspace_link.archived_mapping_notification_sender.clone(); let archived_segment_notification_sender = subspace_link.archived_segment_notification_sender.clone(); @@ -846,7 +869,7 @@ where "Checking if block needs to be skipped" ); - // TODO: replace this cfg! with a CLI option + // TODO: turn this into a command-line argument named `--full-mapping` let skip_last_archived_blocks = last_archived_block_number > block_number_to_archive && !cfg!(feature = "full-archive"); if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks { @@ -906,6 +929,7 @@ where &*client, &sync_oracle, telemetry.clone(), + archived_mapping_notification_sender.clone(), archived_segment_notification_sender.clone(), best_archived_block_hash, block_number_to_archive, @@ -925,6 +949,7 @@ async fn archive_block( client: &Client, sync_oracle: &SubspaceSyncOracle, telemetry: Option, + archived_mapping_notification_sender: SubspaceNotificationSender, archived_segment_notification_sender: SubspaceNotificationSender, best_archived_block_hash: Block::Hash, block_number_to_archive: NumberFor, @@ -994,7 +1019,10 @@ where block_object_mappings, !sync_oracle.is_major_syncing(), ); - // TODO: send an archived object mappings notification for the mappings RPCs + send_archived_mapping_notification( + &archived_mapping_notification_sender, + block_outcome.object_mapping, + ); for archived_segment in block_outcome.archived_segments { let segment_header = archived_segment.segment_header; @@ -1037,6 +1065,15 @@ where Ok((block_hash_to_archive, block_number_to_archive)) } +fn send_archived_mapping_notification( + archived_mapping_notification_sender: &SubspaceNotificationSender, + object_mapping: Vec, +) { + let archived_mapping_notification = ArchivedMappingNotification { object_mapping }; + + 
archived_mapping_notification_sender.notify(move || archived_mapping_notification); +} + async fn send_archived_segment_notification( archived_segment_notification_sender: &SubspaceNotificationSender, archived_segment: NewArchivedSegment, diff --git a/crates/sc-consensus-subspace/src/lib.rs b/crates/sc-consensus-subspace/src/lib.rs index 744174c44f..b5d158d5a7 100644 --- a/crates/sc-consensus-subspace/src/lib.rs +++ b/crates/sc-consensus-subspace/src/lib.rs @@ -36,7 +36,7 @@ pub mod slot_worker; mod tests; pub mod verifier; -use crate::archiver::ArchivedSegmentNotification; +use crate::archiver::{ArchivedMappingNotification, ArchivedSegmentNotification}; use crate::block_import::BlockImportingNotification; use crate::notification::{SubspaceNotificationSender, SubspaceNotificationStream}; use crate::slot_worker::{NewSlotNotification, RewardSigningNotification}; @@ -52,6 +52,8 @@ pub struct SubspaceLink { new_slot_notification_stream: SubspaceNotificationStream, reward_signing_notification_sender: SubspaceNotificationSender, reward_signing_notification_stream: SubspaceNotificationStream, + archived_mapping_notification_sender: SubspaceNotificationSender, + archived_mapping_notification_stream: SubspaceNotificationStream, archived_segment_notification_sender: SubspaceNotificationSender, archived_segment_notification_stream: SubspaceNotificationStream, block_importing_notification_sender: @@ -70,6 +72,8 @@ impl SubspaceLink { notification::channel("subspace_new_slot_notification_stream"); let (reward_signing_notification_sender, reward_signing_notification_stream) = notification::channel("subspace_reward_signing_notification_stream"); + let (archived_mapping_notification_sender, archived_mapping_notification_stream) = + notification::channel("subspace_archived_mapping_notification_stream"); let (archived_segment_notification_sender, archived_segment_notification_stream) = notification::channel("subspace_archived_segment_notification_stream"); let 
(block_importing_notification_sender, block_importing_notification_stream) = @@ -80,6 +84,8 @@ impl SubspaceLink { new_slot_notification_stream, reward_signing_notification_sender, reward_signing_notification_stream, + archived_mapping_notification_sender, + archived_mapping_notification_stream, archived_segment_notification_sender, archived_segment_notification_stream, block_importing_notification_sender, @@ -103,6 +109,13 @@ impl SubspaceLink { self.reward_signing_notification_stream.clone() } + /// Get stream with notifications about archived mappings + pub fn archived_mapping_notification_stream( + &self, + ) -> SubspaceNotificationStream { + self.archived_mapping_notification_stream.clone() + } + /// Get stream with notifications about archived segment creation pub fn archived_segment_notification_stream( &self, diff --git a/crates/subspace-archiving/src/archiver.rs b/crates/subspace-archiving/src/archiver.rs index ee5f5fde2e..2833f64b20 100644 --- a/crates/subspace-archiving/src/archiver.rs +++ b/crates/subspace-archiving/src/archiver.rs @@ -295,7 +295,7 @@ pub struct ArchiveBlockOutcome { /// /// There can be zero or more mappings from the block in the current segment, /// and zero or more mappings from any block continuation from the previous segment. 
- pub global_mapping: Vec<GlobalObject>, + pub object_mapping: Vec<GlobalObject>, } /// Archiver instantiation error @@ -468,7 +468,7 @@ impl Archiver { ArchiveBlockOutcome { archived_segments, - global_mapping, + object_mapping: global_mapping, } } diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index dc86f7dd6e..d3b85a3579 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -66,7 +66,8 @@ use sc_consensus::{ }; use sc_consensus_slots::SlotProportion; use sc_consensus_subspace::archiver::{ - create_subspace_archiver, ArchivedSegmentNotification, SegmentHeadersStore, + create_subspace_archiver, ArchivedMappingNotification, ArchivedSegmentNotification, + SegmentHeadersStore, }; use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport}; use sc_consensus_subspace::notification::SubspaceNotificationStream; @@ -713,6 +714,9 @@ where /// Stream of notifications about blocks about to be imported. pub block_importing_notification_stream: SubspaceNotificationStream<BlockImportingNotification<Block>>, + /// Archived object mapping stream. + pub archived_mapping_notification_stream: + SubspaceNotificationStream<ArchivedMappingNotification>, /// Archived segment stream. 
pub archived_segment_notification_stream: SubspaceNotificationStream, @@ -1179,6 +1183,7 @@ where let new_slot_notification_stream = subspace_link.new_slot_notification_stream(); let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream(); let block_importing_notification_stream = subspace_link.block_importing_notification_stream(); + let archived_mapping_notification_stream = subspace_link.archived_mapping_notification_stream(); let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream(); let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new( @@ -1290,6 +1295,7 @@ where let client = client.clone(); let new_slot_notification_stream = new_slot_notification_stream.clone(); let reward_signing_notification_stream = reward_signing_notification_stream.clone(); + let archived_mapping_notification_stream = archived_mapping_notification_stream.clone(); let archived_segment_notification_stream = archived_segment_notification_stream.clone(); let transaction_pool = transaction_pool.clone(); let backend = backend.clone(); @@ -1301,6 +1307,8 @@ where subscription_executor, new_slot_notification_stream: new_slot_notification_stream.clone(), reward_signing_notification_stream: reward_signing_notification_stream.clone(), + archived_mapping_notification_stream: archived_mapping_notification_stream + .clone(), archived_segment_notification_stream: archived_segment_notification_stream .clone(), dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(), @@ -1337,6 +1345,7 @@ where new_slot_notification_stream, reward_signing_notification_stream, block_importing_notification_stream, + archived_mapping_notification_stream, archived_segment_notification_stream, network_starter, transaction_pool, diff --git a/crates/subspace-service/src/rpc.rs b/crates/subspace-service/src/rpc.rs index e5b17a3edd..3d222a6bae 100644 --- a/crates/subspace-service/src/rpc.rs +++ b/crates/subspace-service/src/rpc.rs @@ 
-26,7 +26,9 @@ use jsonrpsee::RpcModule; use mmr_rpc::{Mmr, MmrApiServer}; use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer}; use sc_client_api::{AuxStore, BlockBackend}; -use sc_consensus_subspace::archiver::{ArchivedSegmentNotification, SegmentHeadersStore}; +use sc_consensus_subspace::archiver::{ + ArchivedMappingNotification, ArchivedSegmentNotification, SegmentHeadersStore, +}; use sc_consensus_subspace::notification::SubspaceNotificationStream; use sc_consensus_subspace::slot_worker::{ NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle, @@ -65,6 +67,9 @@ where /// A stream with notifications about headers that need to be signed with ability to send /// signature back. pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>, + /// A stream with notifications about archived object mappings. + pub archived_mapping_notification_stream: + SubspaceNotificationStream<ArchivedMappingNotification>, /// A stream with notifications about archived segment creation. 
pub archived_segment_notification_stream: SubspaceNotificationStream, @@ -113,6 +118,7 @@ where subscription_executor, new_slot_notification_stream, reward_signing_notification_stream, + archived_mapping_notification_stream, archived_segment_notification_stream, dsn_bootstrap_nodes, segment_headers_store, @@ -131,6 +137,7 @@ where subscription_executor, new_slot_notification_stream, reward_signing_notification_stream, + archived_mapping_notification_stream, archived_segment_notification_stream, dsn_bootstrap_nodes, segment_headers_store, From 4228d3d24cfaa816d5da29f92ad86098a9d511bb Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 4 Oct 2024 16:03:37 +1000 Subject: [PATCH 5/6] Update tests to new Archiver API --- crates/pallet-subspace/src/mock.rs | 7 +- crates/sc-consensus-subspace/src/tests.rs | 5 +- .../tests/integration/archiver.rs | 228 ++++++++---------- .../tests/integration/piece_reconstruction.rs | 16 +- .../tests/integration/reconstructor.rs | 36 ++- .../benches/auditing.rs | 1 + .../benches/plotting.rs | 1 + .../benches/proving.rs | 1 + .../benches/reading.rs | 1 + test/subspace-test-client/src/lib.rs | 1 + 10 files changed, 159 insertions(+), 138 deletions(-) diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 36e70266fb..5ba71a597b 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -287,11 +287,8 @@ pub fn create_archived_segment() -> &'static NewArchivedSegment { let mut block = vec![0u8; RecordedHistorySegment::SIZE]; rand::thread_rng().fill(block.as_mut_slice()); - archiver - .add_block(block, Default::default(), true) - .into_iter() - .next() - .unwrap() + let block_outcome = archiver.add_block(block, Default::default(), true); + block_outcome.archived_segments.into_iter().next().unwrap() }) } diff --git a/crates/sc-consensus-subspace/src/tests.rs b/crates/sc-consensus-subspace/src/tests.rs index 168d4d07d7..3fdddc45b7 100644 --- 
a/crates/sc-consensus-subspace/src/tests.rs +++ b/crates/sc-consensus-subspace/src/tests.rs @@ -452,8 +452,9 @@ // let mut archiver = Archiver::new(kzg).expect("Incorrect parameters for archiver"); // // let genesis_block = client.block(client.info().genesis_hash).unwrap().unwrap(); -// archiver -// .add_block(genesis_block.encode(), BlockObjectMapping::default()) +// let block_outcome = archiver.add_block(genesis_block.encode(), BlockObjectMapping::default(), true); +// block_outcome +// .archived_segments // .into_iter() // .map(|archived_segment| archived_segment.pieces) // .collect() diff --git a/crates/subspace-archiving/tests/integration/archiver.rs b/crates/subspace-archiving/tests/integration/archiver.rs index dbb2dfdb26..8f73d81f33 100644 --- a/crates/subspace-archiving/tests/integration/archiver.rs +++ b/crates/subspace-archiving/tests/integration/archiver.rs @@ -8,7 +8,7 @@ use std::iter; use std::num::NonZeroUsize; use subspace_archiving::archiver::{Archiver, ArchiverInstantiationError, SegmentItem}; use subspace_core_primitives::hashes::Blake3Hash; -use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping, PieceObject}; +use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping, GlobalObject}; use subspace_core_primitives::pieces::{Piece, Record}; use subspace_core_primitives::segments::{ ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment, @@ -47,15 +47,22 @@ fn extract_data_from_source_record>(record: &Record, offset: O) -> } #[track_caller] -fn compare_block_objects_to_piece_objects<'a>( +fn compare_block_objects_to_global_objects<'a>( block_objects: impl Iterator, - piece_objects: impl Iterator, + global_objects: impl Iterator, ) { - block_objects.zip(piece_objects).for_each( - |((block, block_object_mapping), (piece, piece_object_mapping))| { + block_objects.zip(global_objects).for_each( + |((block, block_object_mapping), (piece, global_object_mapping))| { + let piece_data = + 
extract_data_from_source_record(piece.record(), global_object_mapping.offset); + let block_data = extract_data(block, block_object_mapping.offset); assert_eq!( - extract_data_from_source_record(piece.record(), piece_object_mapping.offset), - extract_data(block, block_object_mapping.offset) + piece_data, + block_data, + "{global_object_mapping:?} data (len {}) does not match\n\ + {block_object_mapping:?} data (len {})", + piece_data.len(), + block_data.len(), ); }, ); @@ -98,10 +105,10 @@ fn archiver() { (block, object_mapping) }; + let block0_outcome = archiver.add_block(block_0.clone(), block_0_object_mapping.clone(), true); + let archived_segments = block0_outcome.archived_segments; // There is not enough data to produce archived segment yet - assert!(archiver - .add_block(block_0.clone(), block_0_object_mapping.clone(), true) - .is_empty()); + assert!(archived_segments.is_empty()); let (block_1, block_1_object_mapping) = { let mut block = vec![0u8; RecordedHistorySegment::SIZE / 3 * 2]; @@ -138,11 +145,11 @@ fn archiver() { (block, object_mapping) }; // This should produce 1 archived segment - let archived_segments = - archiver.add_block(block_1.clone(), block_1_object_mapping.clone(), true); + let block1_outcome = archiver.add_block(block_1.clone(), block_1_object_mapping.clone(), true); + let archived_segments = block1_outcome.archived_segments; assert_eq!(archived_segments.len(), 1); - let first_archived_segment = archived_segments.into_iter().next().unwrap(); + let first_archived_segment = archived_segments.first().cloned().unwrap(); assert_eq!( first_archived_segment.pieces.len(), ArchivedHistorySegment::NUM_PIECES @@ -163,30 +170,40 @@ fn archiver() { assert_eq!(last_archived_block.partial_archived(), Some(65011701)); } - assert_eq!( - first_archived_segment.object_mapping.len(), - RecordedHistorySegment::NUM_RAW_RECORDS - ); - // 4 objects fit into the first segment - assert_eq!( - first_archived_segment - .object_mapping - .iter() - 
.filter(|object_mapping| !object_mapping.objects().is_empty()) - .count(), - 4 - ); + // 4 objects fit into the first segment, 2 from block0, and 2 from block1 + let object_mapping = block0_outcome.object_mapping; + assert_eq!(object_mapping.len(), 2); + { + let block_objects = iter::repeat(block_0.as_ref()).zip(block_0_object_mapping.objects()); + let global_objects = object_mapping.iter().map(|object_mapping| { + ( + Piece::from( + &first_archived_segment.pieces[object_mapping.piece_index.position() as usize], + ), + object_mapping, + ) + }); + compare_block_objects_to_global_objects(block_objects, global_objects); + } + let object_mapping = block1_outcome.object_mapping; + assert_eq!(object_mapping.len(), 2); { - let block_objects = iter::repeat(block_0.as_ref()) - .zip(block_0_object_mapping.objects()) - .chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects())); - let piece_objects = first_archived_segment - .pieces - .source_pieces() - .zip(&first_archived_segment.object_mapping) - .flat_map(|(piece, object_mapping)| iter::repeat(piece).zip(object_mapping.objects())); - - compare_block_objects_to_piece_objects(block_objects, piece_objects); + let block_objects = iter::repeat(block_1.as_ref()) + .zip(block_1_object_mapping.objects()) + .take(2); + let global_objects = object_mapping + .iter() + .map(|object_mapping| { + ( + Piece::from( + &first_archived_segment.pieces + [object_mapping.piece_index.position() as usize], + ), + object_mapping, + ) + }) + .take(2); + compare_block_objects_to_global_objects(block_objects, global_objects); } #[cfg(not(feature = "parallel"))] @@ -216,12 +233,11 @@ fn archiver() { block }; // This should be big enough to produce two archived segments in one go - let archived_segments = - archiver.add_block(block_2.clone(), BlockObjectMapping::default(), true); - assert_eq!(archived_segments.len(), 2); + let block2_outcome = archiver.add_block(block_2.clone(), BlockObjectMapping::default(), true); + 
assert_eq!(block2_outcome.archived_segments.len(), 2); // Check that initializing archiver with initial state before last block results in the same - // archived segments once last block is added + // archived segments and mappings once last block is added { let mut archiver_with_initial_state = Archiver::with_initial_state( kzg.clone(), @@ -238,46 +254,28 @@ fn archiver() { BlockObjectMapping::default(), true ), - archived_segments, + block2_outcome, ); } - assert_eq!( - archived_segments[0].object_mapping.len(), - RecordedHistorySegment::NUM_RAW_RECORDS - ); - // 1 object fits into the second segment - assert_eq!( - archived_segments[0] - .object_mapping - .iter() - .filter(|object_mapping| !object_mapping.objects().is_empty()) - .count(), - 1 - ); - assert_eq!( - archived_segments[1].object_mapping.len(), - RecordedHistorySegment::NUM_RAW_RECORDS - ); - // 0 object fits into the second segment - assert_eq!( - archived_segments[1] - .object_mapping - .iter() - .filter(|object_mapping| !object_mapping.objects().is_empty()) - .count(), - 0 - ); + // 1 object fits into the second segment, and there are no objects left for the third segment + let archived_segments = block2_outcome.archived_segments; + let object_mapping = block2_outcome.object_mapping; + assert_eq!(object_mapping.len(), 1); { - let block_objects = - iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects().iter().skip(2)); - let piece_objects = archived_segments[0] - .pieces - .source_pieces() - .zip(&archived_segments[0].object_mapping) - .flat_map(|(piece, object_mapping)| iter::repeat(piece).zip(object_mapping.objects())); - - compare_block_objects_to_piece_objects(block_objects, piece_objects); + let block_objects = iter::repeat(block_1.as_ref()) + .zip(block_1_object_mapping.objects()) + .skip(2) + .take(1); + let global_objects = object_mapping.iter().map(|object_mapping| { + ( + Piece::from( + &archived_segments[0].pieces[object_mapping.piece_index.position() as usize], + ), + 
object_mapping, + ) + }); + compare_block_objects_to_global_objects(block_objects, global_objects); } // Check archived bytes for block with index `2` in each archived segment @@ -343,12 +341,14 @@ fn archiver() { thread_rng().fill(block.as_mut_slice()); block }; - let archived_segments = - archiver.add_block(block_3.clone(), BlockObjectMapping::default(), true); + let block3_outcome = archiver.add_block(block_3.clone(), BlockObjectMapping::default(), true); + let archived_segments = block3_outcome.archived_segments.clone(); assert_eq!(archived_segments.len(), 1); + // There are no objects left for the fourth segment + assert_eq!(block3_outcome.object_mapping.len(), 0); // Check that initializing archiver with initial state before last block results in the same - // archived segments once last block is added + // archived segments and mappings once last block is added { let mut archiver_with_initial_state = Archiver::with_initial_state( kzg.clone(), @@ -361,11 +361,11 @@ fn archiver() { assert_eq!( archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default(), true), - archived_segments, + block3_outcome, ); } - // Archived segment should fit exactly into the last archived segment (rare case) + // Block should fit exactly into the last archived segment (rare case) { let archived_segment = archived_segments.first().unwrap(); let last_archived_block = archived_segment.segment_header.last_archived_block(); @@ -492,6 +492,7 @@ fn one_byte_smaller_segment() { assert_eq!( Archiver::new(kzg.clone(), erasure_coding.clone()) .add_block(vec![0u8; block_size], BlockObjectMapping::default(), true) + .archived_segments .len(), 1 ); @@ -503,6 +504,7 @@ fn one_byte_smaller_segment() { BlockObjectMapping::default(), true ) + .archived_segments .is_empty()); } @@ -532,13 +534,14 @@ fn spill_over_edge_case() { - 3; assert!(archiver .add_block(vec![0u8; block_size], BlockObjectMapping::default(), true) + .archived_segments .is_empty()); // Here we add one more block 
with internal length that takes 4 bytes in compact length // encoding + one more for enum variant, this should result in new segment being created, but // the very first segment item will not include newly added block because it would result in // subtracting with overflow when trying to slice internal bytes of the segment item - let archived_segments = archiver.add_block( + let block_outcome = archiver.add_block( vec![0u8; RecordedHistorySegment::SIZE], BlockObjectMapping::V0 { objects: vec![BlockObject { @@ -548,23 +551,14 @@ fn spill_over_edge_case() { }, true, ); + let archived_segments = block_outcome.archived_segments; + let object_mapping = block_outcome.object_mapping; assert_eq!(archived_segments.len(), 2); // If spill over actually happened, we'll not find object mapping in the first segment + assert_eq!(object_mapping.len(), 1); assert_eq!( - archived_segments[0] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 0 - ); - assert_eq!( - archived_segments[1] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 1 + object_mapping.first().unwrap().piece_index.segment_index(), + SegmentIndex::ONE ); } @@ -578,8 +572,10 @@ fn object_on_the_edge_of_segment() { .unwrap(); let mut archiver = Archiver::new(kzg, erasure_coding); let first_block = vec![0u8; RecordedHistorySegment::SIZE]; - let archived_segments = + let block1_outcome = archiver.add_block(first_block.clone(), BlockObjectMapping::default(), true); + let archived_segments = block1_outcome.archived_segments; + let object_mapping = block1_outcome.object_mapping; assert_eq!(archived_segments.len(), 1); let archived_segment = archived_segments.into_iter().next().unwrap(); let left_unarchived_from_first_block = first_block.len() as u32 @@ -589,6 +585,7 @@ fn object_on_the_edge_of_segment() { .archived_progress .partial() .unwrap(); + assert_eq!(object_mapping.len(), 0); let mut second_block = vec![0u8; RecordedHistorySegment::SIZE * 2]; let 
object_mapping = BlockObject { @@ -632,7 +629,7 @@ fn object_on_the_edge_of_segment() { // First ensure that any smaller offset will get translated into the first archived segment, // this is a protection against code regressions { - let archived_segments = archiver.clone().add_block( + let block2_outcome = archiver.clone().add_block( second_block.clone(), BlockObjectMapping::V0 { objects: vec![BlockObject { @@ -642,45 +639,34 @@ fn object_on_the_edge_of_segment() { }, true, ); + let archived_segments = block2_outcome.archived_segments; + let object_mapping = block2_outcome.object_mapping; assert_eq!(archived_segments.len(), 2); + assert_eq!(object_mapping.len(), 1); assert_eq!( - archived_segments[0] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 1 + object_mapping.first().unwrap().piece_index.segment_index(), + archived_segments[0].segment_header.segment_index(), ); } - let archived_segments = archiver.add_block( + let block2_outcome = archiver.add_block( second_block, BlockObjectMapping::V0 { objects: vec![object_mapping], }, true, ); + let archived_segments = block2_outcome.archived_segments; + let object_mapping = block2_outcome.object_mapping; assert_eq!(archived_segments.len(), 2); - assert_eq!( - archived_segments[0] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 0 - ); // Object should fall in the next archived segment + assert_eq!(object_mapping.len(), 1); assert_eq!( - archived_segments[1] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 1 + object_mapping.first().unwrap().piece_index.segment_index(), + archived_segments[1].segment_header.segment_index(), ); - assert_eq!(archived_segments[1].object_mapping[0].objects().len(), 1); // Ensure bytes are mapped correctly assert_eq!( @@ -689,7 +675,7 @@ fn object_on_the_edge_of_segment() { .to_raw_record_chunks() .flatten() .copied() - .skip(archived_segments[1].object_mapping[0].objects()[0].offset as usize) + 
.skip(object_mapping[0].offset as usize) .take(mapped_bytes.len()) .collect::>(), mapped_bytes diff --git a/crates/subspace-archiving/tests/integration/piece_reconstruction.rs b/crates/subspace-archiving/tests/integration/piece_reconstruction.rs index 69bc2d7e7e..41175dd743 100644 --- a/crates/subspace-archiving/tests/integration/piece_reconstruction.rs +++ b/crates/subspace-archiving/tests/integration/piece_reconstruction.rs @@ -33,7 +33,9 @@ fn segment_reconstruction_works() { let block = get_random_block(); - let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 1); @@ -79,7 +81,9 @@ fn piece_reconstruction_works() { // Block that fits into the segment fully let block = get_random_block(); - let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 1); @@ -143,7 +147,9 @@ fn segment_reconstruction_fails() { // Block that fits into the segment fully let block = get_random_block(); - let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 1); @@ -184,7 +190,9 @@ fn piece_reconstruction_fails() { // Block that fits into the segment fully let block = get_random_block(); - let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 1); diff --git a/crates/subspace-archiving/tests/integration/reconstructor.rs 
b/crates/subspace-archiving/tests/integration/reconstructor.rs index 05af807097..895d2a5145 100644 --- a/crates/subspace-archiving/tests/integration/reconstructor.rs +++ b/crates/subspace-archiving/tests/integration/reconstructor.rs @@ -58,11 +58,28 @@ fn basic() { }; let archived_segments = archiver .add_block(block_0.clone(), BlockObjectMapping::default(), true) + .archived_segments .into_iter() - .chain(archiver.add_block(block_1.clone(), BlockObjectMapping::default(), true)) - .chain(archiver.add_block(block_2.clone(), BlockObjectMapping::default(), true)) - .chain(archiver.add_block(block_3.clone(), BlockObjectMapping::default(), true)) - .chain(archiver.add_block(block_4, BlockObjectMapping::default(), true)) + .chain( + archiver + .add_block(block_1.clone(), BlockObjectMapping::default(), true) + .archived_segments, + ) + .chain( + archiver + .add_block(block_2.clone(), BlockObjectMapping::default(), true) + .archived_segments, + ) + .chain( + archiver + .add_block(block_3.clone(), BlockObjectMapping::default(), true) + .archived_segments, + ) + .chain( + archiver + .add_block(block_4, BlockObjectMapping::default(), true) + .archived_segments, + ) .collect::>(); assert_eq!(archived_segments.len(), 5); @@ -271,8 +288,13 @@ fn partial_data() { }; let archived_segments = archiver .add_block(block_0.clone(), BlockObjectMapping::default(), true) + .archived_segments .into_iter() - .chain(archiver.add_block(block_1, BlockObjectMapping::default(), true)) + .chain( + archiver + .add_block(block_1, BlockObjectMapping::default(), true) + .archived_segments, + ) .collect::>(); assert_eq!(archived_segments.len(), 1); @@ -347,7 +369,9 @@ fn invalid_usage() { block }; - let archived_segments = archiver.add_block(block_0, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block_0, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 4); diff --git 
a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index 04eabe50a3..42eb8adb1b 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -64,6 +64,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { Default::default(), true, ) + .archived_segments .into_iter() .next() .unwrap() diff --git a/crates/subspace-farmer-components/benches/plotting.rs b/crates/subspace-farmer-components/benches/plotting.rs index afcb5ed1ae..b39e8aac28 100644 --- a/crates/subspace-farmer-components/benches/plotting.rs +++ b/crates/subspace-farmer-components/benches/plotting.rs @@ -52,6 +52,7 @@ fn criterion_benchmark(c: &mut Criterion) { Default::default(), true, ) + .archived_segments .into_iter() .next() .unwrap() diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index bd719b5805..3d5bb3d6f1 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -72,6 +72,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { Default::default(), true, ) + .archived_segments .into_iter() .next() .unwrap() diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index 1826254020..3e2f6b1133 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -63,6 +63,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { Default::default(), true, ) + .archived_segments .into_iter() .next() .unwrap() diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index edd8d49529..cc5cb1c1f4 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -217,6 +217,7 @@ where BlockObjectMapping::default(), true, ) + .archived_segments .into_iter() .next() 
.expect("First block is always producing one segment; qed"); From 545ce282be3e0ca609eaa19b46f903b6ed7edcab Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 8 Oct 2024 14:00:30 +1000 Subject: [PATCH 6/6] Make archiver tests easier to debug --- .../tests/integration/archiver.rs | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/crates/subspace-archiving/tests/integration/archiver.rs b/crates/subspace-archiving/tests/integration/archiver.rs index 8f73d81f33..f60c607af9 100644 --- a/crates/subspace-archiving/tests/integration/archiver.rs +++ b/crates/subspace-archiving/tests/integration/archiver.rs @@ -18,25 +18,29 @@ use subspace_erasure_coding::ErasureCoding; use subspace_kzg::Kzg; use subspace_verification::is_piece_valid; +#[track_caller] fn extract_data>(data: &[u8], offset: O) -> &[u8] { let offset: u64 = offset.into(); let Compact(size) = Compact::::decode(&mut &data[offset as usize..]).unwrap(); &data[offset as usize + Compact::compact_len(&size)..][..size as usize] } +#[track_caller] fn extract_data_from_source_record>(record: &Record, offset: O) -> Vec { let offset: u64 = offset.into(); - let Compact(size) = Compact::::decode( - &mut record - .to_raw_record_chunks() - .flatten() - .copied() - .skip(offset as usize) - .take(8) - .collect::>() - .as_slice(), - ) - .unwrap(); + let size_bytes = record + .to_raw_record_chunks() + .flatten() + .copied() + .skip(offset as usize) + .take(8) + .collect::>(); + let Compact(size) = Compact::::decode(&mut size_bytes.as_slice()).unwrap_or_else(|_| { + panic!( + "{} could not be decoded as Compact: offset {offset}", + hex::encode(&size_bytes) + ) + }); record .to_raw_record_chunks() .flatten() @@ -57,8 +61,8 @@ fn compare_block_objects_to_global_objects<'a>( extract_data_from_source_record(piece.record(), global_object_mapping.offset); let block_data = extract_data(block, block_object_mapping.offset); assert_eq!( - piece_data, - block_data, + hex::encode(&piece_data), + 
hex::encode(block_data), "{global_object_mapping:?} data (len {}) does not match\n\ {block_object_mapping:?} data (len {})", piece_data.len(), @@ -93,11 +97,11 @@ fn archiver() { let object_mapping = BlockObjectMapping::V0 { objects: vec![ BlockObject { - hash: Blake3Hash::default(), + hash: [0x01; 32].into(), offset: 0u32, }, BlockObject { - hash: Blake3Hash::default(), + hash: [0x02; 32].into(), offset: RecordedHistorySegment::SIZE as u32 / 3, }, ], @@ -129,15 +133,15 @@ fn archiver() { let object_mapping = BlockObjectMapping::V0 { objects: vec![ BlockObject { - hash: Blake3Hash::default(), + hash: [0x03; 32].into(), offset: RecordedHistorySegment::SIZE as u32 / 6, }, BlockObject { - hash: Blake3Hash::default(), + hash: [0x04; 32].into(), offset: RecordedHistorySegment::SIZE as u32 / 5, }, BlockObject { - hash: Blake3Hash::default(), + hash: [0x05; 32].into(), offset: RecordedHistorySegment::SIZE as u32 / 3 * 2 - 200, }, ], @@ -545,7 +549,7 @@ fn spill_over_edge_case() { vec![0u8; RecordedHistorySegment::SIZE], BlockObjectMapping::V0 { objects: vec![BlockObject { - hash: Blake3Hash::default(), + hash: [0x06; 32].into(), offset: 0, }], }, @@ -589,7 +593,7 @@ fn object_on_the_edge_of_segment() { let mut second_block = vec![0u8; RecordedHistorySegment::SIZE * 2]; let object_mapping = BlockObject { - hash: Blake3Hash::default(), + hash: [0x07; 32].into(), // Offset is designed to fall exactly on the edge of the segment offset: RecordedHistorySegment::SIZE as u32 // Segment enum variant