Delay plotting after piece cache scan (#2590)
* Switch back to fs2 due to macOS allocation bug

* Wait for local piece cache to be read before starting plotting

* Remove farmer cache initialization acknowledgement as redundant

* Tiny formatting fix
nazar-pc authored Mar 8, 2024
Parent: 232cb06 · Commit: 2e145fa
Showing 4 changed files with 156 additions and 107 deletions.
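
The coordination pattern this commit introduces (delay each farm's plotting until the farmer piece cache has finished scanning its existing contents) can be reduced to the standalone sketch below. It is an illustration assembled from the diffs that follow, not the farmer's actual code: plotting_task, scan_piece_cache and the main wiring are hypothetical stand-ins, and the snippet assumes the tokio and futures crates. The channel handling mirrors what farm.rs does with plotting_delay_senders/plotting_delay_receivers and what single_disk_farm.rs does with the new plotting_delay option.

use futures::channel::oneshot;
use futures::future;

// Each farm's plotting task waits on its delay receiver before doing any work,
// mirroring the new `plotting_delay` handling in single_disk_farm.rs.
async fn plotting_task(index: usize, plotting_delay: Option<oneshot::Receiver<()>>) {
    if let Some(plotting_delay) = plotting_delay {
        // If the sender is dropped before firing, shut down quietly.
        if plotting_delay.await.is_err() {
            return;
        }
    }
    println!("farm {index}: plotting may start now");
}

// Hypothetical stand-in for reading already cached pieces from disk.
async fn scan_piece_cache() {}

#[tokio::main]
async fn main() {
    // One oneshot channel per farm, like `plotting_delay_senders`/`plotting_delay_receivers`
    // in farm.rs.
    let farms = 3;
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..farms).map(|_| oneshot::channel::<()>()).unzip();

    let plotters: Vec<_> = receivers
        .into_iter()
        .enumerate()
        .map(|(index, receiver)| tokio::spawn(plotting_task(index, Some(receiver))))
        .collect();

    // Release every plotter only after the cache scan completes;
    // a receiver that is already gone is not an error.
    scan_piece_cache().await;
    for sender in senders {
        let _ = sender.send(());
    }

    future::join_all(plotters).await;
}

In the real change the "scan finished" signal is the first on_sync_progress notification from FarmerCache, which is also why the explicit initialization acknowledgement channel in farmer_cache.rs could be removed.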
crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs (107 additions, 79 deletions)
@@ -8,6 +8,7 @@ use anyhow::anyhow;
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
use futures::channel::oneshot;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, StreamExt};
use parking_lot::Mutex;
@@ -627,95 +628,122 @@ where
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
.unwrap_or_else(recommended_number_of_farming_threads);

let single_disk_farms = tokio::task::block_in_place(|| {
let (single_disk_farms, plotting_delay_senders) = tokio::task::block_in_place(|| {
let handle = Handle::current();
let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len())
.map(|_| oneshot::channel())
.unzip::<_, _, Vec<_>, Vec<_>>();

disk_farms
let single_disk_farms = disk_farms
.into_par_iter()
.zip(plotting_delay_receivers)
.enumerate()
.map(move |(disk_farm_index, disk_farm)| {
let _tokio_handle_guard = handle.enter();

debug!(url = %node_rpc_url, %disk_farm_index, "Connecting to node RPC");
let node_client = handle.block_on(NodeRpcClient::new(&node_rpc_url))?;

let single_disk_farm_fut = SingleDiskFarm::new::<_, _, PosTable>(
SingleDiskFarmOptions {
directory: disk_farm.directory.clone(),
farmer_app_info: farmer_app_info.clone(),
allocated_space: disk_farm.allocated_plotting_space,
max_pieces_in_sector,
node_client,
reward_address,
kzg: kzg.clone(),
erasure_coding: erasure_coding.clone(),
piece_getter: piece_getter.clone(),
cache_percentage,
downloading_semaphore: Arc::clone(&downloading_semaphore),
record_encoding_concurrency,
farm_during_initial_plotting,
farming_thread_pool_size,
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
disable_farm_locking,
},
disk_farm_index,
);
.map(
move |(disk_farm_index, (disk_farm, plotting_delay_receiver))| {
let _tokio_handle_guard = handle.enter();

debug!(url = %node_rpc_url, %disk_farm_index, "Connecting to node RPC");
let node_client = handle.block_on(NodeRpcClient::new(&node_rpc_url))?;

let single_disk_farm_fut = SingleDiskFarm::new::<_, _, PosTable>(
SingleDiskFarmOptions {
directory: disk_farm.directory.clone(),
farmer_app_info: farmer_app_info.clone(),
allocated_space: disk_farm.allocated_plotting_space,
max_pieces_in_sector,
node_client,
reward_address,
kzg: kzg.clone(),
erasure_coding: erasure_coding.clone(),
piece_getter: piece_getter.clone(),
cache_percentage,
downloading_semaphore: Arc::clone(&downloading_semaphore),
record_encoding_concurrency,
farm_during_initial_plotting,
farming_thread_pool_size,
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
plotting_delay: Some(plotting_delay_receiver),
disable_farm_locking,
},
disk_farm_index,
);

let single_disk_farm = match handle.block_on(single_disk_farm_fut) {
Ok(single_disk_farm) => single_disk_farm,
Err(SingleDiskFarmError::InsufficientAllocatedSpace {
min_space,
allocated_space,
}) => {
return Err(anyhow::anyhow!(
"Allocated space {} ({}) is not enough, minimum is ~{} (~{}, \
{} bytes to be exact)",
bytesize::to_string(allocated_space, true),
bytesize::to_string(allocated_space, false),
bytesize::to_string(min_space, true),
bytesize::to_string(min_space, false),
min_space
));
}
Err(error) => {
return Err(error.into());
let single_disk_farm = match handle.block_on(single_disk_farm_fut) {
Ok(single_disk_farm) => single_disk_farm,
Err(SingleDiskFarmError::InsufficientAllocatedSpace {
min_space,
allocated_space,
}) => {
return Err(anyhow::anyhow!(
"Allocated space {} ({}) is not enough, minimum is ~{} (~{}, \
{} bytes to be exact)",
bytesize::to_string(allocated_space, true),
bytesize::to_string(allocated_space, false),
bytesize::to_string(min_space, true),
bytesize::to_string(min_space, false),
min_space
));
}
Err(error) => {
return Err(error.into());
}
};

if !no_info {
let info = single_disk_farm.info();
println!("Single disk farm {disk_farm_index}:");
println!(" ID: {}", info.id());
println!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash()));
println!(" Public key: 0x{}", hex::encode(info.public_key()));
println!(
" Allocated space: {} ({})",
bytesize::to_string(info.allocated_space(), true),
bytesize::to_string(info.allocated_space(), false)
);
println!(" Directory: {}", disk_farm.directory.display());
}
};

if !no_info {
let info = single_disk_farm.info();
println!("Single disk farm {disk_farm_index}:");
println!(" ID: {}", info.id());
println!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash()));
println!(" Public key: 0x{}", hex::encode(info.public_key()));
println!(
" Allocated space: {} ({})",
bytesize::to_string(info.allocated_space(), true),
bytesize::to_string(info.allocated_space(), false)
);
println!(" Directory: {}", disk_farm.directory.display());
}
Ok(single_disk_farm)
},
)
.collect::<Result<Vec<_>, _>>()?;

Ok(single_disk_farm)
})
.collect::<Result<Vec<_>, _>>()
anyhow::Ok((single_disk_farms, plotting_delay_senders))
})?;

// Acknowledgement is not necessary
drop(
farmer_cache
.replace_backing_caches(
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.piece_cache())
.collect(),
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.plot_cache())
.collect(),
)
.await,
);
{
let handler_id = Arc::new(Mutex::new(None));
// Wait for piece cache to read already cached contents before starting plotting to improve
// cache hit ratio
handler_id
.lock()
.replace(farmer_cache.on_sync_progress(Arc::new({
let handler_id = Arc::clone(&handler_id);
let plotting_delay_senders = Mutex::new(plotting_delay_senders);

move |_progress| {
for plotting_delay_sender in plotting_delay_senders.lock().drain(..) {
// Doesn't matter if receiver is gone
let _ = plotting_delay_sender.send(());
}

// Unsubscribe from this event
handler_id.lock().take();
}
})));
}
farmer_cache
.replace_backing_caches(
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.piece_cache())
.collect(),
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.plot_cache())
.collect(),
)
.await;
drop(farmer_cache);

// Store piece readers so we can reference them later
crates/subspace-farmer/src/farmer_cache.rs (6 additions, 24 deletions)
@@ -6,7 +6,6 @@ use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset};
use crate::single_disk_farm::plot_cache::{DiskPlotCache, MaybePieceStoredResult};
use crate::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::oneshot;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
use parking_lot::RwLock;
@@ -52,7 +51,6 @@ struct DiskPieceCacheState {
enum WorkerCommand {
ReplaceBackingCaches {
new_piece_caches: Vec<DiskPieceCache>,
acknowledgement: oneshot::Sender<()>,
},
ForgetKey {
key: RecordKey,
@@ -101,15 +99,11 @@ where
.take()
.expect("Always set during worker instantiation");

if let Some(WorkerCommand::ReplaceBackingCaches {
new_piece_caches,
acknowledgement,
}) = worker_receiver.recv().await
if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
worker_receiver.recv().await
{
self.initialize(&piece_getter, &mut worker_state, new_piece_caches)
.await;
// Doesn't matter if receiver is still waiting for acknowledgement
let _ = acknowledgement.send(());
} else {
// Piece cache is dropped before backing caches were sent
return;
@@ -162,14 +156,9 @@ where
PG: PieceGetter,
{
match command {
WorkerCommand::ReplaceBackingCaches {
new_piece_caches,
acknowledgement,
} => {
WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
self.initialize(piece_getter, worker_state, new_piece_caches)
.await;
// Doesn't matter if receiver is still waiting for acknowledgement
let _ = acknowledgement.send(());
}
// TODO: Consider implementing optional re-sync of the piece instead of just forgetting
WorkerCommand::ForgetKey { key } => {
@@ -932,28 +921,21 @@ impl FarmerCache {
}
}

/// Initialize replacement of backing caches, returns acknowledgement receiver that can be used
/// to identify when cache initialization has finished
/// Initialize replacement of backing caches
pub async fn replace_backing_caches(
&self,
new_piece_caches: Vec<DiskPieceCache>,
new_plot_caches: Vec<DiskPlotCache>,
) -> oneshot::Receiver<()> {
let (sender, receiver) = oneshot::channel();
) {
if let Err(error) = self
.worker_sender
.send(WorkerCommand::ReplaceBackingCaches {
new_piece_caches,
acknowledgement: sender,
})
.send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
.await
{
warn!(%error, "Failed to replace backing caches, worker exited");
}

*self.plot_caches.write() = new_plot_caches;

receiver
}

/// Subscribe to cache sync notifications
crates/subspace-farmer/src/farmer_cache/tests.rs (32 additions, 4 deletions)
@@ -190,7 +190,21 @@ async fn basic() {
let farmer_cache_worker_exited =
tokio::spawn(farmer_cache_worker.run(piece_getter.clone()));

let initialized_fut = farmer_cache
let (sender, receiver) = oneshot::channel();
farmer_cache
.on_sync_progress(Arc::new({
let sender = Mutex::new(Some(sender));

move |progress| {
if *progress == 100.0 {
if let Some(sender) = sender.lock().take() {
sender.send(()).unwrap();
}
}
}
}))
.detach();
farmer_cache
.replace_backing_caches(
vec![
DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
@@ -201,7 +215,7 @@
.await;

// Wait for piece cache to be initialized
initialized_fut.await.unwrap();
receiver.await.unwrap();

// These 2 pieces are requested from node during initialization
{
@@ -375,8 +389,22 @@

let farmer_cache_worker_exited = tokio::spawn(farmer_cache_worker.run(piece_getter));

let (sender, receiver) = oneshot::channel();
farmer_cache
.on_sync_progress(Arc::new({
let sender = Mutex::new(Some(sender));

move |progress| {
if *progress == 100.0 {
if let Some(sender) = sender.lock().take() {
sender.send(()).unwrap();
}
}
}
}))
.detach();
// Reopen with the same backing caches
let initialized_fut = farmer_cache
farmer_cache
.replace_backing_caches(
vec![
DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
@@ -388,7 +416,7 @@
drop(farmer_cache);

// Wait for piece cache to be initialized
initialized_fut.await.unwrap();
receiver.await.unwrap();

// Same state as before, no pieces should be requested during initialization
assert_eq!(pieces.lock().len(), 0);
crates/subspace-farmer/src/single_disk_farm.rs (11 additions, 0 deletions)
@@ -293,6 +293,9 @@ pub struct SingleDiskFarmOptions<NC, PG> {
pub farming_thread_pool_size: usize,
/// Thread pool manager used for plotting
pub plotting_thread_pool_manager: PlottingThreadPoolManager,
/// Notification for plotter to start, can be used to delay plotting until some initialization
/// has happened externally
pub plotting_delay: Option<oneshot::Receiver<()>>,
/// Disable farm locking, for example if file system doesn't support it
pub disable_farm_locking: bool,
}
@@ -621,6 +624,7 @@ impl SingleDiskFarm {
record_encoding_concurrency,
farming_thread_pool_size,
plotting_thread_pool_manager,
plotting_delay,
farm_during_initial_plotting,
disable_farm_locking,
} = options;
@@ -994,6 +998,13 @@ impl SingleDiskFarm {
return Ok(());
}

if let Some(plotting_delay) = plotting_delay {
if plotting_delay.await.is_err() {
// Dropped before resolving
return Ok(());
}
}

plotting::<_, _, PosTable>(plotting_options).await
};
