Skip to content

Commit

Permalink
Merge branch 'main' into operator-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
NingLin-P authored Aug 7, 2023
2 parents e1abfdc + 588708f commit da80d3c
Show file tree
Hide file tree
Showing 20 changed files with 165 additions and 71 deletions.
2 changes: 1 addition & 1 deletion crates/pallet-subspace/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ pub fn create_archived_segment(kzg: Kzg) -> NewArchivedSegment {
let mut block = vec![0u8; RecordedHistorySegment::SIZE];
rand::thread_rng().fill(block.as_mut_slice());
archiver
.add_block(block, Default::default())
.add_block(block, Default::default(), true)
.into_iter()
.next()
.unwrap()
Expand Down
16 changes: 12 additions & 4 deletions crates/sc-consensus-subspace/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::{
get_chain_constants, ArchivedSegmentNotification, BlockImportingNotification, SubspaceLink,
SubspaceNotificationSender,
SubspaceNotificationSender, SubspaceSyncOracle,
};
use codec::{Decode, Encode};
use futures::StreamExt;
Expand All @@ -29,6 +29,7 @@ use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
use sc_utils::mpsc::tracing_unbounded;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::SyncOracle;
use sp_consensus_subspace::{FarmerPublicKey, SubspaceApi};
use sp_objects::ObjectsApi;
use sp_runtime::generic::SignedBlock;
Expand Down Expand Up @@ -500,7 +501,8 @@ where
encoded_block.len() as f32 / 1024.0
);

