From a48c5a42ec004ee4e32140f89220f2abba07eb5f Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi <nazar@mokrynskyi.com>
Date: Sun, 16 Jun 2024 07:36:47 +0300
Subject: [PATCH] Fix farming cluster forwarders and identification intervals

---
 .../subspace-farmer/commands/cluster/cache.rs |   8 +-
 .../commands/cluster/controller.rs            |   3 +-
 .../commands/cluster/farmer.rs                |   8 +-
 .../commands/cluster/plotter.rs               |  26 +--
 .../src/bin/subspace-farmer/main.rs           |   1 +
 crates/subspace-farmer/src/cluster/cache.rs   |  68 ++++--
 .../subspace-farmer/src/cluster/controller.rs |  74 ++++---
 crates/subspace-farmer/src/cluster/farmer.rs  | 194 ++++++++++--------
 .../src/cluster/nats_client.rs                |   5 +
 9 files changed, 216 insertions(+), 171 deletions(-)

diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs
index 6d22d845a5..d5a4f06fe2 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs
@@ -188,12 +188,8 @@ pub(super) async fn cache(
                         nats_client,
                         &caches,
                         &cache_group,
-                        // Only one of the tasks needs to send periodic broadcast
-                        if index == 0 {
-                            CACHE_IDENTIFICATION_BROADCAST_INTERVAL
-                        } else {
-                            Duration::MAX
-                        },
+                        CACHE_IDENTIFICATION_BROADCAST_INTERVAL,
+                        index == 0,
                     )
                     .await
                 }),
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs
index 8159d71973..ef249bcdf7 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs
@@ -196,7 +196,7 @@ pub(super) async fn controller(
     )?;
 
     let mut controller_services = (0..service_instances.get())
-        .map(|_| {
+        .map(|index| {
             let nats_client = nats_client.clone();
             let node_client = node_client.clone();
             let piece_getter = piece_getter.clone();
@@ -211,6 +211,7 @@ pub(super) async fn controller(
                         &piece_getter,
                         &farmer_cache,
                         &instance,
+                        index == 0,
                     )
                     .await
                 }),
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs
index 2455d4bd19..a86fed607d 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs
@@ -367,12 +367,8 @@ where
             tokio::spawn(farmer_service(
                 nats_client.clone(),
                 farms.as_slice(),
-                // Only one of the tasks needs to send periodic broadcast
-                if index == 0 {
-                    FARMER_IDENTIFICATION_BROADCAST_INTERVAL
-                } else {
-                    Duration::MAX
-                },
+                FARMER_IDENTIFICATION_BROADCAST_INTERVAL,
+                index == 0,
             )),
             true,
         )
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs
index ccdf4753f1..9109a152ed 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs
@@ -1,8 +1,6 @@
 use crate::commands::shared::PlottingThreadPriority;
 use anyhow::anyhow;
 use clap::Parser;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
 use prometheus_client::registry::Registry;
 use std::future::Future;
 use std::num::NonZeroUsize;
@@ -17,7 +15,6 @@ use subspace_farmer::cluster::plotter::plotter_service;
 use subspace_farmer::plotter::cpu::CpuPlotter;
 use subspace_farmer::utils::{
     create_plotting_thread_pool_manager, parse_cpu_cores_sets, thread_pool_core_indices,
-    AsyncJoinOnDrop,
 };
 use subspace_proof_of_space::Table;
 use tokio::sync::Semaphore;
@@ -78,12 +75,6 @@ pub(super) struct PlotterArgs {
     /// "min", "max" or "default".
     #[arg(long, default_value_t = PlottingThreadPriority::Min)]
     plotting_thread_priority: PlottingThreadPriority,
-    /// Number of service instances.
-    ///
-    /// Increasing number of services allows to process more concurrent requests, but increasing
-    /// beyond number of CPU cores doesn't make sense and will likely hurt performance instead.
-    #[arg(long, default_value = "32")]
-    service_instances: NonZeroUsize,
     /// Additional cluster components
     #[clap(raw = true)]
     pub(super) additional_components: Vec<String>,
@@ -105,7 +96,6 @@ where
         plotting_thread_pool_size,
         plotting_cpu_cores,
         plotting_thread_priority,
-        service_instances,
         additional_components: _,
     } = plotter_args;
@@ -180,24 +170,10 @@ where
     ));
 
     // TODO: Metrics
-    let mut plotter_services = (0..service_instances.get())
-        .map(|_| {
-            let nats_client = nats_client.clone();
-            let cpu_plotter = Arc::clone(&cpu_plotter);
-
-            AsyncJoinOnDrop::new(
-                tokio::spawn(async move { plotter_service(&nats_client, &cpu_plotter).await }),
-                true,
-            )
-        })
-        .collect::<FuturesUnordered<_>>();
 
     Ok(Box::pin(async move {
-        plotter_services
-            .next()
+        plotter_service(&nats_client, &cpu_plotter)
             .await
-            .expect("Not empty; qed")
-            .map_err(|error| anyhow!("Plotter service failed: {error}"))?
             .map_err(|error| anyhow!("Plotter service failed: {error}"))
     }))
 }
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs
index 8d6556f4a7..12d5e68284 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs
@@ -1,5 +1,6 @@
 #![feature(
     const_option,
+    duration_constructors,
     extract_if,
     hash_extract_if,
     let_chains,
diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs
index 35a1171902..5453742669 100644
--- a/crates/subspace-farmer/src/cluster/cache.rs
+++ b/crates/subspace-farmer/src/cluster/cache.rs
@@ -20,11 +20,13 @@ use futures::{select, stream, FutureExt, Stream, StreamExt};
 use parity_scale_codec::{Decode, Encode};
 use std::future::{pending, Future};
 use std::pin::{pin, Pin};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use subspace_core_primitives::{Piece, PieceIndex};
 use tokio::time::MissedTickBehavior;
 use tracing::{debug, error, info, trace, warn};
 
+const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
+
 /// Broadcast with identification details by caches
 #[derive(Debug, Clone, Encode, Decode)]
 pub struct ClusterCacheIdentifyBroadcast {
@@ -199,6 +201,7 @@ pub async fn cache_service<C>(
     caches: &[C],
     cache_group: &str,
     identification_broadcast_interval: Duration,
+    primary_instance: bool,
 ) -> anyhow::Result<()>
 where
     C: PieceCache,
@@ -208,7 +211,9 @@ where
         .map(|cache| {
             let cache_id = *cache.id();
 
-            info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache");
+            if primary_instance {
+                info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache");
+            }
 
             CacheDetails {
                 cache_id,
@@ -218,22 +223,39 @@ where
         })
         .collect::<Vec<_>>();
 
-    select! {
-        result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => {
-            result
-        },
-        result = write_piece_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
-        result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
-        result = read_piece_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
-        result = contents_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
+    if primary_instance {
+        select! {
+            result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => {
+                result
+            },
+            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = contents_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+        }
+    } else {
+        select! {
+            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = contents_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+        }
     }
 }
 
@@ -261,10 +283,13 @@ where
             anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
         })?
        .fuse();
+
     // Also send periodic updates in addition to the subscription response
     let mut interval = tokio::time::interval(identification_broadcast_interval);
     interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
 
+    let mut last_identification = Instant::now();
+
     loop {
         select! {
             maybe_message = subscription.next() => {
@@ -275,10 +300,17 @@ where
 
                 trace!(?message, "Cache received identify broadcast message");
 
+                if last_identification.elapsed() < MIN_CACHE_IDENTIFICATION_INTERVAL {
+                    // Skip too frequent identification requests
+                    continue;
+                }
+
+                last_identification = Instant::now();
                 send_identify_broadcast(nats_client, caches_details).await;
                 interval.reset();
             }
             _ = interval.tick().fuse() => {
+                last_identification = Instant::now();
                 trace!("Cache self-identification");
 
                 send_identify_broadcast(nats_client, caches_details).await;
diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs
index b614bbd8eb..6be323b52c 100644
--- a/crates/subspace-farmer/src/cluster/controller.rs
+++ b/crates/subspace-farmer/src/cluster/controller.rs
@@ -492,39 +492,57 @@ pub async fn controller_service<NC, PG>(
     piece_getter: &PG,
     farmer_cache: &FarmerCache,
     instance: &str,
+    primary_instance: bool,
 ) -> anyhow::Result<()>
 where
     NC: NodeClient,
     PG: PieceGetter + Sync,
 {
-    select! {
-        result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = farmer_app_info_responder(nats_client, node_client).fuse() => {
-            result
-        },
-        result = segment_headers_responder(nats_client, node_client).fuse() => {
-            result
-        },
-        result = find_piece_responder(nats_client, farmer_cache).fuse() => {
-            result
-        },
-        result = piece_responder(nats_client, piece_getter).fuse() => {
-            result
-        },
+    if primary_instance {
+        select! {
+            result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = segment_headers_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = find_piece_responder(nats_client, farmer_cache).fuse() => {
+                result
+            },
+            result = piece_responder(nats_client, piece_getter).fuse() => {
+                result
+            },
+        }
+    } else {
+        select! {
+            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = segment_headers_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = find_piece_responder(nats_client, farmer_cache).fuse() => {
+                result
+            },
+            result = piece_responder(nats_client, piece_getter).fuse() => {
+                result
+            },
+        }
     }
 }
diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs
index 664b083ea1..897985198d 100644
--- a/crates/subspace-farmer/src/cluster/farmer.rs
+++ b/crates/subspace-farmer/src/cluster/farmer.rs
@@ -341,7 +341,7 @@ struct FarmDetails {
     total_sectors_count: SectorIndex,
     piece_reader: Arc<dyn PieceReader>,
     plotted_sectors: Arc<dyn PlottedSectors>,
-    _background_tasks: AsyncJoinOnDrop<()>,
+    _background_tasks: Option<AsyncJoinOnDrop<()>>,
 }
 
 /// Create farmer service for specified farms that will be processing incoming requests and send
@@ -353,6 +353,7 @@ pub fn farmer_service<F>(
     nats_client: NatsClient,
     farms: &[F],
     identification_broadcast_interval: Duration,
+    primary_instance: bool,
 ) -> impl Future<Output = anyhow::Result<()>> + Send + 'static
 where
     F: Farm,
@@ -365,97 +366,104 @@ where
             let farm_id = *farm.id();
             let nats_client = nats_client.clone();
 
-            let (sector_updates_sender, mut sector_updates_receiver) =
-                mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
-            let (farming_notifications_sender, mut farming_notifications_receiver) =
-                mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
-            let (solutions_sender, mut solutions_receiver) =
-                mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
+            let background_tasks = if primary_instance {
+                let (sector_updates_sender, mut sector_updates_receiver) =
+                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
+                let (farming_notifications_sender, mut farming_notifications_receiver) =
+                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
+                let (solutions_sender, mut solutions_receiver) =
+                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
+
+                let sector_updates_handler_id =
+                    farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| {
+                        if let Err(error) = sector_updates_sender.clone().try_send(
+                            ClusterFarmerSectorUpdateBroadcast {
+                                farm_id,
+                                sector_index: *sector_index,
+                                sector_update: sector_update.clone(),
+                            },
+                        ) {
+                            warn!(%farm_id, %error, "Failed to send sector update notification");
+                        }
+                    }));
 
-            let sector_updates_handler_id =
-                farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| {
+                let farming_notifications_handler_id =
+                    farm.on_farming_notification(Arc::new(move |farming_notification| {
+                        if let Err(error) = farming_notifications_sender.clone().try_send(
+                            ClusterFarmerFarmingNotificationBroadcast {
+                                farm_id,
+                                farming_notification: farming_notification.clone(),
+                            },
+                        ) {
+                            warn!(%farm_id, %error, "Failed to send farming notification");
+                        }
+                    }));
+
+                let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| {
                     if let Err(error) =
-                        sector_updates_sender
+                        solutions_sender
                             .clone()
-                            .try_send(ClusterFarmerSectorUpdateBroadcast {
+                            .try_send(ClusterFarmerSolutionBroadcast {
                                 farm_id,
-                                sector_index: *sector_index,
-                                sector_update: sector_update.clone(),
+                                solution_response: solution_response.clone(),
                             })
                     {
-                        warn!(%farm_id, %error, "Failed to send sector update notification");
+                        warn!(%farm_id, %error, "Failed to send solution notification");
                     }
                 }));
 
-            let farming_notifications_handler_id =
-                farm.on_farming_notification(Arc::new(move |farming_notification| {
-                    if let Err(error) = farming_notifications_sender.clone().try_send(
-                        ClusterFarmerFarmingNotificationBroadcast {
-                            farm_id,
-                            farming_notification: farming_notification.clone(),
-                        },
-                    ) {
-                        warn!(%farm_id, %error, "Failed to send farming notification");
-                    }
-                }));
-
-            let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| {
-                if let Err(error) =
-                    solutions_sender
-                        .clone()
-                        .try_send(ClusterFarmerSolutionBroadcast {
-                            farm_id,
-                            solution_response: solution_response.clone(),
-                        })
-                {
-                    warn!(%farm_id, %error, "Failed to send solution notification");
-                }
-            }));
-
-            let background_tasks = AsyncJoinOnDrop::new(
-                tokio::spawn(async move {
-                    let farm_id_string = farm_id.to_string();
-
-                    let sector_updates_fut = async {
-                        while let Some(broadcast) = sector_updates_receiver.next().await {
-                            if let Err(error) =
-                                nats_client.broadcast(&broadcast, &farm_id_string).await
-                            {
-                                warn!(%farm_id, %error, "Failed to broadcast sector update");
+                Some(AsyncJoinOnDrop::new(
+                    tokio::spawn(async move {
+                        let farm_id_string = farm_id.to_string();
+
+                        let sector_updates_fut = async {
+                            while let Some(broadcast) = sector_updates_receiver.next().await {
+                                if let Err(error) =
+                                    nats_client.broadcast(&broadcast, &farm_id_string).await
+                                {
+                                    warn!(%farm_id, %error, "Failed to broadcast sector update");
+                                }
                             }
-                        }
-                    };
-                    let farming_notifications_fut = async {
-                        while let Some(broadcast) = farming_notifications_receiver.next().await {
-                            if let Err(error) =
-                                nats_client.broadcast(&broadcast, &farm_id_string).await
+                        };
+                        let farming_notifications_fut = async {
+                            while let Some(broadcast) = farming_notifications_receiver.next().await
                             {
-                                warn!(%farm_id, %error, "Failed to broadcast farming notification");
+                                if let Err(error) =
+                                    nats_client.broadcast(&broadcast, &farm_id_string).await
+                                {
+                                    warn!(
+                                        %farm_id,
+                                        %error,
+                                        "Failed to broadcast farming notification"
+                                    );
+                                }
                             }
-                        }
-                    };
-                    let solutions_fut = async {
-                        while let Some(broadcast) = solutions_receiver.next().await {
-                            if let Err(error) =
-                                nats_client.broadcast(&broadcast, &farm_id_string).await
-                            {
-                                warn!(%farm_id, %error, "Failed to broadcast solution");
+                        };
+                        let solutions_fut = async {
+                            while let Some(broadcast) = solutions_receiver.next().await {
+                                if let Err(error) =
+                                    nats_client.broadcast(&broadcast, &farm_id_string).await
+                                {
+                                    warn!(%farm_id, %error, "Failed to broadcast solution");
+                                }
                             }
-                        }
-                    };
-
-                    select! {
-                        _ = sector_updates_fut.fuse() => {}
-                        _ = farming_notifications_fut.fuse() => {}
-                        _ = solutions_fut.fuse() => {}
-                    }
+                        };
 
-                    drop(sector_updates_handler_id);
-                    drop(farming_notifications_handler_id);
-                    drop(solutions_handler_id);
-                }),
-                true,
-            );
+                        select! {
+                            _ = sector_updates_fut.fuse() => {}
+                            _ = farming_notifications_fut.fuse() => {}
+                            _ = solutions_fut.fuse() => {}
+                        }
+
+                        drop(sector_updates_handler_id);
+                        drop(farming_notifications_handler_id);
+                        drop(solutions_handler_id);
+                    }),
+                    true,
+                ))
+            } else {
+                None
+            };
 
             FarmDetails {
                 farm_id,
@@ -469,16 +477,27 @@ where
             }
         })
         .collect::<Vec<_>>();
 
     async move {
-        select! {
-            result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => {
-                result
-            },
-            result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
-                result
-            },
-            result = read_piece_responder(&nats_client, &farms_details).fuse() => {
-                result
-            },
+        if primary_instance {
+            select! {
+                result = identify_responder(&nats_client, &farms_details, identification_broadcast_interval).fuse() => {
+                    result
+                },
+                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
+                    result
+                },
+                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
+                    result
+                },
+            }
+        } else {
+            select! {
+                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
+                    result
+                },
+                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
+                    result
+                },
+            }
         }
     }
 }
@@ -504,6 +523,7 @@ async fn identify_responder(
             anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}")
         })?
         .fuse();
+
     // Also send periodic updates in addition to the subscription response
     let mut interval = tokio::time::interval(identification_broadcast_interval);
     interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs
index 4032b9ff19..f5a71f8ded 100644
--- a/crates/subspace-farmer/src/cluster/nats_client.rs
+++ b/crates/subspace-farmer/src/cluster/nats_client.rs
@@ -674,6 +674,7 @@ impl NatsClient {
                         );
                         if received_index != expected_index {
                             warn!(
+                                %response_subject,
                                 %received_index,
                                 %expected_index,
                                 request_type = %type_name::<Request>(),
@@ -685,6 +686,7 @@ impl NatsClient {
                             }
                         } else {
                             warn!(
+                                %response_subject,
                                 request_type = %type_name::<Request>(),
                                 response_type = %type_name::<Request::Response>(),
                                 message = %hex::encode(message.payload),
@@ -695,6 +697,7 @@ impl NatsClient {
                 }
                 Ok(None) => {
                     warn!(
+                        %response_subject,
                         request_type = %type_name::<Request>(),
                         response_type = %type_name::<Request::Response>(),
                         "Acknowledgement stream ended unexpectedly"
                     )
                 }
                 Err(_error) => {
                     warn!(
+                        %response_subject,
+                        %expected_index,
                         request_type = %type_name::<Request>(),
                         response_type = %type_name::<Request::Response>(),
                         "Acknowledgement wait timed out"
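
Note on the pattern above: cache_service, controller_service and farmer_service
now take a primary_instance flag so that, with several service instances, only
instance 0 runs the identification broadcasts and the forwarders, while every
instance serves the stateless responders. The identify responders additionally
rate-limit on-demand re-identification. A minimal standalone sketch of that
rate-limiting pattern (tokio only; identify_loop, send_identify_broadcast and
MIN_IDENTIFICATION_INTERVAL are illustrative names, not this crate's API):

    use std::time::{Duration, Instant};

    use tokio::sync::mpsc;
    use tokio::time::MissedTickBehavior;

    const MIN_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);

    /// Broadcast identification periodically and on request, but never more
    /// often than `MIN_IDENTIFICATION_INTERVAL`
    async fn identify_loop(mut requests: mpsc::Receiver<()>, broadcast_interval: Duration) {
        let mut interval = tokio::time::interval(broadcast_interval);
        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        let mut last_identification = Instant::now();

        loop {
            tokio::select! {
                maybe_request = requests.recv() => {
                    if maybe_request.is_none() {
                        // Request channel closed, nothing more to do
                        return;
                    }
                    if last_identification.elapsed() < MIN_IDENTIFICATION_INTERVAL {
                        // Skip too frequent identification requests
                        continue;
                    }
                    last_identification = Instant::now();
                    send_identify_broadcast().await;
                    // Responding counts as an update, so push the next periodic tick out
                    interval.reset();
                }
                _ = interval.tick() => {
                    last_identification = Instant::now();
                    send_identify_broadcast().await;
                }
            }
        }
    }

    async fn send_identify_broadcast() {
        // Stand-in for the real NATS broadcast of identification details
    }

The same structure appears in both the cache and farmer identify responders;
keeping Duration::MAX out of the interval (the old way of disabling secondary
instances) is what the primary_instance flag replaces.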