From 3a80c8214d6aec886bedd156973647e69014919c Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 9 Mar 2024 07:23:38 +0200 Subject: [PATCH] Add internal benchmark in `SingleDiskFarm` to identify faster `ReadSectorRecordChunksMode` --- .../src/bin/subspace-farmer/commands/farm.rs | 2 + .../subspace-farmer/src/single_disk_farm.rs | 78 ++++++++++++++++--- 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index c3f74ac7bdb..e2239cb5d95 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -630,6 +630,7 @@ where let (single_disk_farms, plotting_delay_senders) = tokio::task::block_in_place(|| { let handle = Handle::current(); + let faster_read_sector_record_chunks_mode_concurrency = &Semaphore::new(1); let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len()) .map(|_| oneshot::channel()) .unzip::<_, _, Vec<_>, Vec<_>>(); @@ -664,6 +665,7 @@ where plotting_thread_pool_manager: plotting_thread_pool_manager.clone(), plotting_delay: Some(plotting_delay_receiver), disable_farm_locking, + faster_read_sector_record_chunks_mode_concurrency, }, disk_farm_index, ); diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index f7dc23924ab..9f2bdecc6f9 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -36,6 +36,7 @@ use futures::stream::FuturesUnordered; use futures::{select, FutureExt, StreamExt}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; +use rand::prelude::*; use rayon::prelude::*; use rayon::ThreadPoolBuilder; use serde::{Deserialize, Serialize}; @@ -48,10 +49,10 @@ use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use 
std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{fs, io, mem}; -use subspace_core_primitives::crypto::blake3_hash; use subspace_core_primitives::crypto::kzg::Kzg; +use subspace_core_primitives::crypto::{blake3_hash, Scalar}; use subspace_core_primitives::{ Blake3Hash, HistorySize, Piece, PieceOffset, PublicKey, Record, SectorId, SectorIndex, SegmentIndex, @@ -261,7 +262,7 @@ impl PlotMetadataHeader { } /// Options used to open single disk farm -pub struct SingleDiskFarmOptions { +pub struct SingleDiskFarmOptions<'a, NC, PG> { /// Path to directory where farm is stored. pub directory: PathBuf, /// Information necessary for farmer application @@ -299,6 +300,8 @@ pub struct SingleDiskFarmOptions { pub plotting_delay: Option>, /// Disable farm locking, for example if file system doesn't support it pub disable_farm_locking: bool, + /// Limit concurrency of internal benchmarking between different farms + pub faster_read_sector_record_chunks_mode_concurrency: &'a Semaphore, } /// Errors happening when trying to create/open single disk farm @@ -602,7 +605,7 @@ impl SingleDiskFarm { /// /// NOTE: Though this function is async, it will do some blocking I/O. pub async fn new( - options: SingleDiskFarmOptions, + options: SingleDiskFarmOptions<'_, NC, PG>, disk_farm_index: usize, ) -> Result where @@ -628,6 +631,7 @@ impl SingleDiskFarm { plotting_delay, farm_during_initial_plotting, disable_farm_locking, + faster_read_sector_record_chunks_mode_concurrency, } = options; fs::create_dir_all(&directory)?; @@ -928,10 +932,13 @@ impl SingleDiskFarm { sector_size, ); - let read_sector_record_chunks_mode = faster_read_sector_record_chunks_mode( - &*plot_file, - sector_size, - )?; + let read_sector_record_chunks_mode = { + // Error doesn't matter here + let _permit = faster_read_sector_record_chunks_mode_concurrency + .acquire() + .await; + faster_read_sector_record_chunks_mode(&*plot_file, sector_size)? 
+ }; let (error_sender, error_receiver) = oneshot::channel(); let error_sender = Arc::new(Mutex::new(Some(error_sender))); @@ -2063,12 +2070,59 @@ fn write_dummy_sector_metadata( } fn faster_read_sector_record_chunks_mode

<P>( - _plot: &P, - _sector_size: usize, + plot: &P, + sector_size: usize, ) -> Result where - P: FileExt, + P: FileExt + Sync, { + info!("Benchmarking faster proving method"); + + let mut sector_bytes = vec![0u8; sector_size]; + + plot.read_exact_at(&mut sector_bytes, 0)?; + + if sector_bytes.iter().all(|byte| *byte == 0) { + thread_rng().fill_bytes(&mut sector_bytes); + plot.write_all_at(&sector_bytes, 0)?; + } + + let mut fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks; + let mut fastest_time = Duration::MAX; + + for _ in 0..3 { + // A lot simplified version of concurrent chunks + { + let start = Instant::now(); + (0..Record::NUM_S_BUCKETS) + .into_par_iter() + .try_for_each(|_| { + let offset = thread_rng().gen_range(0_usize..sector_size / Scalar::FULL_BYTES) + * Scalar::FULL_BYTES; + plot.read_exact_at(&mut [0; Scalar::FULL_BYTES], offset as u64) + })?; + let elapsed = start.elapsed(); + + if fastest_time > elapsed { + fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks; + fastest_time = elapsed; + } + } + // Reading the whole sector at once + { + let start = Instant::now(); + plot.read_exact_at(&mut sector_bytes, 0)?; + let elapsed = start.elapsed(); + + if fastest_time > elapsed { + fastest_mode = ReadSectorRecordChunksMode::WholeSector; + fastest_time = elapsed; + } + } + } + + info!(?fastest_mode, "Faster proving method found"); + // TODO: Do quick benchmark and select correct value dynamically - Ok(ReadSectorRecordChunksMode::ConcurrentChunks) + Ok(fastest_mode) }