diff --git a/Cargo.lock b/Cargo.lock index b2335386b54..389a61f0359 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -948,9 +948,9 @@ dependencies = [ [[package]] name = "async-nats" -version = "0.34.0" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eea7b126ebfa4db78e9e788b2a792b6329f35b4f2fdd56dbc646dedc2beec7a5" +checksum = "d5e47d2f7305524258908449aff6c86db36697a9b4219bfb1777e0ca1945358d" dependencies = [ "base64 0.22.0", "bytes", @@ -973,7 +973,7 @@ dependencies = [ "thiserror", "time", "tokio", - "tokio-rustls 0.25.0", + "tokio-rustls 0.26.0", "tracing", "tryhard", "url", @@ -9453,6 +9453,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afabcee0551bd1aa3e18e5adbf2c0544722014b899adb31bd186ec638d3da97e" +dependencies = [ + "once_cell", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki 0.102.3", + "subtle 2.5.0", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -13383,6 +13397,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.5", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index b33c483d2d8..82c6c0f3e52 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -14,7 +14,7 @@ include = [ [dependencies] anyhow = "1.0.82" async-lock = "3.3.0" -async-nats = "0.34.0" +async-nats = "0.35.0" async-trait = "0.1.80" backoff = { version = "0.4.0", features = ["futures", "tokio"] } base58 = "0.2.0" diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs index 5f5ad446651..0d6936e9c83 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands.rs @@ -1,4 +1,5 @@ pub(crate) mod benchmark; +pub(crate) mod cluster; pub(crate) mod farm; mod info; mod scrub; diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs new file mode 100644 index 00000000000..056e82d93f8 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs @@ -0,0 +1,140 @@ +mod cache; +mod controller; +mod farmer; +mod plotter; + +use crate::commands::cluster::cache::{cache, CacheArgs}; +use crate::commands::cluster::controller::{controller, ControllerArgs}; +use crate::commands::cluster::farmer::{farmer, FarmerArgs}; +use crate::commands::cluster::plotter::{plotter, PlotterArgs}; +use crate::utils::shutdown_signal; +use anyhow::anyhow; +use async_nats::ServerAddr; +use backoff::ExponentialBackoff; +use clap::{Parser, Subcommand}; +use futures::stream::FuturesUnordered; +use futures::{select, FutureExt, StreamExt}; +use prometheus_client::registry::Registry; +use std::net::SocketAddr; +use std::num::NonZeroUsize; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_farmer::utils::AsyncJoinOnDrop; +use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; +use subspace_proof_of_space::Table; + +/// Arguments for cluster +#[derive(Debug, Parser)] +pub(crate) struct 
ClusterArgs { + /// Shared arguments for all subcommands + #[clap(flatten)] + shared_args: SharedArgs, + /// Cluster subcommands + #[clap(subcommand)] + subcommand: ClusterSubcommand, +} + +/// Shared arguments +#[derive(Debug, Parser)] +struct SharedArgs { + /// NATS server address, typically in `nats://server1:port1` format, can be specified multiple + /// times. + /// + /// NOTE: NATS must be configured for message sizes of 2MiB or larger (1MiB is the default), + /// which can be done by starting NATS server with config file containing `max_payload = 2MB`. + #[arg(long, alias = "nats-server", required = true)] + nats_servers: Vec, + /// Size of connection pool of NATS clients. + /// + /// Pool size can be increased in case of large number of farms or high plotting capacity of + /// this instance. + #[arg(long, default_value = "16")] + nats_pool_size: NonZeroUsize, + /// Defines endpoints for the prometheus metrics server. It doesn't start without at least + /// one specified endpoint. Format: 127.0.0.1:8080 + #[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])] + prometheus_listen_on: Vec, +} + +/// Cluster subcommands +#[derive(Debug, Subcommand)] +enum ClusterSubcommand { + /// Farming cluster controller + Controller(ControllerArgs), + /// Farming cluster farmer + Farmer(FarmerArgs), + /// Farming cluster plotter + Plotter(PlotterArgs), + /// Farming cluster cache + Cache(CacheArgs), +} + +pub(crate) async fn cluster(cluster_args: ClusterArgs) -> anyhow::Result<()> +where + PosTable: Table, +{ + let signal = shutdown_signal(); + + let ClusterArgs { + shared_args, + subcommand, + } = cluster_args; + let SharedArgs { + nats_servers, + nats_pool_size, + prometheus_listen_on, + } = shared_args; + + let nats_client = NatsClient::new( + nats_servers, + ExponentialBackoff { + max_elapsed_time: None, + ..ExponentialBackoff::default() + }, + nats_pool_size, + ) + .await + .map_err(|error| anyhow!("Failed to connect to NATS server: {error}"))?; + let mut registry = Registry::default(); + + let mut tasks = FuturesUnordered::new(); + + // TODO: Support running multiple components at once + tasks.push(match subcommand { + ClusterSubcommand::Controller(controller_args) => { + controller(nats_client, &mut registry, controller_args).await? + } + ClusterSubcommand::Farmer(farmer_args) => { + farmer::(nats_client, &mut registry, farmer_args).await? + } + ClusterSubcommand::Plotter(plotter_args) => { + plotter::(nats_client, &mut registry, plotter_args).await? + } + ClusterSubcommand::Cache(cache_args) => { + cache(nats_client, &mut registry, cache_args).await? + } + }); + + if !prometheus_listen_on.is_empty() { + let prometheus_task = start_prometheus_metrics_server( + prometheus_listen_on, + RegistryAdapter::PrometheusClient(registry), + )?; + + let join_handle = tokio::spawn(prometheus_task); + tasks.push(Box::pin(async move { + Ok(AsyncJoinOnDrop::new(join_handle, true).await??) + })); + } + + select! 
{ + // Signal future + _ = signal.fuse() => { + Ok(()) + }, + + // Run future + result = tasks.next() => { + result.expect("List of tasks is not empty; qed") + }, + } +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs new file mode 100644 index 00000000000..75019a8e84f --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs @@ -0,0 +1,176 @@ +use anyhow::anyhow; +use bytesize::ByteSize; +use clap::Parser; +use prometheus_client::registry::Registry; +use std::fs; +use std::future::Future; +use std::path::PathBuf; +use std::pin::Pin; +use std::str::FromStr; +use std::time::Duration; +use subspace_farmer::cluster::cache::cache_service; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_farmer::piece_cache::PieceCache; + +/// Interval between cache self-identification broadcast messages +pub(super) const CACHE_IDENTIFICATION_BROADCAST_INTERVAL: Duration = Duration::from_secs(5); + +#[derive(Debug, Clone)] +struct DiskCache { + /// Path to directory where cache is stored + directory: PathBuf, + /// How much space in bytes can cache use + allocated_space: u64, +} + +impl FromStr for DiskCache { + type Err = String; + + #[inline] + fn from_str(s: &str) -> anyhow::Result { + let parts = s.split(',').collect::>(); + if parts.len() != 2 { + return Err("Must contain 2 coma-separated components".to_string()); + } + + let mut plot_directory = None; + let mut allocated_space = None; + + for part in parts { + let part = part.splitn(2, '=').collect::>(); + if part.len() != 2 { + return Err("Each component must contain = separating key from value".to_string()); + } + + let key = *part.first().expect("Length checked above; qed"); + let value = *part.get(1).expect("Length checked above; qed"); + + match key { + "path" => { + plot_directory.replace(PathBuf::from(value)); + } + "size" => { + allocated_space.replace( + value + .parse::() + .map_err(|error| { + format!("Failed to parse `size` \"{value}\": {error}") + })? + .as_u64(), + ); + } + key => { + return Err(format!( + "Key \"{key}\" is not supported, only `path` or `size`" + )); + } + } + } + + Ok(DiskCache { + directory: plot_directory.ok_or( + "`path` key is required with path to directory where cache will be stored", + )?, + allocated_space: allocated_space + .ok_or("`size` key is required with allocated amount of disk space")?, + }) + } +} + +/// Arguments for cache +#[derive(Debug, Parser)] +pub(super) struct CacheArgs { + /// One or more caches located at specified path, each with its own allocated space. + /// + /// Format for each cache is coma-separated list of strings like this: + /// + /// path=/path/to/directory,size=5T + /// + /// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that + /// cache will make sure to not exceed (and will pre-allocated all the space on startup to + /// ensure it will not run out of space in runtime). + disk_caches: Vec, + /// Run temporary cache with specified farm size in human-readable format (e.g. 10GB, 2TiB) or + /// just bytes (e.g. 4096), this will create a temporary directory that will be deleted at the + /// end of the process. 
+ #[arg(long, conflicts_with = "disk_caches")] + tmp: Option, + /// Cache group to use, the same cache group must be also specified on corresponding controller + #[arg(long, default_value = "default")] + cache_group: String, +} + +pub(super) async fn cache( + nats_client: NatsClient, + _registry: &mut Registry, + cache_args: CacheArgs, +) -> anyhow::Result>>>> { + let CacheArgs { + mut disk_caches, + tmp, + cache_group, + } = cache_args; + + let tmp_directory = if let Some(plot_size) = tmp { + let tmp_directory = tempfile::Builder::new() + .prefix("subspace-cache-") + .tempdir()?; + + disk_caches = vec![DiskCache { + directory: tmp_directory.as_ref().to_path_buf(), + allocated_space: plot_size.as_u64(), + }]; + + Some(tmp_directory) + } else { + if disk_caches.is_empty() { + return Err(anyhow!("There must be at least one disk cache provided")); + } + + for cache in &disk_caches { + if !cache.directory.exists() { + if let Err(error) = fs::create_dir(&cache.directory) { + return Err(anyhow!( + "Directory {} doesn't exist and can't be created: {}", + cache.directory.display(), + error + )); + } + } + } + None + }; + + // TODO: Metrics + + let caches = disk_caches + .iter() + .map(|disk_cache| { + PieceCache::open( + &disk_cache.directory, + u32::try_from(disk_cache.allocated_space / PieceCache::element_size() as u64) + .unwrap_or(u32::MAX), + ) + .map_err(|error| { + anyhow!( + "Failed to open piece cache at {}: {error}", + disk_cache.directory.display() + ) + }) + }) + .collect::, _>>()?; + + Ok(Box::pin(async move { + cache_service( + nats_client, + &caches, + &cache_group, + CACHE_IDENTIFICATION_BROADCAST_INTERVAL, + ) + .await?; + + drop(tmp_directory); + + Ok(()) + })) +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs new file mode 100644 index 00000000000..da4e11c3359 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -0,0 +1,248 @@ +mod caches; +mod farms; + +use crate::commands::cluster::controller::caches::maintain_caches; +use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex}; +use crate::commands::shared::derive_libp2p_keypair; +use crate::commands::shared::network::{configure_network, NetworkArgs}; +use anyhow::anyhow; +use async_lock::RwLock as AsyncRwLock; +use backoff::ExponentialBackoff; +use clap::{Parser, ValueHint}; +use futures::{select, FutureExt}; +use prometheus_client::registry::Registry; +use std::future::Future; +use std::path::PathBuf; +use std::pin::{pin, Pin}; +use std::sync::Arc; +use std::time::Duration; +use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; +use subspace_farmer::cluster::controller::controller_service; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_farmer::farmer_cache::FarmerCache; +use subspace_farmer::node_client::node_rpc_client::NodeRpcClient; +use subspace_farmer::node_client::NodeClient; +use subspace_farmer::utils::farmer_piece_getter::{DsnCacheRetryPolicy, FarmerPieceGetter}; +use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; +use subspace_farmer::utils::plotted_pieces::PlottedPieces; +use subspace_farmer::utils::run_future_in_dedicated_thread; +use subspace_farmer::Identity; +use subspace_networking::utils::piece_provider::PieceProvider; +use tracing::info; + +/// Get piece retry attempts number. 
+const PIECE_GETTER_MAX_RETRIES: u16 = 7; +/// Defines initial duration between get_piece calls. +const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(5); +/// Defines max duration between get_piece calls. +const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40); + +// TODO: Metrics for controller and sub-components +/// Arguments for controller +#[derive(Debug, Parser)] +pub(super) struct ControllerArgs { + /// Base path where to store P2P network identity + #[arg(long, value_hint = ValueHint::DirPath)] + base_path: Option, + /// WebSocket RPC URL of the Subspace node to connect to + #[arg(long, value_hint = ValueHint::Url, default_value = "ws://127.0.0.1:9944")] + node_rpc_url: String, + /// Cache group managed by this controller, each controller must have its dedicated cache group. + /// + /// It is strongly recommended to use alphanumeric values for cache group, the same cache group + /// must be also specified on corresponding caches. + #[arg(long, default_value = "default")] + cache_group: String, + /// Network parameters + #[clap(flatten)] + network_args: NetworkArgs, + /// Sets some flags that are convenient during development, currently `--allow-private-ips` + #[arg(long)] + dev: bool, + /// Run temporary controller identity + #[arg(long, conflicts_with = "base_path")] + tmp: bool, +} + +pub(super) async fn controller( + nats_client: NatsClient, + registry: &mut Registry, + controller_args: ControllerArgs, +) -> anyhow::Result>>>> { + let ControllerArgs { + base_path, + node_rpc_url, + cache_group, + mut network_args, + dev, + tmp, + } = controller_args; + + // Override flags with `--dev` + network_args.allow_private_ips = network_args.allow_private_ips || dev; + + let (base_path, tmp_directory) = if tmp { + let tmp_directory = tempfile::Builder::new() + .prefix("subspace-cluster-controller-") + .tempdir()?; + + (tmp_directory.as_ref().to_path_buf(), Some(tmp_directory)) + } else { + let Some(base_path) = base_path else { + return Err(anyhow!("--base-path must be specified explicitly")); + }; + + (base_path, None) + }; + + let plotted_pieces = Arc::new(AsyncRwLock::new(PlottedPieces::::default())); + + info!(url = %node_rpc_url, "Connecting to node RPC"); + let node_client = NodeRpcClient::new(&node_rpc_url).await?; + + let farmer_app_info = node_client + .farmer_app_info() + .await + .map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?; + + let identity = Identity::open_or_create(&base_path) + .map_err(|error| anyhow!("Failed to open or create identity: {error}"))?; + let keypair = derive_libp2p_keypair(identity.secret_key()); + let peer_id = keypair.public().to_peer_id(); + let instance = peer_id.to_string(); + + let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client.clone(), peer_id); + + // TODO: Metrics + + let (node, mut node_runner) = { + if network_args.bootstrap_nodes.is_empty() { + network_args + .bootstrap_nodes + .clone_from(&farmer_app_info.dsn_bootstrap_nodes); + } + + configure_network( + hex::encode(farmer_app_info.genesis_hash), + &base_path, + keypair, + network_args, + Arc::downgrade(&plotted_pieces), + node_client.clone(), + farmer_cache.clone(), + Some(registry), + )? 
+ }; + + let kzg = Kzg::new(embedded_kzg_settings()); + let validator = Some(SegmentCommitmentPieceValidator::new( + node.clone(), + node_client.clone(), + kzg.clone(), + )); + let piece_provider = PieceProvider::new(node.clone(), validator.clone()); + + let piece_getter = FarmerPieceGetter::new( + piece_provider, + farmer_cache.clone(), + node_client.clone(), + Arc::clone(&plotted_pieces), + DsnCacheRetryPolicy { + max_retries: PIECE_GETTER_MAX_RETRIES, + backoff: ExponentialBackoff { + initial_interval: GET_PIECE_INITIAL_INTERVAL, + max_interval: GET_PIECE_MAX_INTERVAL, + // Try until we get a valid piece + max_elapsed_time: None, + multiplier: 1.75, + ..ExponentialBackoff::default() + }, + }, + ); + + let farmer_cache_worker_fut = run_future_in_dedicated_thread( + { + let future = farmer_cache_worker.run(piece_getter.downgrade()); + + move || future + }, + "controller-cache-worker".to_string(), + )?; + + let controller_service_fut = run_future_in_dedicated_thread( + { + let nats_client = nats_client.clone(); + let instance = instance.clone(); + + move || async move { + controller_service(&nats_client, &node_client, &piece_getter, &instance).await + } + }, + "controller-service".to_string(), + )?; + + let farms_fut = run_future_in_dedicated_thread( + { + let nats_client = nats_client.clone(); + + move || async move { maintain_farms(&instance, &nats_client, &plotted_pieces).await } + }, + "controller-farms".to_string(), + )?; + + let caches_fut = run_future_in_dedicated_thread( + move || async move { maintain_caches(&cache_group, &nats_client, farmer_cache).await }, + "controller-caches".to_string(), + )?; + + let networking_fut = run_future_in_dedicated_thread( + move || async move { node_runner.run().await }, + "controller-networking".to_string(), + )?; + + Ok(Box::pin(async move { + // This defines order in which things are dropped + let networking_fut = networking_fut; + let farms_fut = farms_fut; + let caches_fut = caches_fut; + let farmer_cache_worker_fut = farmer_cache_worker_fut; + let controller_service_fut = controller_service_fut; + + let networking_fut = pin!(networking_fut); + let farms_fut = pin!(farms_fut); + let caches_fut = pin!(caches_fut); + let farmer_cache_worker_fut = pin!(farmer_cache_worker_fut); + let controller_service_fut = pin!(controller_service_fut); + + select! { + // Networking future + _ = networking_fut.fuse() => { + info!("Node runner exited.") + }, + + // Farms future + result = farms_fut.fuse() => { + result??; + }, + + // Caches future + result = caches_fut.fuse() => { + result??; + }, + + // Piece cache worker future + _ = farmer_cache_worker_fut.fuse() => { + info!("Farmer cache worker exited.") + }, + + // Controller service future + result = controller_service_fut.fuse() => { + result??; + }, + } + + drop(tmp_directory); + + Ok(()) + })) +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs new file mode 100644 index 00000000000..543eeeeda77 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs @@ -0,0 +1,201 @@ +//! This module exposed implementation of caches maintenance. +//! +//! The goal is to observe caches in a particular cache group and keep controller's data structures +//! about which pieces are stored where up to date. Implementation automatically handles dynamic +//! 
cache addition and removal, tries to reduce number of reinitializations that result in potential +//! piece cache sync, etc. + +use crate::commands::cluster::cache::CACHE_IDENTIFICATION_BROADCAST_INTERVAL; +use anyhow::anyhow; +use futures::channel::oneshot; +use futures::future::FusedFuture; +use futures::{select, FutureExt, StreamExt}; +use parking_lot::Mutex; +use std::future::{ready, Future}; +use std::pin::{pin, Pin}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use subspace_farmer::cluster::cache::{ + ClusterCacheId, ClusterCacheIdentifyBroadcast, ClusterPieceCache, +}; +use subspace_farmer::cluster::controller::ClusterControllerCacheIdentifyBroadcast; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_farmer::farm::PieceCache; +use subspace_farmer::farmer_cache::FarmerCache; +use tokio::time::MissedTickBehavior; +use tracing::{info, trace, warn}; + +const SCHEDULE_REINITIALIZATION_DELAY: Duration = Duration::from_secs(3); + +#[derive(Debug)] +struct KnownCache { + cache_id: ClusterCacheId, + last_identification: Instant, + piece_cache: Arc, +} + +#[derive(Debug, Default)] +struct KnownCaches { + known_caches: Vec, +} + +impl KnownCaches { + fn get_all(&self) -> Vec> { + self.known_caches + .iter() + .map(|known_cache| Arc::clone(&known_cache.piece_cache) as Arc<_>) + .collect() + } + + /// Return `true` if farmer cache reinitialization is required + fn update( + &mut self, + cache_id: ClusterCacheId, + max_num_elements: u32, + nats_client: &NatsClient, + ) -> bool { + if self.known_caches.iter_mut().any(|known_cache| { + if known_cache.cache_id == cache_id { + known_cache.last_identification = Instant::now(); + true + } else { + false + } + }) { + return false; + } + + let piece_cache = Arc::new(ClusterPieceCache::new( + cache_id, + max_num_elements, + nats_client.clone(), + )); + self.known_caches.push(KnownCache { + cache_id, + last_identification: Instant::now(), + piece_cache, + }); + true + } + + fn remove_expired(&mut self) -> impl Iterator + '_ { + self.known_caches.extract_if(|known_cache| { + known_cache.last_identification.elapsed() > CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2 + }) + } +} + +pub(super) async fn maintain_caches( + cache_group: &str, + nats_client: &NatsClient, + farmer_cache: FarmerCache, +) -> anyhow::Result<()> { + let mut known_caches = KnownCaches::default(); + + let mut scheduled_reinitialization_for = None; + // Farm that is being added/removed right now (if any) + let mut cache_reinitialization = + (Box::pin(ready(())) as Pin>>).fuse(); + + let cache_identify_subscription = pin!(nats_client + .subscribe_to_broadcasts::(None, None) + .await + .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?); + + // Request cache to identify themselves + if let Err(error) = nats_client + .broadcast(&ClusterControllerCacheIdentifyBroadcast, cache_group) + .await + { + warn!(%error, "Failed to send cache identification broadcast"); + } + + let mut cache_identify_subscription = cache_identify_subscription.fuse(); + let mut cache_pruning_interval = tokio::time::interval_at( + (Instant::now() + CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(), + CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2, + ); + cache_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + if cache_reinitialization.is_terminated() + && let Some(time) = scheduled_reinitialization_for + && time >= Instant::now() + { + scheduled_reinitialization_for.take(); + + let new_piece_caches = known_caches.get_all(); 
+ let new_cache_reinitialization = async { + let (sync_finish_sender, sync_finish_receiver) = oneshot::channel::<()>(); + let sync_finish_sender = Mutex::new(Some(sync_finish_sender)); + + let _handler_id = farmer_cache.on_sync_progress(Arc::new(move |&progress| { + if progress == 100.0 { + if let Some(sync_finish_sender) = sync_finish_sender.lock().take() { + // Result doesn't matter + let _ = sync_finish_sender.send(()); + } + } + })); + + farmer_cache + .replace_backing_caches(new_piece_caches, Vec::new()) + .await; + + // Wait for piece cache sync to finish before potentially staring a new one, result + // doesn't matter + let _ = sync_finish_receiver.await; + }; + + cache_reinitialization = + (Box::pin(new_cache_reinitialization) as Pin>>).fuse(); + } + + select! { + maybe_identify_message = cache_identify_subscription.next() => { + let Some(identify_message) = maybe_identify_message else { + return Err(anyhow!("Cache identify stream ended")); + }; + + let ClusterCacheIdentifyBroadcast { + cache_id, + max_num_elements, + } = identify_message; + if known_caches.update(cache_id, max_num_elements, nats_client) { + info!( + %cache_id, + "New cache discovered, scheduling reinitialization" + ); + scheduled_reinitialization_for.replace( + Instant::now() + SCHEDULE_REINITIALIZATION_DELAY, + ); + } else { + trace!( + %cache_id, + "Received identification for already known cache" + ); + } + } + _ = cache_pruning_interval.tick().fuse() => { + let mut reinit = false; + for removed_cache in known_caches.remove_expired() { + reinit = true; + + warn!( + cache_id = %removed_cache.cache_id, + "Cache expired and removed, scheduling reinitialization" + ); + } + + if reinit { + scheduled_reinitialization_for.replace( + Instant::now() + SCHEDULE_REINITIALIZATION_DELAY, + ); + } + } + _ = cache_reinitialization => { + // Nothing left to do + } + } + } +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs new file mode 100644 index 00000000000..a85c43b3842 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -0,0 +1,393 @@ +//! This module exposed implementation of farms maintenance. +//! +//! The goal is to observe farms in a cluster and keep controller's data structures +//! about which pieces are plotted in which sectors of which farm up to date. Implementation +//! automatically handles dynamic farm addition and removal, etc. 
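To make the farm discovery described above concrete, here is a minimal, hypothetical sketch (not part of this diff) of the identify round-trip that `maintain_farms` relies on. It only uses the `NatsClient` broadcast/subscribe calls as they appear later in this file; the `discover_farms` function name is illustrative and error handling is reduced to `anyhow`.

use anyhow::anyhow;
use futures::StreamExt;
use std::pin::pin;
use subspace_farmer::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
use subspace_farmer::cluster::farmer::ClusterFarmerIdentifyFarmBroadcast;
use subspace_farmer::cluster::nats_client::NatsClient;

/// Ask farmers on this controller instance to identify themselves and print the replies
async fn discover_farms(nats_client: &NatsClient, instance: &str) -> anyhow::Result<()> {
    // Subscribe before broadcasting so no reply is missed
    let subscription = nats_client
        .subscribe_to_broadcasts::<ClusterFarmerIdentifyFarmBroadcast>(None, None)
        .await
        .map_err(|error| anyhow!("Failed to subscribe to farmer identify broadcast: {error}"))?;
    let mut subscription = pin!(subscription);

    // Same broadcast `maintain_farms` sends to request identification
    nats_client
        .broadcast(&ClusterControllerFarmerIdentifyBroadcast, instance)
        .await
        .map_err(|error| anyhow!("Failed to send farmer identification broadcast: {error}"))?;

    while let Some(identify) = subscription.next().await {
        println!(
            "Discovered farm {} with {} sectors",
            identify.farm_id, identify.total_sectors_count
        );
    }

    Ok(())
}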
+ +use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL; +use anyhow::anyhow; +use async_lock::RwLock as AsyncRwLock; +use futures::channel::oneshot; +use futures::future::FusedFuture; +use futures::stream::FuturesUnordered; +use futures::{select, FutureExt, StreamExt}; +use parking_lot::Mutex; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; +use std::future::{pending, ready, Future}; +use std::pin::{pin, Pin}; +use std::sync::Arc; +use std::time::Instant; +use subspace_core_primitives::{Blake3Hash, SectorIndex}; +use subspace_farmer::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; +use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast}; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; +use subspace_farmer::utils::plotted_pieces::PlottedPieces; +use tokio::time::MissedTickBehavior; +use tracing::{error, info, trace, warn}; + +type AddRemoveFuture<'a> = + Pin, Box)>> + 'a>>; + +pub(super) type FarmIndex = u16; + +#[derive(Debug)] +struct KnownFarm { + farm_id: FarmId, + fingerprint: Blake3Hash, + last_identification: Instant, + expired_sender: oneshot::Sender<()>, +} + +enum KnownFarmInsertResult { + Inserted { + farm_index: FarmIndex, + expired_receiver: oneshot::Receiver<()>, + }, + FingerprintUpdated { + farm_index: FarmIndex, + expired_receiver: oneshot::Receiver<()>, + }, + NotInserted, +} + +#[derive(Debug, Default)] +struct KnownFarms { + known_farms: HashMap, +} + +impl KnownFarms { + fn insert_or_update( + &mut self, + farm_id: FarmId, + fingerprint: Blake3Hash, + ) -> KnownFarmInsertResult { + if let Some(existing_result) = + self.known_farms + .iter_mut() + .find_map(|(&farm_index, known_farm)| { + if known_farm.farm_id == farm_id { + if known_farm.fingerprint == fingerprint { + known_farm.last_identification = Instant::now(); + Some(KnownFarmInsertResult::NotInserted) + } else { + let (expired_sender, expired_receiver) = oneshot::channel(); + + known_farm.fingerprint = fingerprint; + known_farm.expired_sender = expired_sender; + + Some(KnownFarmInsertResult::FingerprintUpdated { + farm_index, + expired_receiver, + }) + } + } else { + None + } + }) + { + return existing_result; + } + + for farm_index in FarmIndex::MIN..=FarmIndex::MAX { + if let Entry::Vacant(entry) = self.known_farms.entry(farm_index) { + let (expired_sender, expired_receiver) = oneshot::channel(); + + entry.insert(KnownFarm { + farm_id, + fingerprint, + last_identification: Instant::now(), + expired_sender, + }); + + return KnownFarmInsertResult::Inserted { + farm_index, + expired_receiver, + }; + } + } + + warn!(%farm_id, max_supported_farm_index = %FarmIndex::MAX, "Too many farms, ignoring"); + KnownFarmInsertResult::NotInserted + } + + fn remove_expired(&mut self) -> impl Iterator + '_ { + self.known_farms.extract_if(|_farm_index, known_farm| { + known_farm.last_identification.elapsed() > FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2 + }) + } + + fn remove(&mut self, farm_index: FarmIndex) { + self.known_farms.remove(&farm_index); + } +} + +pub(super) async fn maintain_farms( + instance: &str, + nats_client: &NatsClient, + plotted_pieces: &AsyncRwLock>, +) -> anyhow::Result<()> { + let mut known_farms = KnownFarms::default(); + + // Futures that need to be processed sequentially in order to add/remove farms, if farm was + // added, future will resolve with `Some`, `None` if removed + let mut farms_to_add_remove = 
VecDeque::::new(); + // Farm that is being added/removed right now (if any) + let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture).fuse(); + // Initialize with pending future so it never ends + let mut farms = FuturesUnordered::from_iter([ + Box::pin(pending()) as Pin)>>> + ]); + + let farmer_identify_subscription = pin!(nats_client + .subscribe_to_broadcasts::(None, None) + .await + .map_err(|error| anyhow!( + "Failed to subscribe to farmer identify farm broadcast: {error}" + ))?); + + // Request farmer to identify themselves + if let Err(error) = nats_client + .broadcast(&ClusterControllerFarmerIdentifyBroadcast, instance) + .await + { + warn!(%error, "Failed to send farmer identification broadcast"); + } + + let mut farmer_identify_subscription = farmer_identify_subscription.fuse(); + let mut farm_pruning_interval = tokio::time::interval_at( + (Instant::now() + FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(), + FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2, + ); + farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + if farm_add_remove_in_progress.is_terminated() { + if let Some(fut) = farms_to_add_remove.pop_front() { + farm_add_remove_in_progress = fut.fuse(); + } + } + + select! { + (farm_index, result) = farms.select_next_some() => { + known_farms.remove(farm_index); + farms_to_add_remove.push_back(Box::pin(async move { + plotted_pieces.write().await.delete_farm(farm_index); + + None + })); + + match result { + Ok(()) => { + info!(%farm_index, "Farm exited successfully"); + } + Err(error) => { + error!(%farm_index, %error, "Farm exited with error"); + } + } + } + maybe_identify_message = farmer_identify_subscription.next() => { + let Some(identify_message) = maybe_identify_message else { + return Err(anyhow!("Farmer identify stream ended")); + }; + + process_farm_identify_message( + identify_message, + nats_client, + &mut known_farms, + &mut farms_to_add_remove, + plotted_pieces, + ); + } + _ = farm_pruning_interval.tick().fuse() => { + for (farm_index, removed_farm) in known_farms.remove_expired() { + if removed_farm.expired_sender.send(()).is_ok() { + warn!( + %farm_index, + farm_id = %removed_farm.farm_id, + "Farm expired and removed" + ); + } else { + warn!( + %farm_index, + farm_id = %removed_farm.farm_id, + "Farm exited before expiration notification" + ); + } + plotted_pieces.write().await.delete_farm(farm_index); + } + } + result = farm_add_remove_in_progress => { + if let Some((farm_index, expired_receiver, farm)) = result { + farms.push(Box::pin(async move { + select! 
{ + result = farm.run().fuse() => { + (farm_index, result) + } + _ = expired_receiver.fuse() => { + // Nothing to do + (farm_index, Ok(())) + } + } + })); + } + } + } + } +} + +fn process_farm_identify_message<'a>( + identify_message: ClusterFarmerIdentifyFarmBroadcast, + nats_client: &'a NatsClient, + known_farms: &mut KnownFarms, + farms_to_add_remove: &mut VecDeque>, + plotted_pieces: &'a AsyncRwLock>, +) { + let ClusterFarmerIdentifyFarmBroadcast { + farm_id, + total_sectors_count, + fingerprint, + } = identify_message; + let (farm_index, expired_receiver, add, remove) = + match known_farms.insert_or_update(farm_id, fingerprint) { + KnownFarmInsertResult::Inserted { + farm_index, + expired_receiver, + } => { + info!( + %farm_index, + %farm_id, + "Discovered new farm, initializing" + ); + + (farm_index, expired_receiver, true, false) + } + KnownFarmInsertResult::FingerprintUpdated { + farm_index, + expired_receiver, + } => { + info!( + %farm_index, + %farm_id, + "Farm fingerprint updated, re-initializing" + ); + + (farm_index, expired_receiver, true, true) + } + KnownFarmInsertResult::NotInserted => { + trace!( + %farm_id, + "Received identification for already known farm" + ); + // Nothing to do here + return; + } + }; + + if remove { + farms_to_add_remove.push_back(Box::pin(async move { + plotted_pieces.write().await.delete_farm(farm_index); + + None + })); + } + + if add { + farms_to_add_remove.push_back(Box::pin(async move { + match initialize_farm( + farm_index, + farm_id, + total_sectors_count, + plotted_pieces, + nats_client, + ) + .await + { + Ok(farm) => { + if remove { + info!( + %farm_index, + %farm_id, + "Farm re-initialized successfully" + ); + } else { + info!( + %farm_index, + %farm_id, + "Farm initialized successfully" + ); + } + + Some((farm_index, expired_receiver, Box::new(farm) as Box<_>)) + } + Err(error) => { + warn!( + %error, + "Failed to initialize farm {farm_id}" + ); + None + } + } + })); + } +} + +async fn initialize_farm( + farm_index: FarmIndex, + farm_id: FarmId, + total_sectors_count: SectorIndex, + plotted_pieces: &AsyncRwLock>, + nats_client: &NatsClient, +) -> anyhow::Result { + let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone()) + .await + .map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?; + + let mut plotted_pieces = plotted_pieces.write().await; + plotted_pieces.add_farm(farm_index, farm.piece_reader()); + + // Buffer sectors that are plotted while already plotted sectors are being iterated over + let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new())); + let sector_update_handler = farm.on_sector_update(Arc::new({ + let plotted_sectors_buffer = Arc::clone(&plotted_sectors_buffer); + + move |(_sector_index, sector_update)| { + if let SectorUpdate::Plotting(SectorPlottingDetails::Finished { + plotted_sector, + old_plotted_sector, + .. 
+ }) = sector_update + { + plotted_sectors_buffer + .lock() + .push((plotted_sector.clone(), old_plotted_sector.clone())); + } + } + })); + + // Add plotted sectors of the farm to global plotted pieces + let plotted_sectors = farm.plotted_sectors(); + let mut plotted_sectors = plotted_sectors + .get() + .await + .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?; + while let Some(plotted_sector_result) = plotted_sectors.next().await { + let plotted_sector = plotted_sector_result + .map_err(|error| anyhow!("Failed to get plotted sector for farm {farm_id}: {error}"))?; + + plotted_pieces.add_sector(farm_index, &plotted_sector); + } + + // Add sectors that were plotted while above iteration was happening to plotted sectors + // too + drop(sector_update_handler); + for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer.lock().drain(..) { + if let Some(old_plotted_sector) = old_plotted_sector { + plotted_pieces.delete_sector(farm_index, &old_plotted_sector); + } + plotted_pieces.add_sector(farm_index, &plotted_sector); + } + + Ok(farm) +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs new file mode 100644 index 00000000000..a8ef42dae64 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs @@ -0,0 +1,511 @@ +use crate::commands::shared::metrics::{FarmerMetrics, SectorState}; +use crate::commands::shared::DiskFarm; +use anyhow::anyhow; +use async_lock::Mutex as AsyncMutex; +use backoff::ExponentialBackoff; +use bytesize::ByteSize; +use clap::Parser; +use futures::stream::{FuturesOrdered, FuturesUnordered}; +use futures::{select, FutureExt, StreamExt, TryStreamExt}; +use prometheus_client::registry::Registry; +use std::fs; +use std::future::Future; +use std::num::NonZeroUsize; +use std::pin::{pin, Pin}; +use std::sync::Arc; +use std::time::Duration; +use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; +use subspace_core_primitives::{PublicKey, Record}; +use subspace_erasure_coding::ErasureCoding; +use subspace_farmer::cluster::controller::ClusterNodeClient; +use subspace_farmer::cluster::farmer::farmer_service; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_farmer::cluster::plotter::ClusterPlotter; +use subspace_farmer::farm::{ + Farm, FarmingNotification, SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, +}; +use subspace_farmer::node_client::NodeClient; +use subspace_farmer::single_disk_farm::{ + SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, +}; +use subspace_farmer::utils::ss58::parse_ss58_reward_address; +use subspace_farmer::utils::{ + recommended_number_of_farming_threads, run_future_in_dedicated_thread, AsyncJoinOnDrop, +}; +use subspace_proof_of_space::Table; +use tokio::sync::{Barrier, Semaphore}; +use tracing::{error, info, info_span, warn, Instrument}; + +const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30); +/// Interval between farmer self-identification broadcast messages +pub(super) const FARMER_IDENTIFICATION_BROADCAST_INTERVAL: Duration = Duration::from_secs(5); + +/// Arguments for farmer +#[derive(Debug, Parser)] +pub(super) struct FarmerArgs { + /// One or more farm located at specified path, each with its own allocated space. 
+ /// + /// In case of multiple disks, it is recommended to specify them individually rather than using + /// RAID 0, that way farmer will be able to better take advantage of concurrency of individual + /// drives. + /// + /// Format for each farm is coma-separated list of strings like this: + /// + /// path=/path/to/directory,size=5T + /// + /// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that + /// farmer will make sure to not exceed (and will pre-allocated all the space on startup to + /// ensure it will not run out of space in runtime). Optionally, `record-chunks-mode` can be + /// set to `ConcurrentChunks` or `WholeSector` in order to avoid internal benchmarking during + /// startup. + disk_farms: Vec, + /// Address for farming rewards + #[arg(long, value_parser = parse_ss58_reward_address)] + reward_address: PublicKey, + /// Run temporary farmer with specified farm size in human-readable format (e.g. 10GB, 2TiB) or + /// just bytes (e.g. 4096), this will create a temporary directory that will be deleted at the + /// end of the process. + #[arg(long, conflicts_with = "disk_farms")] + tmp: Option, + /// Maximum number of pieces in sector (can override protocol value to something lower). + /// + /// This will make plotting of individual sectors faster, decrease load on CPU proving, but also + /// proportionally increase amount of disk reads during audits since every sector needs to be + /// audited and there will be more of them. + /// + /// This is primarily for development and not recommended to use by regular users. + #[arg(long)] + max_pieces_in_sector: Option, + /// Do not print info about configured farms on startup + #[arg(long)] + no_info: bool, + /// Defines max number sectors farmer will encode concurrently, defaults to 8. Might be limited + /// by plotting capacity available in the cluster. + /// + /// Increase will result in higher memory usage. + #[arg(long, default_value = "8")] + sector_encoding_concurrency: NonZeroUsize, + /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some + /// compute-intensive operations during proving), defaults to number of logical CPUs + /// available on UMA system and number of logical CPUs in first NUMA node on NUMA system, but + /// not more than 32 threads + #[arg(long)] + farming_thread_pool_size: Option, + /// Disable farm locking, for example if file system doesn't support it + #[arg(long)] + disable_farm_locking: bool, + /// Whether to create missing farms during start. + /// + /// If set to `false` farmer will exit with error if one of the farms doesn't already exist. + #[arg(long, default_value_t = true, action = clap::ArgAction::Set)] + create: bool, + /// Exit on farm error. + /// + /// By default, farmer will continue running if there are still other working farms. 
+ #[arg(long)] + exit_on_farm_error: bool, +} + +pub(super) async fn farmer( + nats_client: NatsClient, + registry: &mut Registry, + farmer_args: FarmerArgs, +) -> anyhow::Result>>>> +where + PosTable: Table, +{ + let FarmerArgs { + mut disk_farms, + reward_address, + tmp, + max_pieces_in_sector, + no_info, + sector_encoding_concurrency, + farming_thread_pool_size, + disable_farm_locking, + create, + exit_on_farm_error, + } = farmer_args; + + let tmp_directory = if let Some(plot_size) = tmp { + let tmp_directory = tempfile::Builder::new() + .prefix("subspace-farmer-") + .tempdir()?; + + disk_farms = vec![DiskFarm { + directory: tmp_directory.as_ref().to_path_buf(), + allocated_space: plot_size.as_u64(), + read_sector_record_chunks_mode: None, + }]; + + Some(tmp_directory) + } else { + if disk_farms.is_empty() { + return Err(anyhow!("There must be at least one disk farm provided")); + } + + for farm in &disk_farms { + if !farm.directory.exists() { + if let Err(error) = fs::create_dir(&farm.directory) { + return Err(anyhow!( + "Directory {} doesn't exist and can't be created: {}", + farm.directory.display(), + error + )); + } + } + } + None + }; + + let node_client = ClusterNodeClient::new(nats_client.clone()); + + let farmer_app_info = node_client + .farmer_app_info() + .await + .map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?; + + let farmer_metrics = FarmerMetrics::new(registry); + + let kzg = Kzg::new(embedded_kzg_settings()); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; + + let max_pieces_in_sector = match max_pieces_in_sector { + Some(max_pieces_in_sector) => { + if max_pieces_in_sector > farmer_app_info.protocol_info.max_pieces_in_sector { + warn!( + protocol_value = farmer_app_info.protocol_info.max_pieces_in_sector, + desired_value = max_pieces_in_sector, + "Can't set max pieces in sector higher than protocol value, using protocol \ + value" + ); + + farmer_app_info.protocol_info.max_pieces_in_sector + } else { + max_pieces_in_sector + } + } + None => farmer_app_info.protocol_info.max_pieces_in_sector, + }; + + let farming_thread_pool_size = farming_thread_pool_size + .map(|farming_thread_pool_size| farming_thread_pool_size.get()) + .unwrap_or_else(recommended_number_of_farming_threads); + + let global_mutex = Arc::default(); + let plotter = Arc::new(ClusterPlotter::new( + nats_client.clone(), + sector_encoding_concurrency, + ExponentialBackoff { + max_elapsed_time: None, + ..ExponentialBackoff::default() + }, + )); + + let farms = { + let node_client = node_client.clone(); + let info_mutex = &AsyncMutex::new(()); + let faster_read_sector_record_chunks_mode_barrier = + Arc::new(Barrier::new(disk_farms.len())); + let faster_read_sector_record_chunks_mode_concurrency = Arc::new(Semaphore::new(1)); + + let mut farms = Vec::with_capacity(disk_farms.len()); + let mut farms_stream = disk_farms + .into_iter() + .enumerate() + .map(|(farm_index, disk_farm)| { + let farmer_app_info = farmer_app_info.clone(); + let node_client = node_client.clone(); + let kzg = kzg.clone(); + let erasure_coding = erasure_coding.clone(); + let plotter = Arc::clone(&plotter); + let global_mutex = Arc::clone(&global_mutex); + let faster_read_sector_record_chunks_mode_barrier = + Arc::clone(&faster_read_sector_record_chunks_mode_barrier); + let faster_read_sector_record_chunks_mode_concurrency = + 
Arc::clone(&faster_read_sector_record_chunks_mode_concurrency); + + async move { + let farm_fut = SingleDiskFarm::new::<_, _, PosTable>( + SingleDiskFarmOptions { + directory: disk_farm.directory.clone(), + farmer_app_info, + allocated_space: disk_farm.allocated_space, + max_pieces_in_sector, + node_client, + reward_address, + kzg, + erasure_coding, + // Cache is provided by dedicated caches in farming cluster + cache_percentage: 0, + farming_thread_pool_size, + plotting_delay: None, + global_mutex, + disable_farm_locking, + read_sector_record_chunks_mode: disk_farm + .read_sector_record_chunks_mode, + faster_read_sector_record_chunks_mode_barrier, + faster_read_sector_record_chunks_mode_concurrency, + plotter, + create, + }, + farm_index, + ); + + let farm = match farm_fut.await { + Ok(farm) => farm, + Err(SingleDiskFarmError::InsufficientAllocatedSpace { + min_space, + allocated_space, + }) => { + return ( + farm_index, + Err(anyhow!( + "Allocated space {} ({}) is not enough, minimum is ~{} (~{}, \ + {} bytes to be exact)", + bytesize::to_string(allocated_space, true), + bytesize::to_string(allocated_space, false), + bytesize::to_string(min_space, true), + bytesize::to_string(min_space, false), + min_space + )), + ); + } + Err(error) => { + return (farm_index, Err(error.into())); + } + }; + + if !no_info { + let _info_guard = info_mutex.lock().await; + + let info = farm.info(); + info!("Farm {farm_index}:"); + info!(" ID: {}", info.id()); + info!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash())); + info!(" Public key: 0x{}", hex::encode(info.public_key())); + info!( + " Allocated space: {} ({})", + bytesize::to_string(info.allocated_space(), true), + bytesize::to_string(info.allocated_space(), false) + ); + info!(" Directory: {}", disk_farm.directory.display()); + } + + (farm_index, Ok(Box::new(farm) as Box)) + } + .instrument(info_span!("", %farm_index)) + }) + .collect::>(); + + while let Some((farm_index, farm)) = farms_stream.next().await { + if let Err(error) = &farm { + let span = info_span!("", %farm_index); + let _span_guard = span.enter(); + + error!(%error, "Farm creation failed"); + } + farms.push((farm_index, farm?)); + } + + // Restore order after unordered initialization + farms.sort_unstable_by_key(|(farm_index, _farm)| *farm_index); + + farms + .into_iter() + .map(|(_farm_index, farm)| farm) + .collect::>() + }; + + let total_and_plotted_sectors = farms + .iter() + .enumerate() + .map(|(farm_index, farm)| async move { + let total_sector_count = farm.total_sectors_count(); + let mut plotted_sectors_count = 0; + let plotted_sectors = farm.plotted_sectors(); + let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| { + anyhow!("Failed to get plotted sectors for farm {farm_index}: {error}") + })?; + while let Some(plotted_sector_result) = plotted_sectors.next().await { + plotted_sectors_count += 1; + plotted_sector_result.map_err(|error| { + anyhow!( + "Failed reading plotted sector on startup for farm {farm_index}: {error}" + ) + })?; + } + + anyhow::Ok((total_sector_count, plotted_sectors_count)) + }) + .collect::>() + .try_collect::>() + .await?; + + let farmer_service_fut = farmer_service( + nats_client, + farms.as_slice(), + FARMER_IDENTIFICATION_BROADCAST_INTERVAL, + ); + let farmer_service_fut = run_future_in_dedicated_thread( + move || farmer_service_fut, + "controller-service".to_string(), + )?; + + let mut farms_stream = (0u8..) 
+ .zip(farms) + .zip(total_and_plotted_sectors) + .map(|((farm_index, farm), sector_counts)| { + let (total_sector_count, plotted_sectors_count) = sector_counts; + farmer_metrics.update_sectors_total( + farm.id(), + total_sector_count - plotted_sectors_count, + SectorState::NotPlotted, + ); + farmer_metrics.update_sectors_total( + farm.id(), + plotted_sectors_count, + SectorState::Plotted, + ); + farm.on_sector_update(Arc::new({ + let farm_id = *farm.id(); + let farmer_metrics = farmer_metrics.clone(); + + move |(_sector_index, sector_state)| match sector_state { + SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. }) => { + farmer_metrics.sector_plotting.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => { + farmer_metrics.sector_downloading.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => { + farmer_metrics.observe_sector_downloading_time(&farm_id, time); + farmer_metrics.sector_downloaded.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => { + farmer_metrics.sector_encoding.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => { + farmer_metrics.observe_sector_encoding_time(&farm_id, time); + farmer_metrics.sector_encoded.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Writing) => { + farmer_metrics.sector_writing.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => { + farmer_metrics.observe_sector_writing_time(&farm_id, time); + farmer_metrics.sector_written.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Finished { time, .. }) => { + farmer_metrics.observe_sector_plotting_time(&farm_id, time); + farmer_metrics.sector_plotted.inc(); + farmer_metrics.update_sector_state(&farm_id, SectorState::Plotted); + } + SectorUpdate::Plotting(SectorPlottingDetails::Error(_)) => { + farmer_metrics.sector_plotting_error.inc(); + } + SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => { + farmer_metrics.update_sector_state(&farm_id, SectorState::AboutToExpire); + } + SectorUpdate::Expiration(SectorExpirationDetails::Expired) => { + farmer_metrics.update_sector_state(&farm_id, SectorState::Expired); + } + SectorUpdate::Expiration(SectorExpirationDetails::Determined { .. 
}) => { + // Not interested in here + } + } + })) + .detach(); + + farm.on_farming_notification(Arc::new({ + let farm_id = *farm.id(); + let farmer_metrics = farmer_metrics.clone(); + + move |farming_notification| match farming_notification { + FarmingNotification::Auditing(auditing_details) => { + farmer_metrics.observe_auditing_time(&farm_id, &auditing_details.time); + } + FarmingNotification::Proving(proving_details) => { + farmer_metrics.observe_proving_time( + &farm_id, + &proving_details.time, + proving_details.result, + ); + } + FarmingNotification::NonFatalError(error) => { + farmer_metrics.note_farming_error(&farm_id, error); + } + } + })) + .detach(); + + farm.run().map(move |result| (farm_index, result)) + }) + .collect::>(); + + let mut farm_errors = Vec::new(); + + let farm_fut = run_future_in_dedicated_thread( + move || async move { + while let Some((farm_index, result)) = farms_stream.next().await { + match result { + Ok(()) => { + info!(%farm_index, "Farm exited successfully"); + } + Err(error) => { + error!(%farm_index, %error, "Farm exited with error"); + + if farms_stream.is_empty() || exit_on_farm_error { + return Err(error); + } else { + farm_errors.push(AsyncJoinOnDrop::new( + tokio::spawn(async move { + loop { + tokio::time::sleep(FARM_ERROR_PRINT_INTERVAL).await; + + error!( + %farm_index, + %error, + "Farm errored and stopped" + ); + } + }), + true, + )) + } + } + } + } + anyhow::Ok(()) + }, + "farmer-farm".to_string(), + )?; + + Ok(Box::pin(async move { + let farm_fut = farm_fut; + let farmer_service_fut = farmer_service_fut; + + let farm_fut = pin!(farm_fut); + let farmer_service_fut = pin!(farmer_service_fut); + + select! { + // Farm future + result = farm_fut.fuse() => { + result??; + }, + + // Piece cache worker future + result = farmer_service_fut.fuse() => { + result??; + }, + } + + drop(tmp_directory); + + Ok(()) + })) +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs new file mode 100644 index 00000000000..b44ab3961e0 --- /dev/null +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs @@ -0,0 +1,166 @@ +use crate::commands::shared::PlottingThreadPriority; +use anyhow::anyhow; +use clap::Parser; +use prometheus_client::registry::Registry; +use std::future::Future; +use std::num::NonZeroUsize; +use std::pin::Pin; +use std::sync::Arc; +use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; +use subspace_core_primitives::Record; +use subspace_erasure_coding::ErasureCoding; +use subspace_farmer::cluster::controller::ClusterPieceGetter; +use subspace_farmer::cluster::nats_client::NatsClient; +use subspace_farmer::cluster::plotter::plotter_service; +use subspace_farmer::plotter::cpu::CpuPlotter; +use subspace_farmer::utils::{ + create_plotting_thread_pool_manager, parse_cpu_cores_sets, thread_pool_core_indices, +}; +use subspace_proof_of_space::Table; +use tokio::sync::Semaphore; +use tracing::info; + +/// Arguments for plotter +#[derive(Debug, Parser)] +pub(super) struct PlotterArgs { + /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of + /// the plotting process, defaults to `--sector-encoding-concurrency` + 1 to download future + /// sector ahead of time. + /// + /// Increase will result in higher memory usage. 
+ #[arg(long)] + sector_downloading_concurrency: Option, + /// Defines how many sectors farmer will encode concurrently, defaults to 1 on UMA system and + /// number of NUMA nodes on NUMA system or L3 cache groups on large CPUs. It is further + /// restricted by + /// `--sector-downloading-concurrency` and setting this option higher than + /// `--sector-downloading-concurrency` will have no effect. + /// + /// Increase will result in higher memory usage. + #[arg(long)] + sector_encoding_concurrency: Option, + /// Defines how many records farmer will encode in a single sector concurrently, defaults to one + /// record per 2 cores, but not more than 8 in total. Higher concurrency means higher memory + /// usage and typically more efficient CPU utilization. + #[arg(long)] + record_encoding_concurrency: Option, + /// Size of one thread pool used for plotting, defaults to number of logical CPUs available + /// on UMA system and number of logical CPUs available in NUMA node on NUMA system or L3 cache + /// groups on large CPUs. + /// + /// Number of thread pools is defined by `--sector-encoding-concurrency` option, different + /// thread pools might have different number of threads if NUMA nodes do not have the same size. + /// + /// Threads will be pinned to corresponding CPU cores at creation. + #[arg(long)] + plotting_thread_pool_size: Option, + /// Specify exact CPU cores to be used for plotting bypassing any custom logic farmer might use + /// otherwise. It replaces both `--sector-encoding-concurrency` and + /// `--plotting-thread-pool-size` options if specified. Requires `--replotting-cpu-cores` to be + /// specified with the same number of CPU cores groups (or not specified at all, in which case + /// it'll use the same thread pool as plotting). + /// + /// Cores are coma-separated, with whitespace separating different thread pools/encoding + /// instances. For example "0,1 2,3" will result in two sectors being encoded at the same time, + /// each with a pair of CPU cores. + #[arg(long, conflicts_with_all = & ["sector_encoding_concurrency", "plotting_thread_pool_size"])] + plotting_cpu_cores: Option, + /// Plotting thread priority, by default de-prioritizes plotting threads in order to make sure + /// farming is successful and computer can be used comfortably for other things. Can be set to + /// "min", "max" or "default". 
+ #[arg(long, default_value_t = PlottingThreadPriority::Min)] + plotting_thread_priority: PlottingThreadPriority, +} + +pub(super) async fn plotter( + nats_client: NatsClient, + _registry: &mut Registry, + plotter_args: PlotterArgs, +) -> anyhow::Result>>>> +where + PosTable: Table, +{ + let PlotterArgs { + sector_downloading_concurrency, + sector_encoding_concurrency, + record_encoding_concurrency, + plotting_thread_pool_size, + plotting_cpu_cores, + plotting_thread_priority, + } = plotter_args; + + let kzg = Kzg::new(embedded_kzg_settings()); + let erasure_coding = ErasureCoding::new( + NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) + .expect("Not zero; qed"), + ) + .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; + let piece_getter = ClusterPieceGetter::new(nats_client.clone()); + + let plotting_thread_pool_core_indices; + if let Some(plotting_cpu_cores) = plotting_cpu_cores { + plotting_thread_pool_core_indices = parse_cpu_cores_sets(&plotting_cpu_cores) + .map_err(|error| anyhow!("Failed to parse `--plotting-cpu-cores`: {error}"))?; + } else { + plotting_thread_pool_core_indices = + thread_pool_core_indices(plotting_thread_pool_size, sector_encoding_concurrency); + + if plotting_thread_pool_core_indices.len() > 1 { + info!( + l3_cache_groups = %plotting_thread_pool_core_indices.len(), + "Multiple L3 cache groups detected" + ); + } + } + + let downloading_semaphore = Arc::new(Semaphore::new( + sector_downloading_concurrency + .map(|sector_downloading_concurrency| sector_downloading_concurrency.get()) + .unwrap_or(plotting_thread_pool_core_indices.len() + 1), + )); + + let record_encoding_concurrency = record_encoding_concurrency.unwrap_or_else(|| { + let cpu_cores = plotting_thread_pool_core_indices + .first() + .expect("Guaranteed to have some CPU cores; qed"); + + NonZeroUsize::new((cpu_cores.cpu_cores().len() / 2).clamp(1, 8)).expect("Not zero; qed") + }); + + info!( + ?plotting_thread_pool_core_indices, + "Preparing plotting thread pools" + ); + + let replotting_thread_pool_core_indices = plotting_thread_pool_core_indices + .clone() + .into_iter() + .map(|mut cpu_core_set| { + // We'll not use replotting threads at all, so just limit them to 1 core so we don't + // have too many threads hanging unnecessarily + cpu_core_set.truncate(1); + cpu_core_set + }); + let plotting_thread_pool_manager = create_plotting_thread_pool_manager( + plotting_thread_pool_core_indices + .into_iter() + .zip(replotting_thread_pool_core_indices), + plotting_thread_priority.into(), + )?; + let global_mutex = Arc::default(); + let cpu_plotter = CpuPlotter::<_, PosTable>::new( + piece_getter, + downloading_semaphore, + plotting_thread_pool_manager, + record_encoding_concurrency, + Arc::clone(&global_mutex), + kzg.clone(), + erasure_coding.clone(), + ); + + // TODO: Metrics + + Ok(Box::pin(async move { + plotter_service(&nats_client, &cpu_plotter).await + })) +} diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index e5409fa8000..ce07e9d06b9 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -131,7 +131,7 @@ pub(crate) struct FarmingArgs { /// Increase will result in higher memory usage. 
#[arg(long)] sector_encoding_concurrency: Option, - /// Defines how many record farmer will encode in a single sector concurrently, defaults to one + /// Defines how many records farmer will encode in a single sector concurrently, defaults to one /// record per 2 cores, but not more than 8 in total. Higher concurrency means higher memory /// usage and typically more efficient CPU utilization. #[arg(long)] @@ -184,7 +184,8 @@ pub(crate) struct FarmingArgs { #[arg(long, conflicts_with_all = & ["sector_encoding_concurrency", "replotting_thread_pool_size"])] replotting_cpu_cores: Option, /// Plotting thread priority, by default de-prioritizes plotting threads in order to make sure - /// farming is successful and computer can be used comfortably for other things + /// farming is successful and computer can be used comfortably for other things. Can be set to + /// "min", "max" or "default". #[arg(long, default_value_t = PlottingThreadPriority::Min)] plotting_thread_priority: PlottingThreadPriority, /// Enable plot cache. diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index 2f368efd1f0..3522f376a0d 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -1,4 +1,10 @@ -#![feature(const_option, type_changing_struct_update)] +#![feature( + const_option, + extract_if, + hash_extract_if, + let_chains, + type_changing_struct_update +)] mod commands; mod utils; @@ -24,6 +30,8 @@ type PosTable = ChiaTable; enum Command { /// Start a farmer, does plotting and farming Farm(commands::farm::FarmingArgs), + /// Farming cluster + Cluster(commands::cluster::ClusterArgs), /// Run various benchmarks #[clap(subcommand)] Benchmark(commands::benchmark::BenchmarkArgs), @@ -86,6 +94,9 @@ async fn main() -> anyhow::Result<()> { Command::Farm(farming_args) => { commands::farm::farm::(farming_args).await?; } + Command::Cluster(cluster_args) => { + commands::cluster::cluster::(cluster_args).await?; + } Command::Benchmark(benchmark_args) => { commands::benchmark::benchmark(benchmark_args)?; } diff --git a/crates/subspace-farmer/src/cluster.rs b/crates/subspace-farmer/src/cluster.rs index 313273ca268..2de6eead0ef 100644 --- a/crates/subspace-farmer/src/cluster.rs +++ b/crates/subspace-farmer/src/cluster.rs @@ -1 +1,57 @@ +//! Cluster version of the farmer +//! +//! This module contains isolated set of modules that implement cluster-specific functionality for +//! the farmer, allowing to distribute cooperating components across machines, while still working +//! together. +//! +//! Specifically, 4 separate components are extracted: +//! * controller +//! * farmer +//! * plotter +//! * cache +//! +//! ### Controller +//! +//! Controller connects to the node via RPC and DSN. It handles notifications from node and +//! orchestrates other components. It will send slot notifications to farmers, store and retrieve +//! pieces from caches on requests from DSN, etc. +//! +//! While there could be multiple controllers shared between farmers, each controller must have its +//! dedicated pool of caches and each cache should belong to a single controller. This allows to +//! shut down some controllers for upgrades and other maintenance tasks without affecting farmer's +//! ability to farm and receive rewards. +//! +//! ### Farmer +//! +//! Farmer maintains farms with plotted pieces and corresponding metadata. Farmer does audits and +//! 
proving, retrieves pieces from plotted sectors on request, but doesn't do any caching or P2P
+//! networking with DSN. When sectors need to be plotted/replotted, a request will be sent to the
+//! Plotter to do that instead of doing it locally, though plotter and farmer can be co-located.
+//!
+//! Farmers receive (de-duplicated) slot notifications from all controllers and will send solutions
+//! back to the controller from which they received the corresponding slot notification.
+//!
+//! ### Plotter
+//!
+//! Plotter needs to be able to do heavy compute with a proportional amount of RAM for plotting
+//! purposes.
+//!
+//! There could be any number of plotters in a cluster; adding more will increase the cluster's total
+//! ability to plot sectors concurrently.
+//!
+//! ### Cache
+//!
+//! Cache helps with the plotting process and with serving data to DSN. While its writes and
+//! reads are random, they are large and infrequent in contrast to the farmer's I/O. Fast
+//! retrieval is important for plotters not to stay idle, but generally cache can work even
+//! on HDDs.
+//!
+//! There could be any number of caches in the cluster, but each cache instance belongs to one of
+//! the controllers. So if multiple controllers are present in the cluster, you'll want at least one
+//! cache connected to each as well for optimal performance.
+
+pub mod cache;
+pub mod controller;
+pub mod farmer;
 pub mod nats_client;
+pub mod plotter;
diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs
new file mode 100644
index 00000000000..e4f543752b7
--- /dev/null
+++ b/crates/subspace-farmer/src/cluster/cache.rs
@@ -0,0 +1,754 @@
+//! Farming cluster cache
+//!
+//! Cache is responsible for caching pieces within allocated space to accelerate plotting and serve
+//! pieces in response to DSN requests.
+//!
+//! This module exposes some data structures for NATS communication, a custom piece cache
+//! implementation designed to work with the cluster cache, and a service function to drive the
+//! backend part of the cache.
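In practice the two halves of this module meet over NATS: the machine that owns the disk-backed caches drives `cache_service`, while the controller turns each `ClusterCacheIdentifyBroadcast` it receives into a `ClusterPieceCache` handle. A minimal wiring sketch (not part of the diff; the cache group name, identification interval and the concrete `PieceCache` implementations are placeholders):

```rust
use std::time::Duration;

use subspace_farmer::cluster::cache::{
    cache_service, ClusterCacheIdentifyBroadcast, ClusterPieceCache,
};
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::PieceCache;

/// Cache side: answer write/read/contents requests and keep identifying itself
async fn run_cache_side(
    nats_client: NatsClient,
    caches: Vec<impl PieceCache>, // placeholder: any local `PieceCache` implementations
) -> anyhow::Result<()> {
    // "default" cache group and a 30s identification interval are illustrative values only
    cache_service(nats_client, &caches, "default", Duration::from_secs(30)).await
}

/// Controller side: build a remote cache handle from a received identify broadcast
fn on_cache_identified(
    nats_client: &NatsClient,
    identify: ClusterCacheIdentifyBroadcast,
) -> ClusterPieceCache {
    ClusterPieceCache::new(
        identify.cache_id,
        identify.max_num_elements,
        nats_client.clone(),
    )
}
```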
+ +use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast; +use crate::cluster::nats_client::{ + GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, +}; +use crate::farm::{FarmError, PieceCache, PieceCacheOffset}; +use anyhow::anyhow; +use async_nats::Message; +use async_trait::async_trait; +use derive_more::Display; +use futures::stream::FuturesUnordered; +use futures::{select, stream, FutureExt, Stream, StreamExt}; +use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output}; +use std::future::{pending, Future}; +use std::pin::{pin, Pin}; +use std::time::Duration; +use subspace_core_primitives::{Piece, PieceIndex}; +use tokio::time::MissedTickBehavior; +use tracing::{debug, error, trace, warn}; +use ulid::Ulid; + +/// An ephemeral identifier for a cache +#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)] +pub enum ClusterCacheId { + Ulid(Ulid), +} + +impl Encode for ClusterCacheId { + #[inline] + fn size_hint(&self) -> usize { + 1_usize + + match self { + ClusterCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)), + } + } + + #[inline] + fn encode_to(&self, output: &mut O) { + match self { + ClusterCacheId::Ulid(ulid) => { + output.push_byte(0); + Encode::encode_to(&ulid.0, output); + } + } + } +} + +impl EncodeLike for ClusterCacheId {} + +impl Decode for ClusterCacheId { + #[inline] + fn decode(input: &mut I) -> Result { + match input + .read_byte() + .map_err(|e| e.chain("Could not decode `CacheId`, failed to read variant byte"))? + { + 0 => u128::decode(input) + .map(|ulid| ClusterCacheId::Ulid(Ulid(ulid))) + .map_err(|e| e.chain("Could not decode `CacheId::Ulid.0`")), + _ => Err("Could not decode `CacheId`, variant doesn't exist".into()), + } + } +} + +#[allow(clippy::new_without_default)] +impl ClusterCacheId { + /// Creates new ID + #[inline] + pub fn new() -> Self { + Self::Ulid(Ulid::new()) + } +} + +/// Broadcast with identification details by caches +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterCacheIdentifyBroadcast { + /// Cache ID + pub cache_id: ClusterCacheId, + /// Max number of elements in this cache + pub max_num_elements: u32, +} + +impl GenericBroadcast for ClusterCacheIdentifyBroadcast { + const SUBJECT: &'static str = "subspace.cache.*.identify"; +} + +/// Write piece into cache +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterCacheWritePieceRequest { + offset: PieceCacheOffset, + piece_index: PieceIndex, + piece: Piece, +} + +impl GenericRequest for ClusterCacheWritePieceRequest { + const SUBJECT: &'static str = "subspace.cache.*.write-piece"; + type Response = Result<(), String>; +} + +/// Read piece index from cache +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterCacheReadPieceIndexRequest { + offset: PieceCacheOffset, +} + +impl GenericRequest for ClusterCacheReadPieceIndexRequest { + const SUBJECT: &'static str = "subspace.cache.*.read-piece-index"; + type Response = Result, String>; +} + +/// Read piece from cache +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterCacheReadPieceRequest { + offset: PieceCacheOffset, +} + +impl GenericRequest for ClusterCacheReadPieceRequest { + const SUBJECT: &'static str = "subspace.cache.*.read-piece"; + type Response = Result, String>; +} + +/// Request plotted from farmer, request +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterCacheContentsRequest; + +impl GenericStreamRequest for ClusterCacheContentsRequest { + const SUBJECT: &'static str = "subspace.cache.*.contents"; + type 
Response = Result<(PieceCacheOffset, Option), String>; +} + +/// Cluster cache implementation +#[derive(Debug)] +pub struct ClusterPieceCache { + cache_id_string: String, + max_num_elements: u32, + nats_client: NatsClient, +} + +#[async_trait] +impl PieceCache for ClusterPieceCache { + #[inline] + fn max_num_elements(&self) -> u32 { + self.max_num_elements + } + + async fn contents( + &self, + ) -> Result< + Box< + dyn Stream), FarmError>> + + Unpin + + Send + + '_, + >, + FarmError, + > { + Ok(Box::new( + self.nats_client + .stream_request(ClusterCacheContentsRequest, Some(&self.cache_id_string)) + .await? + .map(|response| response.map_err(FarmError::from)), + )) + } + + async fn write_piece( + &self, + offset: PieceCacheOffset, + piece_index: PieceIndex, + piece: &Piece, + ) -> Result<(), FarmError> { + Ok(self + .nats_client + .request( + &ClusterCacheWritePieceRequest { + offset, + piece_index, + piece: piece.clone(), + }, + Some(&self.cache_id_string), + ) + .await??) + } + + async fn read_piece_index( + &self, + offset: PieceCacheOffset, + ) -> Result, FarmError> { + Ok(self + .nats_client + .request( + &ClusterCacheReadPieceIndexRequest { offset }, + Some(&self.cache_id_string), + ) + .await??) + } + + async fn read_piece(&self, offset: PieceCacheOffset) -> Result, FarmError> { + Ok(self + .nats_client + .request( + &ClusterCacheReadPieceRequest { offset }, + Some(&self.cache_id_string), + ) + .await??) + } +} + +impl ClusterPieceCache { + /// Create new instance using information from previously received + /// [`ClusterCacheIdentifyBroadcast`] + #[inline] + pub fn new( + cache_id: ClusterCacheId, + max_num_elements: u32, + nats_client: NatsClient, + ) -> ClusterPieceCache { + Self { + cache_id_string: cache_id.to_string(), + max_num_elements, + nats_client, + } + } +} + +#[derive(Debug)] +struct CacheDetails<'a, C> { + cache_id: ClusterCacheId, + cache_id_string: String, + cache: &'a C, +} + +/// Create cache service for specified caches that will be processing incoming requests and send +/// periodic identify notifications +pub async fn cache_service( + nats_client: NatsClient, + caches: &[C], + cache_group: &str, + identification_broadcast_interval: Duration, +) -> anyhow::Result<()> +where + C: PieceCache, +{ + let caches_details = caches + .iter() + .map(|cache| { + let cache_id = ClusterCacheId::new(); + + CacheDetails { + cache_id, + cache_id_string: cache_id.to_string(), + cache, + } + }) + .collect::>(); + + select! { + result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => { + result + }, + result = write_piece_responder(&nats_client, &caches_details).fuse() => { + result + }, + result = read_piece_index_responder(&nats_client, &caches_details).fuse() => { + result + }, + result = read_piece_responder(&nats_client, &caches_details).fuse() => { + result + }, + result = contents_responder(&nats_client, &caches_details).fuse() => { + result + }, + } +} + +/// Listen for cache identification broadcast from controller and publish identification +/// broadcast in response, also send periodic notifications reminding that cache exists. +/// +/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times +/// per controller instance in order to parallelize more work across threads if needed. 
+async fn identify_responder( + nats_client: &NatsClient, + caches_details: &[CacheDetails<'_, C>], + cache_group: &str, + identification_broadcast_interval: Duration, +) -> anyhow::Result<()> +where + C: PieceCache, +{ + let mut subscription = nats_client + .subscribe_to_broadcasts::( + Some(cache_group), + Some(cache_group.to_string()), + ) + .await + .map_err(|error| { + anyhow!("Failed to subscribe to cache identify broadcast requests: {error}") + })? + .fuse(); + // Also send periodic updates in addition to the subscription response + let mut interval = tokio::time::interval(identification_broadcast_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + debug!("Identify broadcast stream ended"); + break; + }; + + trace!(?message, "Cache received identify broadcast message"); + + send_identify_broadcast(nats_client, caches_details).await; + interval.reset(); + } + _ = interval.tick().fuse() => { + trace!("Cache self-identification"); + + send_identify_broadcast(nats_client, caches_details).await; + } + } + } + + Ok(()) +} + +async fn send_identify_broadcast( + nats_client: &NatsClient, + caches_details: &[CacheDetails<'_, C>], +) where + C: PieceCache, +{ + caches_details + .iter() + .map(|cache| async move { + if let Err(error) = nats_client + .broadcast( + &ClusterCacheIdentifyBroadcast { + cache_id: cache.cache_id, + max_num_elements: cache.cache.max_num_elements(), + }, + &cache.cache_id_string, + ) + .await + { + warn!( + cache_id = %cache.cache_id, + %error, + "Failed to send cache identify notification" + ); + } + }) + .collect::>() + .collect::>() + .await; +} + +async fn write_piece_responder( + nats_client: &NatsClient, + caches_details: &[CacheDetails<'_, C>], +) -> anyhow::Result<()> +where + C: PieceCache, +{ + caches_details + .iter() + .map(|cache_details| async move { + // Initialize with pending future so it never ends + let mut processing = + FuturesUnordered:: + Send>>>::from_iter([ + Box::pin(pending()) as Pin>, + ]); + let mut subscription = nats_client + .queue_subscribe( + ClusterCacheWritePieceRequest::SUBJECT + .replace('*', &cache_details.cache_id_string), + cache_details.cache_id_string.clone(), + ) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to write piece requests for cache {}: {}", + cache_details.cache_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_write_piece_request( + nats_client, + cache_details, + message, + ))); + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) + }) + .collect::>() + .next() + .await + .ok_or_else(|| anyhow!("No caches"))? 
+} + +async fn process_write_piece_request( + nats_client: &NatsClient, + cache_details: &CacheDetails<'_, C>, + message: Message, +) where + C: PieceCache, +{ + let Some(reply_subject) = message.reply else { + return; + }; + + let ClusterCacheWritePieceRequest { + offset, + piece_index, + piece, + } = match ClusterCacheWritePieceRequest::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode write piece request" + ); + return; + } + }; + + let response: ::Response = cache_details + .cache + .write_piece(offset, piece_index, &piece) + .await + .map_err(|error| error.to_string()); + + if let Err(error) = nats_client + .publish(reply_subject, response.encode().into()) + .await + { + warn!(%error, "Failed to send write piece response"); + } +} + +async fn read_piece_index_responder( + nats_client: &NatsClient, + caches_details: &[CacheDetails<'_, C>], +) -> anyhow::Result<()> +where + C: PieceCache, +{ + caches_details + .iter() + .map(|cache_details| async move { + // Initialize with pending future so it never ends + let mut processing = + FuturesUnordered:: + Send>>>::from_iter([ + Box::pin(pending()) as Pin>, + ]); + let mut subscription = nats_client + .queue_subscribe( + ClusterCacheReadPieceIndexRequest::SUBJECT + .replace('*', &cache_details.cache_id_string), + cache_details.cache_id_string.clone(), + ) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to read piece index requests for cache {}: {}", + cache_details.cache_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_read_piece_index_request( + nats_client, + cache_details, + message, + ))); + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) + }) + .collect::>() + .next() + .await + .ok_or_else(|| anyhow!("No caches"))? 
+} + +async fn process_read_piece_index_request( + nats_client: &NatsClient, + cache_details: &CacheDetails<'_, C>, + message: Message, +) where + C: PieceCache, +{ + let Some(reply_subject) = message.reply else { + return; + }; + + let ClusterCacheReadPieceIndexRequest { offset } = + match ClusterCacheReadPieceIndexRequest::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode read piece index request" + ); + return; + } + }; + + let response: ::Response = cache_details + .cache + .read_piece_index(offset) + .await + .map_err(|error| error.to_string()); + + if let Err(error) = nats_client + .publish(reply_subject, response.encode().into()) + .await + { + warn!(%error, "Failed to send read piece index response"); + } +} + +async fn read_piece_responder( + nats_client: &NatsClient, + caches_details: &[CacheDetails<'_, C>], +) -> anyhow::Result<()> +where + C: PieceCache, +{ + caches_details + .iter() + .map(|cache_details| async move { + // Initialize with pending future so it never ends + let mut processing = + FuturesUnordered:: + Send>>>::from_iter([ + Box::pin(pending()) as Pin>, + ]); + let mut subscription = nats_client + .queue_subscribe( + ClusterCacheReadPieceRequest::SUBJECT + .replace('*', &cache_details.cache_id_string), + cache_details.cache_id_string.clone(), + ) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to read piece requests for cache {}: {}", + cache_details.cache_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_read_piece_request( + nats_client, + cache_details, + message, + ))); + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) + }) + .collect::>() + .next() + .await + .ok_or_else(|| anyhow!("No caches"))? +} + +async fn process_read_piece_request( + nats_client: &NatsClient, + cache_details: &CacheDetails<'_, C>, + message: Message, +) where + C: PieceCache, +{ + let Some(reply_subject) = message.reply else { + return; + }; + + let ClusterCacheReadPieceRequest { offset } = + match ClusterCacheReadPieceRequest::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode read piece request" + ); + return; + } + }; + + let response: ::Response = cache_details + .cache + .read_piece(offset) + .await + .map_err(|error| error.to_string()); + + if let Err(error) = nats_client + .publish(reply_subject, response.encode().into()) + .await + { + warn!(%error, "Failed to send read piece response"); + } +} + +async fn contents_responder( + nats_client: &NatsClient, + caches_details: &[CacheDetails<'_, C>], +) -> anyhow::Result<()> +where + C: PieceCache, +{ + caches_details + .iter() + .map(|cache_details| async move { + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered::from_iter([ + Box::pin(pending()) as Pin + Send>> + ]); + let mut subscription = nats_client + .subscribe_to_stream_requests( + Some(&cache_details.cache_id_string), + Some(cache_details.cache_id_string.clone()), + ) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to contents requests for cache {}: {}", + cache_details.cache_id, + error + ) + })? + .fuse(); + + loop { + select! 
{ + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_contents_request( + nats_client, + cache_details, + message, + ))); + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) + }) + .collect::>() + .next() + .await + .ok_or_else(|| anyhow!("No caches"))? +} + +async fn process_contents_request( + nats_client: &NatsClient, + cache_details: &CacheDetails<'_, C>, + request: StreamRequest, +) where + C: PieceCache, +{ + trace!(?request, "Contents request"); + + match cache_details.cache.contents().await { + Ok(contents) => { + nats_client + .stream_response::( + request.response_subject, + contents.map(|maybe_cache_element| { + maybe_cache_element.map_err(|error| error.to_string()) + }), + ) + .await; + } + Err(error) => { + error!( + %error, + cache_id = %cache_details.cache_id, + "Failed to get contents" + ); + + nats_client + .stream_response::( + request.response_subject, + pin!(stream::once(async move { + Err(format!("Failed to get contents: {error}")) + })), + ) + .await; + } + } +} diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs new file mode 100644 index 00000000000..0c277a4cc4e --- /dev/null +++ b/crates/subspace-farmer/src/cluster/controller.rs @@ -0,0 +1,727 @@ +//! Farming cluster controller +//! +//! Controller is responsible for managing farming cluster. +//! +//! This module exposes some data structures for NATS communication, custom piece getter and node +//! client implementations designed to work with cluster controller and a service function to drive +//! the backend part of the controller. + +use crate::cluster::nats_client::{ + GenericBroadcast, GenericNotification, GenericRequest, NatsClient, +}; +use crate::node_client::{Error as NodeClientError, NodeClient}; +use anyhow::anyhow; +use async_nats::{HeaderValue, Message}; +use async_trait::async_trait; +use futures::stream::FuturesUnordered; +use futures::{select, FutureExt, Stream, StreamExt}; +use parity_scale_codec::{Decode, Encode}; +use parking_lot::Mutex; +use std::error::Error; +use std::future::{pending, Future}; +use std::pin::Pin; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; +use subspace_farmer_components::PieceGetter; +use subspace_rpc_primitives::{ + FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, +}; +use tracing::{debug, trace, warn}; + +const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1); + +/// Broadcast sent by controllers requesting farmers to identify themselves +#[derive(Debug, Copy, Clone, Encode, Decode)] +pub struct ClusterControllerFarmerIdentifyBroadcast; + +impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast { + const SUBJECT: &'static str = "subspace.controller.farmer-identify"; +} + +/// Broadcast sent by controllers requesting caches in cache group to identify themselves +#[derive(Debug, Copy, Clone, Encode, Decode)] +pub struct ClusterControllerCacheIdentifyBroadcast; + +impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast { + /// `*` here stands for cache group + const SUBJECT: &'static str = "subspace.controller.*.cache-identify"; +} + +/// Broadcast with slot info sent by controllers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerSlotInfoBroadcast { + pub 
slot_info: SlotInfo, + pub instance: String, +} + +impl GenericBroadcast for ClusterControllerSlotInfoBroadcast { + const SUBJECT: &'static str = "subspace.controller.slot-info"; + + fn deterministic_message_id(&self) -> Option { + // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might + // be simplified to just a slot number + Some(HeaderValue::from( + format!("slot-info-{}", self.slot_info.slot_number).as_str(), + )) + } +} + +/// Broadcast with reward signing info by controllers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerRewardSigningBroadcast { + pub reward_signing_info: RewardSigningInfo, +} + +impl GenericBroadcast for ClusterControllerRewardSigningBroadcast { + const SUBJECT: &'static str = "subspace.controller.reward-signing-info"; +} + +/// Broadcast with archived segment headers by controllers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerArchivedSegmentHeaderBroadcast { + pub archived_segment_header: SegmentHeader, +} + +impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast { + const SUBJECT: &'static str = "subspace.controller.archived-segment-header"; + + fn deterministic_message_id(&self) -> Option { + // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might + // be simplified to just a segment index + Some(HeaderValue::from( + format!( + "archived-segment-{}", + self.archived_segment_header.segment_index() + ) + .as_str(), + )) + } +} + +/// Notification messages with solution by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerSolutionNotification { + pub solution_response: SolutionResponse, +} + +impl GenericNotification for ClusterControllerSolutionNotification { + const SUBJECT: &'static str = "subspace.controller.*.solution"; +} + +/// Notification messages with reward signature by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerRewardSignatureNotification { + pub reward_signature: RewardSignatureResponse, +} + +impl GenericNotification for ClusterControllerRewardSignatureNotification { + const SUBJECT: &'static str = "subspace.controller.*.reward-signature"; +} + +/// Request farmer app info from controller +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerFarmerAppInfoRequest; + +impl GenericRequest for ClusterControllerFarmerAppInfoRequest { + const SUBJECT: &'static str = "subspace.controller.farmer-app-info"; + type Response = FarmerAppInfo; +} + +/// Request segment headers with specified segment indices +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerSegmentHeadersRequest { + pub segment_indices: Vec, +} + +impl GenericRequest for ClusterControllerSegmentHeadersRequest { + const SUBJECT: &'static str = "subspace.controller.segment-headers"; + type Response = Vec>; +} + +/// Request piece with specified index +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterControllerPieceRequest { + pub piece_index: PieceIndex, +} + +impl GenericRequest for ClusterControllerPieceRequest { + const SUBJECT: &'static str = "subspace.controller.piece"; + type Response = Option; +} + +/// Cluster piece getter +#[derive(Debug, Clone)] +pub struct ClusterPieceGetter { + nats_client: NatsClient, +} + +#[async_trait] +impl PieceGetter for ClusterPieceGetter { + async fn get_piece( + &self, + piece_index: PieceIndex, + ) -> Result, Box> { + Ok(self + .nats_client + .request(&ClusterControllerPieceRequest { piece_index }, None) + .await?) 
+ } +} + +impl ClusterPieceGetter { + /// Create new instance + #[inline] + pub fn new(nats_client: NatsClient) -> Self { + Self { nats_client } + } +} + +/// [`NodeClient`] used in cluster environment that connects to node through a controller instead +/// of to the node directly +#[derive(Debug, Clone)] +pub struct ClusterNodeClient { + nats_client: NatsClient, + // Store last slot info instance that can be used to send solution response to (some instances + // may be not synced and not able to receive solution responses) + last_slot_info_instance: Arc>, +} + +impl ClusterNodeClient { + /// Create a new instance + #[inline] + pub fn new(nats_client: NatsClient) -> Self { + Self { + nats_client, + last_slot_info_instance: Arc::default(), + } + } +} + +#[async_trait] +impl NodeClient for ClusterNodeClient { + async fn farmer_app_info(&self) -> Result { + Ok(self + .nats_client + .request(&ClusterControllerFarmerAppInfoRequest, None) + .await?) + } + + async fn subscribe_slot_info( + &self, + ) -> Result + Send + 'static>>, NodeClientError> { + let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance); + let subscription = self + .nats_client + .subscribe_to_broadcasts::(None, None) + .await? + .map(move |broadcast| { + *last_slot_info_instance.lock() = broadcast.instance; + + broadcast.slot_info + }); + + Ok(Box::pin(subscription)) + } + + async fn submit_solution_response( + &self, + solution_response: SolutionResponse, + ) -> Result<(), NodeClientError> { + let last_slot_info_instance = self.last_slot_info_instance.lock().clone(); + Ok(self + .nats_client + .notification( + &ClusterControllerSolutionNotification { solution_response }, + Some(&last_slot_info_instance), + ) + .await?) + } + + async fn subscribe_reward_signing( + &self, + ) -> Result + Send + 'static>>, NodeClientError> + { + let subscription = self + .nats_client + .subscribe_to_broadcasts::(None, None) + .await? + .map(|broadcast| broadcast.reward_signing_info); + + Ok(Box::pin(subscription)) + } + + /// Submit a block signature + async fn submit_reward_signature( + &self, + reward_signature: RewardSignatureResponse, + ) -> Result<(), NodeClientError> { + let last_slot_info_instance = self.last_slot_info_instance.lock().clone(); + Ok(self + .nats_client + .notification( + &ClusterControllerRewardSignatureNotification { reward_signature }, + Some(&last_slot_info_instance), + ) + .await?) + } + + async fn subscribe_archived_segment_headers( + &self, + ) -> Result + Send + 'static>>, NodeClientError> { + let subscription = self + .nats_client + .subscribe_to_broadcasts::(None, None) + .await? + .map(|broadcast| broadcast.archived_segment_header); + + Ok(Box::pin(subscription)) + } + + async fn segment_headers( + &self, + segment_indices: Vec, + ) -> Result>, NodeClientError> { + Ok(self + .nats_client + .request( + &ClusterControllerSegmentHeadersRequest { segment_indices }, + None, + ) + .await?) + } + + async fn piece(&self, piece_index: PieceIndex) -> Result, NodeClientError> { + Ok(self + .nats_client + .request(&ClusterControllerPieceRequest { piece_index }, None) + .await?) 
+ } + + async fn acknowledge_archived_segment_header( + &self, + _segment_index: SegmentIndex, + ) -> Result<(), NodeClientError> { + // Acknowledgement is unnecessary/unsupported + Ok(()) + } +} + +// TODO: Ensure another controller with the same cache group doesn't already exist +/// Create controller service that handles things like broadcasting information (for example slot +/// notifications) as well as responding to incoming requests (like piece requests). +/// +/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times +/// per controller instance in order to parallelize more work across threads if needed. +pub async fn controller_service( + nats_client: &NatsClient, + node_client: &NC, + piece_getter: &PG, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, + PG: PieceGetter + Sync, +{ + select! { + result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => { + result + }, + result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => { + result + }, + result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => { + result + }, + result = solution_response_forwarder(nats_client, node_client, instance).fuse() => { + result + }, + result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => { + result + }, + result = farmer_app_info_responder(nats_client, node_client).fuse() => { + result + }, + result = segment_headers_responder(nats_client, node_client).fuse() => { + result + }, + result = piece_responder(nats_client, piece_getter).fuse() => { + result + }, + } +} + +async fn slot_info_broadcaster( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut slot_info_notifications = node_client + .subscribe_slot_info() + .await + .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?; + + while let Some(slot_info) = slot_info_notifications.next().await { + debug!(?slot_info, "New slot"); + + let slot = slot_info.slot_number; + + if let Err(error) = nats_client + .broadcast( + &ClusterControllerSlotInfoBroadcast { + slot_info, + instance: instance.to_string(), + }, + instance, + ) + .await + { + warn!(%slot, %error, "Failed to broadcast slot info"); + } + } + + Ok(()) +} + +async fn reward_signing_broadcaster( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut reward_signing_notifications = node_client + .subscribe_reward_signing() + .await + .map_err(|error| anyhow!("Failed to subscribe to reward signing notifications: {error}"))?; + + while let Some(reward_signing_info) = reward_signing_notifications.next().await { + trace!(?reward_signing_info, "New reward signing notification"); + + if let Err(error) = nats_client + .broadcast( + &ClusterControllerRewardSigningBroadcast { + reward_signing_info, + }, + instance, + ) + .await + { + warn!(%error, "Failed to broadcast reward signing info"); + } + } + + Ok(()) +} + +async fn archived_segment_headers_broadcaster( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut archived_segments_notifications = node_client + .subscribe_archived_segment_headers() + .await + .map_err(|error| { + anyhow!("Failed to subscribe to archived segment header notifications: {error}") + })?; + + while let Some(archived_segment_header) = 
archived_segments_notifications.next().await { + trace!( + ?archived_segment_header, + "New archived archived segment header notification" + ); + + node_client + .acknowledge_archived_segment_header(archived_segment_header.segment_index()) + .await + .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?; + + if let Err(error) = nats_client + .broadcast( + &ClusterControllerArchivedSegmentHeaderBroadcast { + archived_segment_header, + }, + instance, + ) + .await + { + warn!(%error, "Failed to broadcast archived segment header info"); + } + } + + Ok(()) +} + +async fn solution_response_forwarder( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut subscription = nats_client + .subscribe_to_notifications::( + Some(instance), + Some(instance.to_string()), + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?; + + while let Some(notification) = subscription.next().await { + debug!(?notification, "Solution notification"); + + if let Err(error) = node_client + .submit_solution_response(notification.solution_response) + .await + { + warn!(%error, "Failed to send solution response"); + } + } + + Ok(()) +} + +async fn reward_signature_forwarder( + nats_client: &NatsClient, + node_client: &NC, + instance: &str, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut subscription = nats_client + .subscribe_to_notifications::( + Some(instance), + Some(instance.to_string()), + ) + .await + .map_err(|error| { + anyhow!("Failed to subscribe to reward signature notifications: {error}") + })?; + + while let Some(notification) = subscription.next().await { + debug!(?notification, "Reward signature notification"); + + if let Err(error) = node_client + .submit_reward_signature(notification.reward_signature) + .await + { + warn!(%error, "Failed to send reward signature"); + } + } + + Ok(()) +} + +async fn farmer_app_info_responder( + nats_client: &NatsClient, + node_client: &NC, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut subscription = nats_client + .queue_subscribe( + ClusterControllerFarmerAppInfoRequest::SUBJECT, + "subspace.controller".to_string(), + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to farmer app info requests: {error}"))?; + + let mut last_farmer_app_info: ::Response = node_client + .farmer_app_info() + .await + .map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?; + let mut last_farmer_app_info_request = Instant::now(); + + while let Some(message) = subscription.next().await { + trace!("Farmer app info request"); + + if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW { + match node_client.farmer_app_info().await { + Ok(new_last_farmer_app_info) => { + last_farmer_app_info = new_last_farmer_app_info; + last_farmer_app_info_request = Instant::now(); + } + Err(error) => { + warn!(%error, "Failed to get farmer app info"); + } + } + } + + if let Some(reply_subject) = message.reply { + if let Err(error) = nats_client + .publish(reply_subject, last_farmer_app_info.encode().into()) + .await + { + warn!(%error, "Failed to send farmer app info response"); + } + } + } + + Ok(()) +} + +async fn segment_headers_responder( + nats_client: &NatsClient, + node_client: &NC, +) -> anyhow::Result<()> +where + NC: NodeClient, +{ + let mut subscription = nats_client + .queue_subscribe( + ClusterControllerSegmentHeadersRequest::SUBJECT, + "subspace.controller".to_string(), + ) + 
.await + .map_err(|error| anyhow!("Failed to subscribe to segment headers requests: {error}"))?; + + let mut last_request_response = None::<( + ClusterControllerSegmentHeadersRequest, + ::Response, + )>; + + while let Some(message) = subscription.next().await { + let request = + match ClusterControllerSegmentHeadersRequest::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode segment headers request" + ); + continue; + } + }; + trace!(?request, "Segment headers request"); + + let response = if let Some((last_request, response)) = &last_request_response + && last_request.segment_indices == request.segment_indices + { + response + } else { + match node_client + .segment_headers(request.segment_indices.clone()) + .await + { + Ok(segment_headers) => &last_request_response.insert((request, segment_headers)).1, + Err(error) => { + warn!( + %error, + segment_indices = ?request.segment_indices, + "Failed to get segment headers" + ); + continue; + } + } + }; + + if let Some(reply_subject) = message.reply { + if let Err(error) = nats_client + .publish(reply_subject, response.encode().into()) + .await + { + warn!(%error, "Failed to send farmer app info response"); + } + } + } + + Ok(()) +} + +// TODO: Smarter piece handling with requests for cached pieces being redirected to cache instances +// instead +async fn piece_responder(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()> +where + PG: PieceGetter + Sync, +{ + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered:: + Send>>>::from_iter([ + Box::pin(pending()) as Pin>, + ]); + + let mut subscription = nats_client + .queue_subscribe( + ClusterControllerPieceRequest::SUBJECT, + "subspace.controller".to_string(), + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to piece requests: {error}"))? + .fuse(); + + loop { + select! 
{ + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_piece_request( + nats_client, + piece_getter, + message, + ))); + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) +} + +async fn process_piece_request(nats_client: &NatsClient, piece_getter: &PG, message: Message) +where + PG: PieceGetter, +{ + let request = match ClusterControllerPieceRequest::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode piece request" + ); + return; + } + }; + trace!(?request, "Piece request"); + + // TODO: It would be great to send cached pieces from cache instance directly to requested + // rather than proxying through controller, but it is awkward with current architecture + + let maybe_piece: ::Response = + match piece_getter.get_piece(request.piece_index).await { + Ok(maybe_piece) => maybe_piece, + Err(error) => { + warn!( + %error, + piece_index = %request.piece_index, + "Failed to get piece" + ); + return; + } + }; + + if let Some(reply_subject) = message.reply { + if let Err(error) = nats_client + .publish(reply_subject, maybe_piece.encode().into()) + .await + { + warn!(%error, "Failed to send farmer app info response"); + } + } +} diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs new file mode 100644 index 00000000000..1ed0c35ccd0 --- /dev/null +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -0,0 +1,835 @@ +//! Farming cluster farmer +//! +//! Farmer is responsible for maintaining farms, doing audits and generating proofs when solution is +//! found in one of the plots. +//! +//! This module exposes some data structures for NATS communication, custom farm implementation +//! designed to work with cluster farmer and a service function to drive the backend part +//! of the farmer. 
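The farmer side follows the same pattern as the cache module above: local farms are exposed through `farmer_service`, and controllers reconstruct remote `ClusterFarm` handles from the identify broadcasts. A minimal sketch (not part of the diff; the identification interval and the concrete `Farm` implementations are placeholders):

```rust
use std::time::Duration;

use subspace_farmer::cluster::farmer::{
    farmer_service, ClusterFarm, ClusterFarmerIdentifyFarmBroadcast,
};
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::Farm;

/// Farmer side: broadcast sector updates/solutions and answer per-farm requests
async fn run_farmer_side(
    nats_client: NatsClient,
    farms: Vec<impl Farm>, // placeholder: locally maintained farms
) -> anyhow::Result<()> {
    // Keep `farms` alive for as long as the service future runs
    farmer_service(nats_client, &farms, Duration::from_secs(30)).await
}

/// Controller side: build a remote farm handle from a received identify broadcast
async fn on_farm_identified(
    nats_client: &NatsClient,
    identify: ClusterFarmerIdentifyFarmBroadcast,
) -> anyhow::Result<ClusterFarm> {
    ClusterFarm::new(
        identify.farm_id,
        identify.total_sectors_count,
        nats_client.clone(),
    )
    .await
}
```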
+ +use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; +use crate::cluster::nats_client::{ + GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, +}; +use crate::farm::{ + Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, MaybePieceStoredResult, + PieceCache, PieceCacheOffset, PieceReader, PlotCache, PlottedSectors, SectorUpdate, +}; +use crate::utils::AsyncJoinOnDrop; +use anyhow::anyhow; +use async_nats::Message; +use async_trait::async_trait; +use event_listener_primitives::Bag; +use futures::channel::mpsc; +use futures::stream::FuturesUnordered; +use futures::{select, stream, FutureExt, Stream, StreamExt}; +use parity_scale_codec::{Decode, Encode}; +use std::future::{pending, Future}; +use std::pin::{pin, Pin}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use subspace_core_primitives::crypto::blake3_hash_list; +use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex, PieceOffset, SectorIndex}; +use subspace_farmer_components::plotting::PlottedSector; +use subspace_networking::libp2p::kad::RecordKey; +use subspace_rpc_primitives::SolutionResponse; +use tokio::time::MissedTickBehavior; +use tracing::{debug, error, trace, warn}; + +const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000; +const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1); + +type Handler = Bag, A>; + +/// Broadcast with identification details by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerIdentifyFarmBroadcast { + /// Farm ID + pub farm_id: FarmId, + /// Total number of sectors in the farm + pub total_sectors_count: SectorIndex, + /// Farm fingerprint changes when something about farm changes (like allocated space) + pub fingerprint: Blake3Hash, +} + +impl GenericBroadcast for ClusterFarmerIdentifyFarmBroadcast { + const SUBJECT: &'static str = "subspace.farmer.*.identify"; +} + +/// Broadcast with sector updates by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerSectorUpdateBroadcast { + /// Farm ID + pub farm_id: FarmId, + /// Sector index + pub sector_index: SectorIndex, + /// Sector update + pub sector_update: SectorUpdate, +} + +impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast { + const SUBJECT: &'static str = "subspace.farmer.*.sector-update"; +} + +/// Broadcast with farming notifications by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerFarmingNotificationBroadcast { + /// Farm ID + pub farm_id: FarmId, + /// Farming notification + pub farming_notification: FarmingNotification, +} + +impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast { + const SUBJECT: &'static str = "subspace.farmer.*.farming-notification"; +} + +/// Broadcast with solutions by farmers +#[derive(Debug, Clone, Encode, Decode)] +pub struct ClusterFarmerSolutionBroadcast { + /// Farm ID + pub farm_id: FarmId, + /// Solution response + pub solution_response: SolutionResponse, +} + +impl GenericBroadcast for ClusterFarmerSolutionBroadcast { + const SUBJECT: &'static str = "subspace.farmer.*.solution-response"; +} + +/// Read piece from farm +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterFarmerReadPieceRequest { + sector_index: SectorIndex, + piece_offset: PieceOffset, +} + +impl GenericRequest for ClusterFarmerReadPieceRequest { + const SUBJECT: &'static str = "subspace.farmer.*.farm.read-piece"; + type Response = Result, String>; +} + +/// Request plotted sectors from farmer +#[derive(Debug, Clone, Encode, Decode)] +struct 
ClusterFarmerPlottedSectorsRequest; + +impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest { + const SUBJECT: &'static str = "subspace.farmer.*.farm.plotted-sectors"; + type Response = Result; +} + +#[derive(Debug)] +struct ClusterPlottedSectors { + farm_id_string: String, + nats_client: NatsClient, +} + +#[async_trait] +impl PlottedSectors for ClusterPlottedSectors { + async fn get( + &self, + ) -> Result< + Box> + Unpin + Send + '_>, + FarmError, + > { + Ok(Box::new( + self.nats_client + .stream_request( + ClusterFarmerPlottedSectorsRequest, + Some(&self.farm_id_string), + ) + .await? + .map(|response| response.map_err(FarmError::from)), + )) + } +} + +#[derive(Debug)] +struct DummyPieceCache; + +#[async_trait] +impl PieceCache for DummyPieceCache { + #[inline] + fn max_num_elements(&self) -> u32 { + 0 + } + + #[inline] + async fn contents( + &self, + ) -> Result< + Box< + dyn Stream), FarmError>> + + Unpin + + Send + + '_, + >, + FarmError, + > { + Ok(Box::new(stream::empty())) + } + + #[inline] + async fn write_piece( + &self, + _offset: PieceCacheOffset, + _piece_index: PieceIndex, + _piece: &Piece, + ) -> Result<(), FarmError> { + Err("Can't write pieces into empty cache".into()) + } + + #[inline] + async fn read_piece_index( + &self, + _offset: PieceCacheOffset, + ) -> Result, FarmError> { + Ok(None) + } + + #[inline] + async fn read_piece(&self, _offset: PieceCacheOffset) -> Result, FarmError> { + Ok(None) + } +} + +#[derive(Debug)] +struct DummyPlotCache; + +#[async_trait] +impl PlotCache for DummyPlotCache { + async fn is_piece_maybe_stored( + &self, + _key: &RecordKey, + ) -> Result { + Ok(MaybePieceStoredResult::No) + } + + async fn try_store_piece( + &self, + _piece_index: PieceIndex, + _piece: &Piece, + ) -> Result { + Ok(false) + } + + async fn read_piece(&self, _key: &RecordKey) -> Result, FarmError> { + Ok(None) + } +} + +#[derive(Debug)] +struct ClusterPieceReader { + farm_id_string: String, + nats_client: NatsClient, +} + +#[async_trait] +impl PieceReader for ClusterPieceReader { + async fn read_piece( + &self, + sector_index: SectorIndex, + piece_offset: PieceOffset, + ) -> Result, FarmError> { + Ok(self + .nats_client + .request( + &ClusterFarmerReadPieceRequest { + sector_index, + piece_offset, + }, + Some(&self.farm_id_string), + ) + .await??) 
+ } +} + +#[derive(Default, Debug)] +struct Handlers { + sector_update: Handler<(SectorIndex, SectorUpdate)>, + farming_notification: Handler, + solution: Handler, +} + +/// Cluster farm implementation +#[derive(Debug)] +pub struct ClusterFarm { + farm_id: FarmId, + farm_id_string: String, + total_sectors_count: SectorIndex, + nats_client: NatsClient, + handlers: Arc, + background_tasks: AsyncJoinOnDrop<()>, +} + +#[async_trait(?Send)] +impl Farm for ClusterFarm { + fn id(&self) -> &FarmId { + &self.farm_id + } + + fn total_sectors_count(&self) -> SectorIndex { + self.total_sectors_count + } + + fn plotted_sectors(&self) -> Arc { + Arc::new(ClusterPlottedSectors { + farm_id_string: self.farm_id_string.clone(), + nats_client: self.nats_client.clone(), + }) + } + + fn piece_cache(&self) -> Arc { + Arc::new(DummyPieceCache) + } + + fn plot_cache(&self) -> Arc { + Arc::new(DummyPlotCache) + } + + fn piece_reader(&self) -> Arc { + Arc::new(ClusterPieceReader { + farm_id_string: self.farm_id_string.clone(), + nats_client: self.nats_client.clone(), + }) + } + + fn on_sector_update( + &self, + callback: HandlerFn<(SectorIndex, SectorUpdate)>, + ) -> Box { + Box::new(self.handlers.sector_update.add(callback)) + } + + fn on_farming_notification( + &self, + callback: HandlerFn, + ) -> Box { + Box::new(self.handlers.farming_notification.add(callback)) + } + + fn on_solution(&self, callback: HandlerFn) -> Box { + Box::new(self.handlers.solution.add(callback)) + } + + fn run(self: Box) -> Pin> + Send>> { + Box::pin(async move { Ok(self.background_tasks.await?) }) + } +} + +impl ClusterFarm { + /// Create new instance using information from previously received + /// [`ClusterFarmerIdentifyFarmBroadcast`] + pub async fn new( + farm_id: FarmId, + total_sectors_count: SectorIndex, + nats_client: NatsClient, + ) -> anyhow::Result { + let farm_id_string = farm_id.to_string(); + + let sector_updates_subscription = nats_client + .subscribe_to_broadcasts::( + Some(&farm_id_string), + None, + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to sector updates broadcast: {error}"))?; + let farming_notifications_subscription = nats_client + .subscribe_to_broadcasts::( + Some(&farm_id_string), + None, + ) + .await + .map_err(|error| { + anyhow!("Failed to subscribe to farming notifications broadcast: {error}") + })?; + let solution_subscription = nats_client + .subscribe_to_broadcasts::(Some(&farm_id_string), None) + .await + .map_err(|error| { + anyhow!("Failed to subscribe to solution responses broadcast: {error}") + })?; + + let handlers = Arc::::default(); + // Run background tasks and fire corresponding notifications + let background_tasks = { + let handlers = Arc::clone(&handlers); + + async move { + let mut sector_updates_subscription = pin!(sector_updates_subscription); + let mut farming_notifications_subscription = + pin!(farming_notifications_subscription); + let mut solution_subscription = pin!(solution_subscription); + + let sector_updates_fut = async { + while let Some(ClusterFarmerSectorUpdateBroadcast { + sector_index, + sector_update, + .. + }) = sector_updates_subscription.next().await + { + handlers + .sector_update + .call_simple(&(sector_index, sector_update)); + } + }; + let farming_notifications_fut = async { + while let Some(ClusterFarmerFarmingNotificationBroadcast { + farming_notification, + .. 
+ }) = farming_notifications_subscription.next().await + { + handlers + .farming_notification + .call_simple(&farming_notification); + } + }; + let solutions_fut = async { + while let Some(ClusterFarmerSolutionBroadcast { + solution_response, .. + }) = solution_subscription.next().await + { + handlers.solution.call_simple(&solution_response); + } + }; + + select! { + _ = sector_updates_fut.fuse() => {} + _ = farming_notifications_fut.fuse() => {} + _ = solutions_fut.fuse() => {} + } + } + }; + + Ok(Self { + farm_id, + farm_id_string, + total_sectors_count, + nats_client, + handlers, + background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true), + }) + } +} + +#[derive(Debug)] +struct FarmDetails { + farm_id: FarmId, + farm_id_string: String, + total_sectors_count: SectorIndex, + piece_reader: Arc, + plotted_sectors: Arc, + _background_tasks: AsyncJoinOnDrop<()>, +} + +/// Create farmer service for specified farms that will be processing incoming requests and send +/// periodic identify notifications. +/// +/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times +/// per controller instance in order to parallelize more work across threads if needed. +pub fn farmer_service( + nats_client: NatsClient, + farms: &[F], + identification_broadcast_interval: Duration, +) -> impl Future> + Send + 'static +where + F: Farm, +{ + // For each farm start forwarding notifications as broadcast messages and create farm details + // that can be used to respond to incoming requests + let farms_details = farms + .iter() + .map(|farm| { + let farm_id = *farm.id(); + let nats_client = nats_client.clone(); + + let (sector_updates_sender, mut sector_updates_receiver) = + mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER); + let (farming_notifications_sender, mut farming_notifications_receiver) = + mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER); + let (solutions_sender, mut solutions_receiver) = + mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER); + + let sector_updates_handler_id = + farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| { + if let Err(error) = + sector_updates_sender + .clone() + .try_send(ClusterFarmerSectorUpdateBroadcast { + farm_id, + sector_index: *sector_index, + sector_update: sector_update.clone(), + }) + { + warn!(%farm_id, %error, "Failed to send sector update notification"); + } + })); + + let farming_notifications_handler_id = + farm.on_farming_notification(Arc::new(move |farming_notification| { + if let Err(error) = farming_notifications_sender.clone().try_send( + ClusterFarmerFarmingNotificationBroadcast { + farm_id, + farming_notification: farming_notification.clone(), + }, + ) { + warn!(%farm_id, %error, "Failed to send farming notification"); + } + })); + + let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| { + if let Err(error) = + solutions_sender + .clone() + .try_send(ClusterFarmerSolutionBroadcast { + farm_id, + solution_response: solution_response.clone(), + }) + { + warn!(%farm_id, %error, "Failed to send solution notification"); + } + })); + + let background_tasks = AsyncJoinOnDrop::new( + tokio::spawn(async move { + let farm_id_string = farm_id.to_string(); + + let sector_updates_fut = async { + while let Some(broadcast) = sector_updates_receiver.next().await { + if let Err(error) = + nats_client.broadcast(&broadcast, &farm_id_string).await + { + warn!(%farm_id, %error, "Failed to broadcast sector update"); + } + } + }; + let farming_notifications_fut = async { + while let 
Some(broadcast) = farming_notifications_receiver.next().await { + if let Err(error) = + nats_client.broadcast(&broadcast, &farm_id_string).await + { + warn!(%farm_id, %error, "Failed to broadcast farming notification"); + } + } + }; + let solutions_fut = async { + while let Some(broadcast) = solutions_receiver.next().await { + if let Err(error) = + nats_client.broadcast(&broadcast, &farm_id_string).await + { + warn!(%farm_id, %error, "Failed to broadcast solution"); + } + } + }; + + select! { + _ = sector_updates_fut.fuse() => {} + _ = farming_notifications_fut.fuse() => {} + _ = solutions_fut.fuse() => {} + } + + drop(sector_updates_handler_id); + drop(farming_notifications_handler_id); + drop(solutions_handler_id); + }), + true, + ); + + FarmDetails { + farm_id, + farm_id_string: farm_id.to_string(), + total_sectors_count: farm.total_sectors_count(), + piece_reader: farm.piece_reader(), + plotted_sectors: farm.plotted_sectors(), + _background_tasks: background_tasks, + } + }) + .collect::>(); + + async move { + select! { + result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => { + result + }, + result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => { + result + }, + result = read_piece_responder(&nats_client, &farms_details).fuse() => { + result + }, + } + } +} + +// Listen for farmer identification broadcast from controller and publish identification +// broadcast in response, also send periodic notifications reminding that farm exists +async fn identify_responder( + nats_client: &NatsClient, + farms_details: &[FarmDetails], + identification_broadcast_interval: Duration, +) -> anyhow::Result<()> { + let mut subscription = nats_client + .subscribe_to_broadcasts::( + None, + // Use the first farm as a queue group. Doesn't matter what we use, just needs to be + // deterministic. + farms_details + .first() + .map(|farm_details| farm_details.farm_id_string.clone()), + ) + .await + .map_err(|error| { + anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}") + })? + .fuse(); + // Also send periodic updates in addition to the subscription response + let mut interval = tokio::time::interval(identification_broadcast_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + let mut last_identification = Instant::now(); + + loop { + select! 
{ + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + debug!("Identify broadcast stream ended"); + break; + }; + + trace!(?message, "Farmer received identify broadcast message"); + + if last_identification.elapsed() < MIN_FARMER_IDENTIFICATION_INTERVAL { + // Skip too frequent identification requests + continue; + } + + last_identification = Instant::now(); + send_identify_broadcast(nats_client, farms_details).await; + interval.reset(); + } + _ = interval.tick().fuse() => { + last_identification = Instant::now(); + trace!("Farmer self-identification"); + + send_identify_broadcast(nats_client, farms_details).await; + } + } + } + + Ok(()) +} + +async fn send_identify_broadcast(nats_client: &NatsClient, farms_details: &[FarmDetails]) { + farms_details + .iter() + .map(|farm_details| async move { + if let Err(error) = nats_client + .broadcast( + &ClusterFarmerIdentifyFarmBroadcast { + farm_id: farm_details.farm_id, + total_sectors_count: farm_details.total_sectors_count, + fingerprint: blake3_hash_list(&[ + &farm_details.farm_id.encode(), + &farm_details.total_sectors_count.to_le_bytes(), + ]), + }, + &farm_details.farm_id_string, + ) + .await + { + warn!( + farm_id = %farm_details.farm_id, + %error, + "Failed to send farmer identify notification" + ); + } + }) + .collect::>() + .collect::>() + .await; +} + +async fn plotted_sectors_responder( + nats_client: &NatsClient, + farms_details: &[FarmDetails], +) -> anyhow::Result<()> { + farms_details + .iter() + .map(|farm_details| async move { + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered::from_iter([ + Box::pin(pending()) as Pin + Send>> + ]); + let mut subscription = nats_client + .subscribe_to_stream_requests( + Some(&farm_details.farm_id_string), + Some(farm_details.farm_id_string.clone()), + ) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to plotted sectors requests for farm {}: {}", + farm_details.farm_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_plotted_sectors_request( + nats_client, + farm_details, + message, + ))); + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) + }) + .collect::>() + .next() + .await + .ok_or_else(|| anyhow!("No farms"))? 
+} + +async fn process_plotted_sectors_request( + nats_client: &NatsClient, + farm_details: &FarmDetails, + request: StreamRequest, +) { + trace!(?request, "Plotted sectors request"); + + match farm_details.plotted_sectors.get().await { + Ok(plotted_sectors) => { + nats_client + .stream_response::( + request.response_subject, + plotted_sectors.map(|maybe_plotted_sector| { + maybe_plotted_sector.map_err(|error| error.to_string()) + }), + ) + .await; + } + Err(error) => { + error!( + %error, + farm_id = %farm_details.farm_id, + "Failed to get plotted sectors" + ); + + nats_client + .stream_response::( + request.response_subject, + pin!(stream::once(async move { + Err(format!("Failed to get plotted sectors: {error}")) + })), + ) + .await; + } + } +} + +async fn read_piece_responder( + nats_client: &NatsClient, + farms_details: &[FarmDetails], +) -> anyhow::Result<()> { + farms_details + .iter() + .map(|farm_details| async move { + // Initialize with pending future so it never ends + let mut processing = + FuturesUnordered:: + Send>>>::from_iter([ + Box::pin(pending()) as Pin>, + ]); + let mut subscription = nats_client + .queue_subscribe( + ClusterFarmerReadPieceRequest::SUBJECT + .replace('*', &farm_details.farm_id_string), + farm_details.farm_id_string.clone(), + ) + .await + .map_err(|error| { + anyhow!( + "Failed to subscribe to read piece requests for farm {}: {}", + farm_details.farm_id, + error + ) + })? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_read_piece_request( + nats_client, + farm_details, + message, + ))); + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) + }) + .collect::>() + .next() + .await + .ok_or_else(|| anyhow!("No farms"))? +} + +async fn process_read_piece_request( + nats_client: &NatsClient, + farm_details: &FarmDetails, + message: Message, +) { + let Some(reply_subject) = message.reply else { + return; + }; + + let ClusterFarmerReadPieceRequest { + sector_index, + piece_offset, + } = match ClusterFarmerReadPieceRequest::decode(&mut message.payload.as_ref()) { + Ok(request) => request, + Err(error) => { + warn!( + %error, + message = %hex::encode(message.payload), + "Failed to decode read piece request" + ); + return; + } + }; + + let response: ::Response = farm_details + .piece_reader + .read_piece(sector_index, piece_offset) + .await + .map_err(|error| error.to_string()); + + if let Err(error) = nats_client + .publish(reply_subject, response.encode().into()) + .await + { + warn!(%error, "Failed to send read piece response"); + } +} diff --git a/crates/subspace-farmer/src/cluster/plotter.rs b/crates/subspace-farmer/src/cluster/plotter.rs new file mode 100644 index 00000000000..4230de2d1c1 --- /dev/null +++ b/crates/subspace-farmer/src/cluster/plotter.rs @@ -0,0 +1,960 @@ +//! Farming cluster plotter +//! +//! Plotter is responsible for plotting sectors in response to farmer requests. +//! +//! This module exposes some data structures for NATS communication, custom plotter +//! implementation designed to work with cluster plotter and a service function to drive the backend +//! part of the plotter. 
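For orientation, here is a minimal wiring sketch of how the two halves described above are expected to fit together, based on the definitions later in this file. It is illustrative only: `nats_client` and `cpu_plotter` are assumed to already exist (any local `Plotter` implementation works for the latter), and the concurrency/backoff values are arbitrary.

    use std::num::NonZeroUsize;
    use backoff::ExponentialBackoff;

    // Farmer side: plot sectors remotely through the cluster.
    // `ClusterPlotter::new` takes the NATS client, the number of sectors that
    // may be encoded concurrently, and the retry policy used while remote
    // plotters report that they are occupied.
    let cluster_plotter = ClusterPlotter::new(
        nats_client.clone(),
        NonZeroUsize::new(8).expect("8 is not zero"),
        ExponentialBackoff {
            max_elapsed_time: None, // keep retrying while plotters are occupied
            ..ExponentialBackoff::default()
        },
    );

    // Plotter side: drive the backend with the service function, serving
    // incoming plot-sector requests with the local plotter.
    plotter_service(&nats_client, &cpu_plotter).await?;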
+ +use crate::cluster::nats_client::{ + GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, +}; +use crate::plotter::{Plotter, SectorPlottingProgress}; +use crate::utils::AsyncJoinOnDrop; +use anyhow::anyhow; +use async_trait::async_trait; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; +use derive_more::Display; +use event_listener_primitives::{Bag, HandlerId}; +use futures::channel::mpsc; +use futures::stream::FuturesUnordered; +use futures::{select, stream, FutureExt, Sink, SinkExt, StreamExt}; +use parity_scale_codec::{Decode, Encode}; +use std::error::Error; +use std::future::{pending, Future}; +use std::num::NonZeroUsize; +use std::pin::{pin, Pin}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use subspace_core_primitives::{PublicKey, SectorIndex}; +use subspace_farmer_components::plotting::PlottedSector; +use subspace_farmer_components::FarmerProtocolInfo; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::time::MissedTickBehavior; +use tracing::{debug, info, info_span, trace, warn, Instrument}; +use ulid::Ulid; + +const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1); +const PING_INTERVAL: Duration = Duration::from_secs(5); + +pub type HandlerFn3 = Arc; +type Handler3 = Bag, A, B, C>; + +/// An ephemeral identifier for a plotter +#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)] +pub enum ClusterPlotterId { + Ulid(Ulid), +} + +#[allow(clippy::new_without_default)] +impl ClusterPlotterId { + /// Creates new ID + pub fn new() -> Self { + Self::Ulid(Ulid::new()) + } +} + +/// Request for free plotter instance +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterPlotterFreeInstanceRequest; + +impl GenericRequest for ClusterPlotterFreeInstanceRequest { + const SUBJECT: &'static str = "subspace.plotter.free-instance"; + /// Might be `None` if instance had to respond, but turned out it was fully occupied already + type Response = Option; +} + +#[derive(Debug, Encode, Decode)] +enum ClusterSectorPlottingProgress { + /// Plotter is already fully occupied with other work + Occupied, + /// Periodic ping indicating plotter is still busy + Ping, + /// Downloading sector pieces + Downloading, + /// Downloaded sector pieces + Downloaded(Duration), + /// Encoding sector pieces + Encoding, + /// Encoded sector pieces + Encoded(Duration), + /// Finished plotting, followed by a series of sector chunks + Finished { + /// Information about plotted sector + plotted_sector: PlottedSector, + /// How much time it took to plot a sector + time: Duration, + }, + /// Sector chunk after finished plotting + SectorChunk(Result, String>), + /// Plotting failed + Error { + /// Error message + error: String, + }, +} + +/// Request to plot sector from plotter +#[derive(Debug, Clone, Encode, Decode)] +struct ClusterPlotterPlotSectorRequest { + public_key: PublicKey, + sector_index: SectorIndex, + farmer_protocol_info: FarmerProtocolInfo, + pieces_in_sector: u16, +} + +impl GenericStreamRequest for ClusterPlotterPlotSectorRequest { + const SUBJECT: &'static str = "subspace.plotter.*.plot-sector"; + type Response = ClusterSectorPlottingProgress; +} + +#[derive(Default, Debug)] +struct Handlers { + plotting_progress: Handler3, +} + +/// Cluster plotter +pub struct ClusterPlotter { + sector_encoding_semaphore: Arc, + retry_backoff_policy: ExponentialBackoff, + nats_client: NatsClient, + handlers: Arc, + tasks_sender: mpsc::Sender>, + _background_tasks: AsyncJoinOnDrop<()>, +} + +impl Drop for ClusterPlotter { + 
#[inline] + fn drop(&mut self) { + self.tasks_sender.close_channel(); + } +} + +#[async_trait] +impl Plotter for ClusterPlotter { + async fn has_free_capacity(&self) -> Result { + Ok(self.sector_encoding_semaphore.available_permits() > 0 + && self + .nats_client + .request(&ClusterPlotterFreeInstanceRequest, None) + .await + .map_err(|error| error.to_string())? + .is_some()) + } + + async fn plot_sector( + &self, + public_key: PublicKey, + sector_index: SectorIndex, + farmer_protocol_info: FarmerProtocolInfo, + pieces_in_sector: u16, + _replotting: bool, + mut progress_sender: PS, + ) where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error, + { + let start = Instant::now(); + + // Done outside the future below as a backpressure, ensuring that it is not possible to + // schedule unbounded number of plotting tasks + let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore) + .acquire_owned() + .await + { + Ok(sector_encoding_permit) => sector_encoding_permit, + Err(error) => { + warn!(%error, "Failed to acquire sector encoding permit"); + + let progress_updater = ProgressUpdater { + public_key, + sector_index, + handlers: Arc::clone(&self.handlers), + }; + + progress_updater + .update_progress_and_events( + &mut progress_sender, + SectorPlottingProgress::Error { + error: format!("Failed to acquire sector encoding permit: {error}"), + }, + ) + .await; + + return; + } + }; + + self.plot_sector_internal( + start, + sector_encoding_permit, + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, + progress_sender, + ) + .await + } + + async fn try_plot_sector( + &self, + public_key: PublicKey, + sector_index: SectorIndex, + farmer_protocol_info: FarmerProtocolInfo, + pieces_in_sector: u16, + _replotting: bool, + progress_sender: PS, + ) -> bool + where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error, + { + let start = Instant::now(); + + let Ok(sector_encoding_permit) = + Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned() + else { + return false; + }; + + self.plot_sector_internal( + start, + sector_encoding_permit, + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, + progress_sender, + ) + .await; + + true + } +} + +impl ClusterPlotter { + /// Create new instance + pub fn new( + nats_client: NatsClient, + sector_encoding_concurrency: NonZeroUsize, + retry_backoff_policy: ExponentialBackoff, + ) -> Self { + let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get())); + + let (tasks_sender, mut tasks_receiver) = mpsc::channel(1); + + // Basically runs plotting tasks in the background and allows to abort on drop + let background_tasks = AsyncJoinOnDrop::new( + tokio::spawn(async move { + let background_tasks = FuturesUnordered::new(); + let mut background_tasks = pin!(background_tasks); + // Just so that `FuturesUnordered` will never end + background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true)); + + loop { + select! 
{ + maybe_background_task = tasks_receiver.next().fuse() => { + let Some(background_task) = maybe_background_task else { + break; + }; + + background_tasks.push(background_task); + }, + _ = background_tasks.select_next_some() => { + // Nothing to do + } + } + } + }), + true, + ); + + Self { + sector_encoding_semaphore, + retry_backoff_policy, + nats_client, + handlers: Arc::default(), + tasks_sender, + _background_tasks: background_tasks, + } + } + + /// Subscribe to plotting progress notifications + pub fn on_plotting_progress( + &self, + callback: HandlerFn3, + ) -> HandlerId { + self.handlers.plotting_progress.add(callback) + } + + #[allow(clippy::too_many_arguments)] + async fn plot_sector_internal( + &self, + start: Instant, + sector_encoding_permit: OwnedSemaphorePermit, + public_key: PublicKey, + sector_index: SectorIndex, + farmer_protocol_info: FarmerProtocolInfo, + pieces_in_sector: u16, + mut progress_sender: PS, + ) where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error, + { + let span = info_span!("", %public_key, %sector_index); + let _span_guard = span.enter(); + + trace!("Starting plotting, getting plotting permit"); + + let progress_updater = ProgressUpdater { + public_key, + sector_index, + handlers: Arc::clone(&self.handlers), + }; + + let mut retry_backoff_policy = self.retry_backoff_policy.clone(); + retry_backoff_policy.reset(); + + // Try to get plotter instance here first as a backpressure measure + let free_plotter_instance_fut = get_free_plotter_instance( + &self.nats_client, + &progress_updater, + &mut progress_sender, + &mut retry_backoff_policy, + ); + let mut maybe_free_instance = free_plotter_instance_fut.await; + if maybe_free_instance.is_none() { + return; + } + + trace!("Got plotting permit #1"); + + let nats_client = self.nats_client.clone(); + + let plotting_fut = async move { + 'outer: loop { + // Take free instance that was found earlier if available or try to find a new one + let free_instance = match maybe_free_instance.take() { + Some(free_instance) => free_instance, + None => { + let free_plotter_instance_fut = get_free_plotter_instance( + &nats_client, + &progress_updater, + &mut progress_sender, + &mut retry_backoff_policy, + ); + let Some(free_instance) = free_plotter_instance_fut.await else { + break; + }; + trace!("Got plotting permit #2"); + free_instance + } + }; + + let response_stream_result = nats_client + .stream_request( + ClusterPlotterPlotSectorRequest { + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, + }, + Some(&free_instance), + ) + .await; + trace!("Subscribed to plotting notifications"); + + let mut response_stream = match response_stream_result { + Ok(response_stream) => response_stream, + Err(error) => { + progress_updater + .update_progress_and_events( + &mut progress_sender, + SectorPlottingProgress::Error { + error: format!("Failed make stream request: {error}"), + }, + ) + .await; + + break; + } + }; + + let (mut sector_sender, sector_receiver) = mpsc::channel(1); + let mut maybe_sector_receiver = Some(sector_receiver); + loop { + match tokio::time::timeout(PING_INTERVAL * 2, response_stream.next()).await { + Ok(Some(response)) => { + match process_response_notification( + &start, + &free_instance, + &progress_updater, + &mut progress_sender, + &mut retry_backoff_policy, + response, + &mut sector_sender, + &mut maybe_sector_receiver, + ) + .await + { + ResponseProcessingResult::Retry => { + debug!("Retrying"); + continue 'outer; + } + ResponseProcessingResult::Abort => { + 
debug!("Aborting"); + break 'outer; + } + ResponseProcessingResult::Continue => { + // Nothing to do + } + } + } + Ok(None) => { + trace!("Plotting done"); + break; + } + Err(_error) => { + progress_updater + .update_progress_and_events( + &mut progress_sender, + SectorPlottingProgress::Error { + error: "Timed out without ping from plotter".to_string(), + }, + ) + .await; + break; + } + } + } + + break; + } + + drop(sector_encoding_permit); + }; + + let plotting_task = + AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.instrument(span.clone())), true); + if let Err(error) = self.tasks_sender.clone().send(plotting_task).await { + warn!(%error, "Failed to send plotting task"); + + let progress = SectorPlottingProgress::Error { + error: format!("Failed to send plotting task: {error}"), + }; + + self.handlers + .plotting_progress + .call_simple(&public_key, §or_index, &progress); + } + } +} + +// Try to get free plotter instance and return `None` if it is not possible +async fn get_free_plotter_instance( + nats_client: &NatsClient, + progress_updater: &ProgressUpdater, + progress_sender: &mut PS, + retry_backoff_policy: &mut ExponentialBackoff, +) -> Option +where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error, +{ + loop { + match nats_client + .request(&ClusterPlotterFreeInstanceRequest, None) + .await + { + Ok(Some(free_instance)) => { + return Some(free_instance); + } + Ok(None) => { + if let Some(delay) = retry_backoff_policy.next_backoff() { + debug!("Instance was occupied, retrying #1"); + + tokio::time::sleep(delay).await; + continue; + } else { + progress_updater + .update_progress_and_events( + progress_sender, + SectorPlottingProgress::Error { + error: "Instance was occupied, exiting #1".to_string(), + }, + ) + .await; + return None; + } + } + // TODO: Handle different kinds of errors differently, not all of them are + // fatal + Err(error) => { + progress_updater + .update_progress_and_events( + progress_sender, + SectorPlottingProgress::Error { + error: format!("Failed to get free plotter instance: {error}"), + }, + ) + .await; + return None; + } + }; + } +} + +enum ResponseProcessingResult { + Retry, + Abort, + Continue, +} + +#[allow(clippy::too_many_arguments)] +async fn process_response_notification( + start: &Instant, + free_instance: &str, + progress_updater: &ProgressUpdater, + progress_sender: &mut PS, + retry_backoff_policy: &mut ExponentialBackoff, + response: ClusterSectorPlottingProgress, + sector_sender: &mut mpsc::Sender, String>>, + maybe_sector_receiver: &mut Option, String>>>, +) -> ResponseProcessingResult +where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error, +{ + match response { + ClusterSectorPlottingProgress::Occupied => { + debug!(%free_instance, "Instance was occupied, retrying #2"); + + if let Some(delay) = retry_backoff_policy.next_backoff() { + debug!("Instance was occupied, retrying #2"); + + tokio::time::sleep(delay).await; + return ResponseProcessingResult::Retry; + } else { + debug!("Instance was occupied, exiting #2"); + return ResponseProcessingResult::Abort; + } + } + ClusterSectorPlottingProgress::Ping => { + // Expected + } + ClusterSectorPlottingProgress::Downloading => { + if !progress_updater + .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading) + .await + { + return ResponseProcessingResult::Abort; + } + } + ClusterSectorPlottingProgress::Downloaded(time) => { + if !progress_updater + .update_progress_and_events( + progress_sender, + SectorPlottingProgress::Downloaded(time), + ) + .await + { + 
return ResponseProcessingResult::Abort; + } + } + ClusterSectorPlottingProgress::Encoding => { + if !progress_updater + .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding) + .await + { + return ResponseProcessingResult::Abort; + } + } + ClusterSectorPlottingProgress::Encoded(time) => { + if !progress_updater + .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time)) + .await + { + return ResponseProcessingResult::Abort; + } + } + ClusterSectorPlottingProgress::Finished { + plotted_sector, + time: _, + } => { + let Some(sector_receiver) = maybe_sector_receiver.take() else { + debug!("Unexpected duplicated sector plotting progress Finished"); + + progress_updater + .update_progress_and_events( + progress_sender, + SectorPlottingProgress::Error { + error: "Unexpected duplicated sector plotting progress Finished" + .to_string(), + }, + ) + .await; + return ResponseProcessingResult::Abort; + }; + + let progress = SectorPlottingProgress::Finished { + plotted_sector, + // Use local time instead of reported by remote plotter + time: start.elapsed(), + sector: Box::pin(sector_receiver), + }; + if !progress_updater + .update_progress_and_events(progress_sender, progress) + .await + { + return ResponseProcessingResult::Abort; + } + + return ResponseProcessingResult::Continue; + } + // This variant must be sent after Finished and it handled above + ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => { + if let Err(error) = sector_sender.send(maybe_sector_chunk).await { + warn!(%error, "Failed to send sector chunk"); + return ResponseProcessingResult::Abort; + } + return ResponseProcessingResult::Continue; + } + ClusterSectorPlottingProgress::Error { error } => { + if !progress_updater + .update_progress_and_events( + progress_sender, + SectorPlottingProgress::Error { error }, + ) + .await + { + return ResponseProcessingResult::Abort; + } + } + } + + ResponseProcessingResult::Continue +} + +struct ProgressUpdater { + public_key: PublicKey, + sector_index: SectorIndex, + handlers: Arc, +} + +impl ProgressUpdater { + /// Returns `true` on success and `false` if progress receiver channel is gone + async fn update_progress_and_events( + &self, + progress_sender: &mut PS, + progress: SectorPlottingProgress, + ) -> bool + where + PS: Sink + Unpin, + PS::Error: Error, + { + self.handlers.plotting_progress.call_simple( + &self.public_key, + &self.sector_index, + &progress, + ); + + if let Err(error) = progress_sender.send(progress).await { + warn!(%error, "Failed to send error progress update"); + + false + } else { + true + } + } +} + +/// Create plotter service that will be processing incoming requests. +/// +/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times +/// per controller instance in order to parallelize more work across threads if needed. +pub async fn plotter_service
<P>
(nats_client: &NatsClient, cpu_plotter: &P) -> anyhow::Result<()> +where + P: Plotter + Sync, +{ + let plotter_id = ClusterPlotterId::new(); + + select! { + result = free_instance_responder(&plotter_id, nats_client, cpu_plotter).fuse() => { + result + } + result = plot_sector_responder(&plotter_id, nats_client, cpu_plotter).fuse() => { + result + } + } +} + +async fn free_instance_responder
<P>
( + plotter_id: &ClusterPlotterId, + nats_client: &NatsClient, + cpu_plotter: &P, +) -> anyhow::Result<()> +where + P: Plotter + Sync, +{ + loop { + while !cpu_plotter.has_free_capacity().await.unwrap_or_default() { + tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await; + } + + let mut subscription = nats_client + .queue_subscribe( + ClusterPlotterFreeInstanceRequest::SUBJECT, + "subspace.plotter".to_string(), + ) + .await + .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?; + + while let Some(message) = subscription.next().await { + let Some(reply_subject) = message.reply else { + continue; + }; + + let has_free_capacity = cpu_plotter.has_free_capacity().await.unwrap_or_default(); + let response: ::Response = + has_free_capacity.then(|| plotter_id.to_string()); + + if let Err(error) = nats_client + .publish(reply_subject, response.encode().into()) + .await + { + warn!(%error, "Failed to send free instance response"); + } + + if !has_free_capacity { + subscription.unsubscribe().await.map_err(|error| { + anyhow!("Failed to unsubscribe from free instance requests: {error}") + })?; + } + } + } +} + +async fn plot_sector_responder
<P>
( + plotter_id: &ClusterPlotterId, + nats_client: &NatsClient, + cpu_plotter: &P, +) -> anyhow::Result<()> +where + P: Plotter + Sync, +{ + let plotter_id_string = plotter_id.to_string(); + + // Initialize with pending future so it never ends + let mut processing = FuturesUnordered::from_iter([ + Box::pin(pending()) as Pin + Send>> + ]); + let mut subscription = nats_client + .subscribe_to_stream_requests(Some(&plotter_id_string), Some(plotter_id_string.clone())) + .await + .map_err(|error| anyhow!("Failed to subscribe to plot sector requests: {}", error))? + .fuse(); + + loop { + select! { + maybe_message = subscription.next() => { + let Some(message) = maybe_message else { + break; + }; + + // Create background task for concurrent processing + processing.push(Box::pin(process_plot_sector_request( + nats_client, + cpu_plotter, + message, + ))); + } + _ = processing.next() => { + // Nothing to do here + } + } + } + + Ok(()) +} + +async fn process_plot_sector_request
<P>
( + nats_client: &NatsClient, + cpu_plotter: &P, + request: StreamRequest, +) where + P: Plotter, +{ + let StreamRequest { + request: + ClusterPlotterPlotSectorRequest { + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, + }, + response_subject, + } = request; + + // Wrapper future just for instrumentation below + let inner_fut = async { + info!("Plot sector request"); + + let (progress_sender, mut progress_receiver) = mpsc::channel(1); + + if !cpu_plotter + .try_plot_sector( + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, + false, + progress_sender, + ) + .await + { + debug!("Plotter is currently occupied and can't plot more sectors"); + + nats_client + .stream_response::( + response_subject, + pin!(stream::once(async move { + ClusterSectorPlottingProgress::Occupied + })), + ) + .await; + return; + } + + let (mut response_proxy_sender, response_proxy_receiver) = mpsc::channel(0); + + let response_streaming_fut = nats_client + .stream_response::( + response_subject, + response_proxy_receiver, + ) + .fuse(); + let mut response_streaming_fut = pin!(response_streaming_fut); + let progress_proxy_fut = { + let mut response_proxy_sender = response_proxy_sender.clone(); + let approximate_max_message_size = nats_client.approximate_max_message_size(); + + async move { + while let Some(progress) = progress_receiver.next().await { + send_publish_progress( + &mut response_proxy_sender, + progress, + approximate_max_message_size, + ) + .await; + } + } + }; + + let mut ping_interval = tokio::time::interval(PING_INTERVAL); + ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + let ping_fut = async { + loop { + ping_interval.tick().await; + if let Err(error) = response_proxy_sender + .send(ClusterSectorPlottingProgress::Ping) + .await + { + warn!(%error, "Failed to send plotting ping"); + return; + } + } + }; + + select! 
{ + _ = response_streaming_fut => { + warn!("Response sending ended early"); + + return; + } + _ = progress_proxy_fut.fuse() => { + // Done + } + _ = ping_fut.fuse() => { + unreachable!("Ping loop never ends"); + } + } + + // Drain remaining progress messages + response_streaming_fut.await; + + info!("Finished plotting sector successfully"); + }; + + inner_fut + .instrument(info_span!("", %public_key, %sector_index)) + .await +} + +async fn send_publish_progress( + response_sender: &mut mpsc::Sender, + progress: SectorPlottingProgress, + approximate_max_message_size: usize, +) { + // Finished response is large and needs special care + let cluster_progress = match progress { + SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading, + SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time), + SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding, + SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time), + SectorPlottingProgress::Finished { + plotted_sector, + time, + mut sector, + } => { + if let Err(error) = response_sender + .send(ClusterSectorPlottingProgress::Finished { + plotted_sector, + time, + }) + .await + { + warn!(%error, "Failed to send plotting progress"); + return; + } + + while let Some(maybe_sector_chunk) = sector.next().await { + match maybe_sector_chunk { + Ok(sector_chunk) => { + // Slice large chunks into smaller ones before publishing + for sector_chunk in sector_chunk.chunks(approximate_max_message_size) { + if let Err(error) = response_sender + .send(ClusterSectorPlottingProgress::SectorChunk(Ok( + sector_chunk.to_vec() + ))) + .await + { + warn!(%error, "Failed to send plotting progress"); + return; + } + } + } + Err(error) => { + if let Err(error) = response_sender + .send(ClusterSectorPlottingProgress::SectorChunk(Err(error))) + .await + { + warn!(%error, "Failed to send plotting progress"); + return; + } + } + } + } + + response_sender.close_channel(); + + return; + } + SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error }, + }; + + if let Err(error) = response_sender.send(cluster_progress).await { + warn!(%error, "Failed to send plotting progress"); + } +}
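Since a plotted sector can easily exceed the NATS payload limit, `send_publish_progress` above splits the sector bytes with `chunks()` before publishing them as `SectorChunk` messages, and `process_response_notification` on the other side forwards them through the sector channel. A small self-contained illustration of the splitting arithmetic (the sizes are made up; the real limit comes from `nats_client.approximate_max_message_size()`):

    fn main() {
        // Illustrative sizes only: a ~5 MiB plotted sector and a limit a bit
        // under 2 MiB.
        let sector = vec![0u8; 5 * 1024 * 1024];
        let approximate_max_message_size = 2 * 1024 * 1024 - 1024;

        // Same splitting as `send_publish_progress`: each chunk would become one
        // `ClusterSectorPlottingProgress::SectorChunk(Ok(..))` message.
        let chunks: Vec<&[u8]> = sector.chunks(approximate_max_message_size).collect();
        assert_eq!(chunks.len(), 3);
    }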