let archived_segments = archiver.add_block(encoded_block, block_object_mappings);
let archived_segments =
archiver.add_block(encoded_block, block_object_mappings, false);
let new_segment_headers: Vec<SegmentHeader> = archived_segments
.iter()
.map(|archived_segment| archived_segment.segment_header)
Expand Down Expand Up @@ -584,10 +586,11 @@ fn finalize_block<Block, Backend, Client>(
/// `store_segment_header` extrinsic).
///
/// NOTE: Archiver is doing blocking operations and must run in a dedicated task.
pub fn create_subspace_archiver<Block, Backend, Client, AS>(
pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
segment_headers_store: SegmentHeadersStore<AS>,
subspace_link: &SubspaceLink<Block>,
client: Arc<Client>,
sync_oracle: SubspaceSyncOracle<SO>,
telemetry: Option<TelemetryHandle>,
) -> impl Future<Output = ()> + Send + 'static
where
Expand All @@ -604,6 +607,7 @@ where
+ 'static,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
AS: AuxStore + Send + Sync + 'static,
SO: SyncOracle + Send + Sync + 'static,
{
let client_info = client.info();
let best_block_hash = client_info.best_hash;
Expand Down Expand Up @@ -725,7 +729,11 @@ where
);

let mut new_segment_headers = Vec::new();
for archived_segment in archiver.add_block(encoded_block, block_object_mappings) {
for archived_segment in archiver.add_block(
encoded_block,
block_object_mappings,
!sync_oracle.is_major_syncing(),
) {
let segment_header = archived_segment.segment_header;

if let Err(error) =
Expand Down
2 changes: 1 addition & 1 deletion crates/sp-lightclient/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn archived_segment(kzg: Kzg) -> NewArchivedSegment {
let mut archiver = Archiver::new(kzg).unwrap();

archiver
.add_block(block, Default::default())
.add_block(block, Default::default(), true)
.into_iter()
.next()
.unwrap()
Expand Down
33 changes: 26 additions & 7 deletions crates/subspace-archiving/benches/archiving.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use rand::{thread_rng, Rng};
use subspace_archiving::archiver::Archiver;
use subspace_core_primitives::crypto::kzg;
use subspace_core_primitives::crypto::kzg::Kzg;

const AMOUNT_OF_DATA: usize = 1024 * 1024;
const AMOUNT_OF_DATA: usize = 5 * 1024 * 1024;
const SMALL_BLOCK_SIZE: usize = 500;

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -15,17 +15,36 @@ fn criterion_benchmark(c: &mut Criterion) {

c.bench_function("segment-archiving-large-block", |b| {
b.iter(|| {
archiver
.clone()
.add_block(input.clone(), Default::default());
archiver.clone().add_block(
black_box(input.clone()),
black_box(Default::default()),
black_box(true),
);
})
});

c.bench_function("segment-archiving-small-blocks", |b| {
c.bench_function("segment-archiving-small-blocks/incremental", |b| {
b.iter(|| {
let mut archiver = archiver.clone();
for chunk in input.chunks(SMALL_BLOCK_SIZE) {
archiver.add_block(chunk.to_vec(), Default::default());
archiver.add_block(
black_box(chunk.to_vec()),
black_box(Default::default()),
black_box(true),
);
}
})
});

c.bench_function("segment-archiving-small-blocks/non-incremental", |b| {
b.iter(|| {
let mut archiver = archiver.clone();
for chunk in input.chunks(SMALL_BLOCK_SIZE) {
archiver.add_block(
black_box(chunk.to_vec()),
black_box(Default::default()),
black_box(false),
);
}
})
});
Expand Down
12 changes: 8 additions & 4 deletions crates/subspace-archiving/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,15 @@ impl Archiver {
}
}

/// Adds new block to internal buffer, potentially producing pieces and segment headers
/// Adds new block to internal buffer, potentially producing pieces and segment headers.
///
/// Incremental archiving can be enabled if amortized block addition cost is preferred over
/// throughput.
pub fn add_block(
&mut self,
bytes: Vec<u8>,
object_mapping: BlockObjectMapping,
incremental: bool,
) -> Vec<NewArchivedSegment> {
// Append new block to the buffer
self.buffer.push_back(SegmentItem::Block {
Expand All @@ -355,7 +359,7 @@ impl Archiver {

let mut archived_segments = Vec::new();

while let Some(segment) = self.produce_segment() {
while let Some(segment) = self.produce_segment(incremental) {
archived_segments.push(self.produce_archived_segment(segment));
}

Expand All @@ -364,7 +368,7 @@ impl Archiver {

/// Try to slice buffer contents into segments if there is enough data, producing one segment at
/// a time
fn produce_segment(&mut self) -> Option<Segment> {
fn produce_segment(&mut self, incremental: bool) -> Option<Segment> {
let mut segment = Segment::V0 {
items: Vec::with_capacity(self.buffer.len()),
};
Expand All @@ -383,7 +387,7 @@ impl Archiver {
let bytes_committed_to = existing_commitments * RawRecord::SIZE;
// Run incremental archiver only when there are at least two records to archive,
// otherwise we're wasting CPU cycles encoding segment over and over again
if segment_size - bytes_committed_to >= RawRecord::SIZE * 2 {
if incremental && segment_size - bytes_committed_to >= RawRecord::SIZE * 2 {
update_record_commitments(
&mut self.incremental_record_commitments,
&segment,
Expand Down
35 changes: 25 additions & 10 deletions crates/subspace-archiving/tests/integration/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn archiver() {
};
// There is not enough data to produce archived segment yet
assert!(archiver
.add_block(block_0.clone(), block_0_object_mapping.clone())
.add_block(block_0.clone(), block_0_object_mapping.clone(), true)
.is_empty());

let (block_1, block_1_object_mapping) = {
Expand Down Expand Up @@ -133,7 +133,8 @@ fn archiver() {
(block, object_mapping)
};
// This should produce 1 archived segment
let archived_segments = archiver.add_block(block_1.clone(), block_1_object_mapping.clone());
let archived_segments =
archiver.add_block(block_1.clone(), block_1_object_mapping.clone(), true);
assert_eq!(archived_segments.len(), 1);

let first_archived_segment = archived_segments.into_iter().next().unwrap();
Expand Down Expand Up @@ -210,7 +211,8 @@ fn archiver() {
block
};
// This should be big enough to produce two archived segments in one go
let archived_segments = archiver.add_block(block_2.clone(), BlockObjectMapping::default());
let archived_segments =
archiver.add_block(block_2.clone(), BlockObjectMapping::default(), true);
assert_eq!(archived_segments.len(), 2);

// Check that initializing archiver with initial state before last block results in the same
Expand All @@ -225,7 +227,11 @@ fn archiver() {
.unwrap();

assert_eq!(
archiver_with_initial_state.add_block(block_2.clone(), BlockObjectMapping::default()),
archiver_with_initial_state.add_block(
block_2.clone(),
BlockObjectMapping::default(),
true
),
archived_segments,
);
}
Expand Down Expand Up @@ -331,7 +337,8 @@ fn archiver() {
thread_rng().fill(block.as_mut_slice());
block
};
let archived_segments = archiver.add_block(block_3.clone(), BlockObjectMapping::default());
let archived_segments =
archiver.add_block(block_3.clone(), BlockObjectMapping::default(), true);
assert_eq!(archived_segments.len(), 1);

// Check that initializing archiver with initial state before last block results in the same
Expand All @@ -346,7 +353,7 @@ fn archiver() {
.unwrap();

assert_eq!(
archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default()),
archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default(), true),
archived_segments,
);
}
Expand Down Expand Up @@ -466,15 +473,19 @@ fn one_byte_smaller_segment() {
assert_eq!(
Archiver::new(kzg.clone())
.unwrap()
.add_block(vec![0u8; block_size], BlockObjectMapping::default())
.add_block(vec![0u8; block_size], BlockObjectMapping::default(), true)
.len(),
1
);
// Cutting just one byte more is not sufficient to produce a segment, this is a protection
// against code regressions
assert!(Archiver::new(kzg)
.unwrap()
.add_block(vec![0u8; block_size - 1], BlockObjectMapping::default())
.add_block(
vec![0u8; block_size - 1],
BlockObjectMapping::default(),
true
)
.is_empty());
}

Expand All @@ -498,7 +509,7 @@ fn spill_over_edge_case() {
// We leave three bytes at the end intentionally
- 3;
assert!(archiver
.add_block(vec![0u8; block_size], BlockObjectMapping::default())
.add_block(vec![0u8; block_size], BlockObjectMapping::default(), true)
.is_empty());

// Here we add one more block with internal length that takes 4 bytes in compact length
Expand All @@ -513,6 +524,7 @@ fn spill_over_edge_case() {
offset: 0,
}],
},
true,
);
assert_eq!(archived_segments.len(), 2);
// If spill over actually happened, we'll not find object mapping in the first segment
Expand All @@ -539,7 +551,8 @@ fn object_on_the_edge_of_segment() {
let kzg = Kzg::new(embedded_kzg_settings());
let mut archiver = Archiver::new(kzg).unwrap();
let first_block = vec![0u8; RecordedHistorySegment::SIZE];
let archived_segments = archiver.add_block(first_block.clone(), BlockObjectMapping::default());
let archived_segments =
archiver.add_block(first_block.clone(), BlockObjectMapping::default(), true);
assert_eq!(archived_segments.len(), 1);
let archived_segment = archived_segments.into_iter().next().unwrap();
let left_unarchived_from_first_block = first_block.len() as u32
Expand Down Expand Up @@ -600,6 +613,7 @@ fn object_on_the_edge_of_segment() {
offset: object_mapping.offset() - 1,
}],
},
true,
);

assert_eq!(archived_segments.len(), 2);
Expand All @@ -618,6 +632,7 @@ fn object_on_the_edge_of_segment() {
BlockObjectMapping {
objects: vec![object_mapping],
},
true,
);

assert_eq!(archived_segments.len(), 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn segment_reconstruction_works() {

let block = get_random_block();

let archived_segments = archiver.add_block(block, BlockObjectMapping::default());
let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true);

assert_eq!(archived_segments.len(), 1);

Expand Down Expand Up @@ -66,7 +66,7 @@ fn piece_reconstruction_works() {
// Block that fits into the segment fully
let block = get_random_block();

let archived_segments = archiver.add_block(block, BlockObjectMapping::default());
let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true);

assert_eq!(archived_segments.len(), 1);

Expand Down Expand Up @@ -126,7 +126,7 @@ fn segment_reconstruction_fails() {
// Block that fits into the segment fully
let block = get_random_block();

let archived_segments = archiver.add_block(block, BlockObjectMapping::default());
let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true);

assert_eq!(archived_segments.len(), 1);

Expand Down Expand Up @@ -163,7 +163,7 @@ fn piece_reconstruction_fails() {
// Block that fits into the segment fully
let block = get_random_block();

let archived_segments = archiver.add_block(block, BlockObjectMapping::default());
let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true);

assert_eq!(archived_segments.len(), 1);

Expand Down
16 changes: 8 additions & 8 deletions crates/subspace-archiving/tests/integration/reconstructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ fn basic() {
block
};
let archived_segments = archiver
.add_block(block_0.clone(), BlockObjectMapping::default())
.add_block(block_0.clone(), BlockObjectMapping::default(), true)
.into_iter()
.chain(archiver.add_block(block_1.clone(), BlockObjectMapping::default()))
.chain(archiver.add_block(block_2.clone(), BlockObjectMapping::default()))
.chain(archiver.add_block(block_3.clone(), BlockObjectMapping::default()))
.chain(archiver.add_block(block_4, BlockObjectMapping::default()))
.chain(archiver.add_block(block_1.clone(), BlockObjectMapping::default(), true))
.chain(archiver.add_block(block_2.clone(), BlockObjectMapping::default(), true))
.chain(archiver.add_block(block_3.clone(), BlockObjectMapping::default(), true))
.chain(archiver.add_block(block_4, BlockObjectMapping::default(), true))
.collect::<Vec<_>>();

assert_eq!(archived_segments.len(), 5);
Expand Down Expand Up @@ -257,9 +257,9 @@ fn partial_data() {
block
};
let archived_segments = archiver
.add_block(block_0.clone(), BlockObjectMapping::default())
.add_block(block_0.clone(), BlockObjectMapping::default(), true)
.into_iter()
.chain(archiver.add_block(block_1, BlockObjectMapping::default()))
.chain(archiver.add_block(block_1, BlockObjectMapping::default(), true))
.collect::<Vec<_>>();

assert_eq!(archived_segments.len(), 1);
Expand Down Expand Up @@ -332,7 +332,7 @@ fn invalid_usage() {
block
};

let archived_segments = archiver.add_block(block_0, BlockObjectMapping::default());
let archived_segments = archiver.add_block(block_0, BlockObjectMapping::default(), true);

assert_eq!(archived_segments.len(), 4);

Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
.add_block(
AsRef::<[u8]>::as_ref(input.as_ref()).to_vec(),
Default::default(),
true,
)
.into_iter()
.next()
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer-components/benches/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ fn criterion_benchmark(c: &mut Criterion) {
.add_block(
AsRef::<[u8]>::as_ref(input.as_ref()).to_vec(),
Default::default(),
true,
)
.into_iter()
.next()
Expand Down
Loading

0 comments on commit da80d3c

Please sign in to comment.