Merge pull request #3040 from autonomys/obj-map-versions
Move versioning to BlockObjectMapping and PieceObjectMapping
teor2345 authored Sep 20, 2024
2 parents c983442 + 920c633 commit b6b34fd
Showing 5 changed files with 156 additions and 120 deletions.
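For orientation, the shape of the change can be read off the hunks below: versioning moves from the per-object types onto the mapping containers. BlockObject (and likewise PieceObject) loses its V0 variant and becomes a plain struct with public fields, while BlockObjectMapping (and PieceObjectMapping) becomes a versioned enum exposing objects()/objects_mut() accessors. The following is a minimal sketch inferred from this diff only; the real definitions live in the Subspace primitives crates and carry derives, encoding attributes, and field types not shown here.

// Illustrative sketch; names follow this diff, everything else is an assumption.
pub type Blake3Hash = [u8; 32]; // stand-in for subspace_core_primitives::Blake3Hash

pub mod before {
    use super::Blake3Hash;

    // Versioning lived on the per-object type...
    pub enum BlockObject {
        V0 { hash: Blake3Hash, offset: u32 },
    }

    // ...while the mapping container was a plain struct with a public field.
    pub struct BlockObjectMapping {
        pub objects: Vec<BlockObject>,
    }
}

pub mod after {
    use super::Blake3Hash;

    // The object itself is now a plain struct with public fields...
    pub struct BlockObject {
        pub hash: Blake3Hash,
        pub offset: u32,
    }

    // ...and the version tag moves to the mapping container.
    pub enum BlockObjectMapping {
        V0 { objects: Vec<BlockObject> },
    }

    impl BlockObjectMapping {
        // Accessors matching the `.objects()` / `.objects_mut()` calls in the diff.
        pub fn objects(&self) -> &[BlockObject] {
            match self {
                Self::V0 { objects } => objects,
            }
        }

        pub fn objects_mut(&mut self) -> &mut Vec<BlockObject> {
            match self {
                Self::V0 { objects } => objects,
            }
        }
    }
}

PieceObject and PieceObjectMapping follow the same pattern in the hunks below.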
45 changes: 21 additions & 24 deletions crates/subspace-archiving/src/archiver.rs
@@ -193,16 +193,16 @@ impl NewArchivedSegment {
             .segment_piece_indexes_source_first();

         // Iterate through the object mapping vector for each piece
-        object_mapping
-            .into_iter()
-            .zip(piece_indexes)
-            .flat_map(|(piece_mappings, piece_index)| {
+        object_mapping.into_iter().zip(piece_indexes).flat_map(
+            move |(piece_mappings, piece_index)| {
                 // And then through each individual object mapping in the piece
+                let piece_mappings = piece_mappings.objects().to_vec();
+
                 piece_mappings
-                    .objects
                     .into_iter()
                     .map(move |piece_object| GlobalObject::new(piece_index, &piece_object))
-            })
+            },
+        )
     }
 }

@@ -316,11 +316,10 @@ impl Archiver {
                 // Take part of the encoded block that wasn't archived yet and push to the
                 // buffer and block continuation
                 object_mapping
-                    .objects
+                    .objects_mut()
                     .retain_mut(|block_object: &mut BlockObject| {
-                        let current_offset = block_object.offset();
-                        if current_offset >= archived_block_bytes {
-                            block_object.set_offset(current_offset - archived_block_bytes);
+                        if block_object.offset >= archived_block_bytes {
+                            block_object.offset -= archived_block_bytes;
                             true
                         } else {
                             false
@@ -509,13 +508,12 @@ impl Archiver {

             bytes.truncate(split_point);

-            let continuation_object_mapping = BlockObjectMapping {
+            let continuation_object_mapping = BlockObjectMapping::V0 {
                 objects: object_mapping
-                    .objects
+                    .objects_mut()
                     .extract_if(|block_object: &mut BlockObject| {
-                        let current_offset = block_object.offset();
-                        if current_offset >= split_point as u32 {
-                            block_object.set_offset(current_offset - split_point as u32);
+                        if block_object.offset >= split_point as u32 {
+                            block_object.offset -= split_point as u32;
                             true
                         } else {
                             false
@@ -553,13 +551,12 @@ impl Archiver {

             bytes.truncate(split_point);

-            let continuation_object_mapping = BlockObjectMapping {
+            let continuation_object_mapping = BlockObjectMapping::V0 {
                 objects: object_mapping
-                    .objects
+                    .objects_mut()
                     .extract_if(|block_object: &mut BlockObject| {
-                        let current_offset = block_object.offset();
-                        if current_offset >= split_point as u32 {
-                            block_object.set_offset(current_offset - split_point as u32);
+                        if block_object.offset >= split_point as u32 {
+                            block_object.offset -= split_point as u32;
                             true
                         } else {
                             false
@@ -640,21 +637,21 @@ impl Archiver {
                 bytes,
                 object_mapping,
             } => {
-                for block_object in &object_mapping.objects {
+                for block_object in object_mapping.objects() {
                     // `+1` corresponds to `SegmentItem::X {}` enum variant encoding
                     let offset_in_segment = base_offset_in_segment
                         + 1
                         + Compact::compact_len(&(bytes.len() as u32))
-                        + block_object.offset() as usize;
+                        + block_object.offset as usize;
                     let raw_piece_offset =
                         (offset_in_segment % RawRecord::SIZE).try_into().expect(
                             "Offset within piece should always fit in 32-bit integer; qed",
                         );
                     if let Some(piece_object_mapping) = corrected_object_mapping
                         .get_mut(offset_in_segment / RawRecord::SIZE)
                     {
-                        piece_object_mapping.objects.push(PieceObject::V0 {
-                            hash: block_object.hash(),
+                        piece_object_mapping.objects_mut().push(PieceObject {
+                            hash: block_object.hash,
                             offset: raw_piece_offset,
                         });
                     }
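The offset handling in the archiver.rs hunks above follows one rule: when only the first split_point bytes of an encoded block land in the current segment, an object whose offset falls at or past the cut belongs to the continuation, and its offset is rebased to be relative to the continuation's start; objects before the cut stay where they are. A standalone sketch of that rule follows — illustrative only, with BlockObject reduced to the field shape visible in this diff and a helper function that is not the crate's actual API.

// Sketch of the offset-rebasing rule; not crate code.
type Blake3Hash = [u8; 32]; // stand-in for subspace_core_primitives::Blake3Hash

#[derive(Debug, Clone)]
struct BlockObject {
    hash: Blake3Hash,
    offset: u32,
}

/// Split a block's object mapping at `split_point` bytes: objects starting before
/// the cut stay with the current part, objects at or past it move to the
/// continuation with offsets rebased against the continuation's start.
fn split_object_mapping(
    objects: Vec<BlockObject>,
    split_point: u32,
) -> (Vec<BlockObject>, Vec<BlockObject>) {
    let mut current = Vec::new();
    let mut continuation = Vec::new();
    for mut object in objects {
        if object.offset >= split_point {
            object.offset -= split_point;
            continuation.push(object);
        } else {
            current.push(object);
        }
    }
    (current, continuation)
}

fn main() {
    let objects = vec![
        BlockObject { hash: [0; 32], offset: 10 },
        BlockObject { hash: [0; 32], offset: 120 },
    ];
    let (current, continuation) = split_object_mapping(objects, 100);
    assert_eq!(current[0].offset, 10);
    // The second object now points 20 bytes into the continuation.
    assert_eq!(continuation[0].offset, 20);
    let _ = (current, continuation);
}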
66 changes: 33 additions & 33 deletions crates/subspace-archiving/tests/integration/archiver.rs
@@ -52,8 +52,8 @@ fn compare_block_objects_to_piece_objects<'a>(
     block_objects.zip(piece_objects).for_each(
         |((block, block_object_mapping), (piece, piece_object_mapping))| {
             assert_eq!(
-                extract_data_from_source_record(piece.record(), piece_object_mapping.offset()),
-                extract_data(block, block_object_mapping.offset())
+                extract_data_from_source_record(piece.record(), piece_object_mapping.offset),
+                extract_data(block, block_object_mapping.offset)
             );
         },
     );
@@ -81,13 +81,13 @@ fn archiver() {
         .as_mut()
         .write_all(&Compact(128_u64).encode())
         .unwrap();
-    let object_mapping = BlockObjectMapping {
+    let object_mapping = BlockObjectMapping::V0 {
         objects: vec![
-            BlockObject::V0 {
+            BlockObject {
                 hash: Blake3Hash::default(),
                 offset: 0u32,
             },
-            BlockObject::V0 {
+            BlockObject {
                 hash: Blake3Hash::default(),
                 offset: RecordedHistorySegment::SIZE as u32 / 3,
             },
@@ -117,17 +117,17 @@ fn archiver() {
         .as_mut()
         .write_all(&Compact(100_u64).encode())
         .unwrap();
-    let object_mapping = BlockObjectMapping {
+    let object_mapping = BlockObjectMapping::V0 {
         objects: vec![
-            BlockObject::V0 {
+            BlockObject {
                 hash: Blake3Hash::default(),
                 offset: RecordedHistorySegment::SIZE as u32 / 6,
             },
-            BlockObject::V0 {
+            BlockObject {
                 hash: Blake3Hash::default(),
                 offset: RecordedHistorySegment::SIZE as u32 / 5,
             },
-            BlockObject::V0 {
+            BlockObject {
                 hash: Blake3Hash::default(),
                 offset: RecordedHistorySegment::SIZE as u32 / 3 * 2 - 200,
             },
@@ -170,19 +170,19 @@ fn archiver() {
         first_archived_segment
             .object_mapping
             .iter()
-            .filter(|object_mapping| !object_mapping.objects.is_empty())
+            .filter(|object_mapping| !object_mapping.objects().is_empty())
             .count(),
         4
     );
     {
         let block_objects = iter::repeat(block_0.as_ref())
-            .zip(&block_0_object_mapping.objects)
-            .chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects.iter()));
+            .zip(block_0_object_mapping.objects())
+            .chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects()));
         let piece_objects = first_archived_segment
             .pieces
             .source_pieces()
             .zip(&first_archived_segment.object_mapping)
-            .flat_map(|(piece, object_mapping)| iter::repeat(piece).zip(&object_mapping.objects));
+            .flat_map(|(piece, object_mapping)| iter::repeat(piece).zip(object_mapping.objects()));

         compare_block_objects_to_piece_objects(block_objects, piece_objects);
     }
@@ -249,7 +249,7 @@ fn archiver() {
         archived_segments[0]
             .object_mapping
             .iter()
-            .filter(|object_mapping| !object_mapping.objects.is_empty())
+            .filter(|object_mapping| !object_mapping.objects().is_empty())
             .count(),
         1
     );
@@ -262,18 +262,18 @@ fn archiver() {
         archived_segments[1]
             .object_mapping
             .iter()
-            .filter(|object_mapping| !object_mapping.objects.is_empty())
+            .filter(|object_mapping| !object_mapping.objects().is_empty())
             .count(),
         0
     );
     {
         let block_objects =
-            iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects.iter().skip(2));
+            iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects().iter().skip(2));
         let piece_objects = archived_segments[0]
             .pieces
             .source_pieces()
             .zip(&archived_segments[0].object_mapping)
-            .flat_map(|(piece, object_mapping)| iter::repeat(piece).zip(&object_mapping.objects));
+            .flat_map(|(piece, object_mapping)| iter::repeat(piece).zip(object_mapping.objects()));

         compare_block_objects_to_piece_objects(block_objects, piece_objects);
     }
@@ -538,8 +538,8 @@ fn spill_over_edge_case() {
     // subtracting with overflow when trying to slice internal bytes of the segment item
     let archived_segments = archiver.add_block(
         vec![0u8; RecordedHistorySegment::SIZE],
-        BlockObjectMapping {
-            objects: vec![BlockObject::V0 {
+        BlockObjectMapping::V0 {
+            objects: vec![BlockObject {
                 hash: Blake3Hash::default(),
                 offset: 0,
             }],
@@ -552,15 +552,15 @@ fn spill_over_edge_case() {
         archived_segments[0]
             .object_mapping
             .iter()
-            .filter(|o| !o.objects.is_empty())
+            .filter(|o| !o.objects().is_empty())
             .count(),
         0
     );
     assert_eq!(
         archived_segments[1]
             .object_mapping
             .iter()
-            .filter(|o| !o.objects.is_empty())
+            .filter(|o| !o.objects().is_empty())
             .count(),
         1
     );
@@ -589,7 +589,7 @@ fn object_on_the_edge_of_segment() {
         .unwrap();

     let mut second_block = vec![0u8; RecordedHistorySegment::SIZE * 2];
-    let object_mapping = BlockObject::V0 {
+    let object_mapping = BlockObject {
         hash: Blake3Hash::default(),
         // Offset is designed to fall exactly on the edge of the segment
         offset: RecordedHistorySegment::SIZE as u32
@@ -624,18 +624,18 @@ fn object_on_the_edge_of_segment() {
     };
     let mapped_bytes = rand::random::<[u8; 32]>().to_vec().encode();
     // Write mapped bytes at expected offset in source data
-    second_block[object_mapping.offset() as usize..][..mapped_bytes.len()]
+    second_block[object_mapping.offset as usize..][..mapped_bytes.len()]
         .copy_from_slice(&mapped_bytes);

     // First ensure that any smaller offset will get translated into the first archived segment,
     // this is a protection against code regressions
     {
         let archived_segments = archiver.clone().add_block(
             second_block.clone(),
-            BlockObjectMapping {
-                objects: vec![BlockObject::V0 {
-                    hash: object_mapping.hash(),
-                    offset: object_mapping.offset() - 1,
+            BlockObjectMapping::V0 {
+                objects: vec![BlockObject {
+                    hash: object_mapping.hash,
+                    offset: object_mapping.offset - 1,
                 }],
             },
             true,
@@ -646,15 +646,15 @@ fn object_on_the_edge_of_segment() {
             archived_segments[0]
                 .object_mapping
                 .iter()
-                .filter(|o| !o.objects.is_empty())
+                .filter(|o| !o.objects().is_empty())
                 .count(),
             1
         );
     }

     let archived_segments = archiver.add_block(
         second_block,
-        BlockObjectMapping {
+        BlockObjectMapping::V0 {
             objects: vec![object_mapping],
         },
         true,
@@ -665,7 +665,7 @@ fn object_on_the_edge_of_segment() {
         archived_segments[0]
             .object_mapping
             .iter()
-            .filter(|o| !o.objects.is_empty())
+            .filter(|o| !o.objects().is_empty())
             .count(),
         0
     );
@@ -674,11 +674,11 @@ fn object_on_the_edge_of_segment() {
         archived_segments[1]
             .object_mapping
             .iter()
-            .filter(|o| !o.objects.is_empty())
+            .filter(|o| !o.objects().is_empty())
             .count(),
         1
     );
-    assert_eq!(archived_segments[1].object_mapping[0].objects.len(), 1);
+    assert_eq!(archived_segments[1].object_mapping[0].objects().len(), 1);

     // Ensure bytes are mapped correctly
     assert_eq!(
@@ -687,7 +687,7 @@ fn object_on_the_edge_of_segment() {
             .to_raw_record_chunks()
             .flatten()
             .copied()
-            .skip(archived_segments[1].object_mapping[0].objects[0].offset() as usize)
+            .skip(archived_segments[1].object_mapping[0].objects()[0].offset as usize)
             .take(mapped_bytes.len())
             .collect::<Vec<_>>(),
         mapped_bytes
(Diffs for the remaining 3 changed files are not shown here.)
