Merge pull request #2858 from subspace/fix-farming-cluster-forwarders-and-intervals

Fix farming cluster forwarders and identification intervals
nazar-pc authored Jun 18, 2024
2 parents 5dfec24 + a48c5a4 commit 870bc43
Showing 9 changed files with 216 additions and 171 deletions.
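
The recurring change across the files below replaces the old trick of handing non-primary service instances a Duration::MAX identification interval with an explicit primary_instance flag: every instance now receives the real interval, and only instance 0 acts as the primary that broadcasts. A minimal, self-contained sketch of that call-site pattern (the names, interval value and instance count here are illustrative assumptions, not the crate's actual API):

use std::time::Duration;

// Assumed value for illustration; the real broadcast intervals are defined in the crate.
const IDENTIFICATION_BROADCAST_INTERVAL: Duration = Duration::from_secs(30);

// Stand-in for cache_service/farmer_service/controller_service: every instance
// gets the same interval, and `primary_instance` decides who actually broadcasts.
async fn service(identification_broadcast_interval: Duration, primary_instance: bool) {
    if primary_instance {
        // Primary: send periodic identification broadcasts and run all responders
    } else {
        // Non-primary: run responders only
    }
    let _ = identification_broadcast_interval;
}

#[tokio::main]
async fn main() {
    let service_instances = 4;
    let handles: Vec<_> = (0..service_instances)
        .map(|index| {
            // Only the first task is the primary that sends periodic broadcasts
            tokio::spawn(service(IDENTIFICATION_BROADCAST_INTERVAL, index == 0))
        })
        .collect();
    for handle in handles {
        handle.await.expect("service task panicked");
    }
}
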
@@ -188,12 +188,8 @@ pub(super) async fn cache(
                         nats_client,
                         &caches,
                         &cache_group,
-                        // Only one of the tasks needs to send periodic broadcast
-                        if index == 0 {
-                            CACHE_IDENTIFICATION_BROADCAST_INTERVAL
-                        } else {
-                            Duration::MAX
-                        },
+                        CACHE_IDENTIFICATION_BROADCAST_INTERVAL,
+                        index == 0,
                     )
                     .await
                 }),

@@ -196,7 +196,7 @@ pub(super) async fn controller(
     )?;
 
     let mut controller_services = (0..service_instances.get())
-        .map(|_| {
+        .map(|index| {
             let nats_client = nats_client.clone();
             let node_client = node_client.clone();
             let piece_getter = piece_getter.clone();
@@ -211,6 +211,7 @@ pub(super) async fn controller(
                         &piece_getter,
                         &farmer_cache,
                         &instance,
+                        index == 0,
                     )
                     .await
                 }),

@@ -367,12 +367,8 @@ where
                 tokio::spawn(farmer_service(
                     nats_client.clone(),
                     farms.as_slice(),
-                    // Only one of the tasks needs to send periodic broadcast
-                    if index == 0 {
-                        FARMER_IDENTIFICATION_BROADCAST_INTERVAL
-                    } else {
-                        Duration::MAX
-                    },
+                    FARMER_IDENTIFICATION_BROADCAST_INTERVAL,
+                    index == 0,
                 )),
                 true,
             )

@@ -1,8 +1,6 @@
 use crate::commands::shared::PlottingThreadPriority;
 use anyhow::anyhow;
 use clap::Parser;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
 use prometheus_client::registry::Registry;
 use std::future::Future;
 use std::num::NonZeroUsize;
@@ -17,7 +15,6 @@ use subspace_farmer::cluster::plotter::plotter_service;
 use subspace_farmer::plotter::cpu::CpuPlotter;
 use subspace_farmer::utils::{
     create_plotting_thread_pool_manager, parse_cpu_cores_sets, thread_pool_core_indices,
-    AsyncJoinOnDrop,
 };
 use subspace_proof_of_space::Table;
 use tokio::sync::Semaphore;
@@ -78,12 +75,6 @@ pub(super) struct PlotterArgs {
     /// "min", "max" or "default".
     #[arg(long, default_value_t = PlottingThreadPriority::Min)]
     plotting_thread_priority: PlottingThreadPriority,
-    /// Number of service instances.
-    ///
-    /// Increasing number of services allows to process more concurrent requests, but increasing
-    /// beyond number of CPU cores doesn't make sense and will likely hurt performance instead.
-    #[arg(long, default_value = "32")]
-    service_instances: NonZeroUsize,
     /// Additional cluster components
     #[clap(raw = true)]
     pub(super) additional_components: Vec<String>,
@@ -105,7 +96,6 @@
         plotting_thread_pool_size,
         plotting_cpu_cores,
         plotting_thread_priority,
-        service_instances,
         additional_components: _,
     } = plotter_args;
 
@@ -180,24 +170,10 @@ where
     ));
 
     // TODO: Metrics
-    let mut plotter_services = (0..service_instances.get())
-        .map(|_| {
-            let nats_client = nats_client.clone();
-            let cpu_plotter = Arc::clone(&cpu_plotter);
-
-            AsyncJoinOnDrop::new(
-                tokio::spawn(async move { plotter_service(&nats_client, &cpu_plotter).await }),
-                true,
-            )
-        })
-        .collect::<FuturesUnordered<_>>();
-
     Ok(Box::pin(async move {
-        plotter_services
-            .next()
+        plotter_service(&nats_client, &cpu_plotter)
             .await
-            .expect("Not empty; qed")
-            .map_err(|error| anyhow!("Plotter service failed: {error}"))?
+            .map_err(|error| anyhow!("Plotter service failed: {error}"))
     }))
 }

1 change: 1 addition & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
@@ -1,5 +1,6 @@
 #![feature(
     const_option,
+    duration_constructors,
     extract_if,
     hash_extract_if,
     let_chains,

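The duration_constructors nightly feature enabled above gates the minute/hour/day-style Duration constructors (Duration::from_mins and friends), presumably so interval constants can be written directly in minutes. A small sketch of what the gated constructor looks like (the constant name and value are assumptions, not taken from this commit):

#![feature(duration_constructors)]

use std::time::Duration;

// Hypothetical constant, only to show the gated constructor in use.
const IDENTIFICATION_BROADCAST_INTERVAL: Duration = Duration::from_mins(5);

fn main() {
    assert_eq!(IDENTIFICATION_BROADCAST_INTERVAL, Duration::from_secs(5 * 60));
}
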
68 changes: 50 additions & 18 deletions crates/subspace-farmer/src/cluster/cache.rs
@@ -20,11 +20,13 @@ use futures::{select, stream, FutureExt, Stream, StreamExt};
 use parity_scale_codec::{Decode, Encode};
 use std::future::{pending, Future};
 use std::pin::{pin, Pin};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use subspace_core_primitives::{Piece, PieceIndex};
 use tokio::time::MissedTickBehavior;
 use tracing::{debug, error, info, trace, warn};
 
+const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
+
 /// Broadcast with identification details by caches
 #[derive(Debug, Clone, Encode, Decode)]
 pub struct ClusterCacheIdentifyBroadcast {
@@ -199,6 +201,7 @@ pub async fn cache_service<C>(
     caches: &[C],
     cache_group: &str,
     identification_broadcast_interval: Duration,
+    primary_instance: bool,
 ) -> anyhow::Result<()>
 where
     C: PieceCache,
@@ -208,7 +211,9 @@ where
         .map(|cache| {
             let cache_id = *cache.id();
 
-            info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache");
+            if primary_instance {
+                info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache");
+            }
 
             CacheDetails {
                 cache_id,
@@ -218,22 +223,39 @@ where
         })
         .collect::<Vec<_>>();
 
-    select! {
-        result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => {
-            result
-        },
-        result = write_piece_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
-        result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
-        result = read_piece_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
-        result = contents_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
+    if primary_instance {
+        select! {
+            result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => {
+                result
+            },
+            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = contents_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+        }
+    } else {
+        select! {
+            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = contents_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+        }
     }
 }
 
@@ -261,10 +283,13 @@
             anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
         })?
         .fuse();
+
     // Also send periodic updates in addition to the subscription response
     let mut interval = tokio::time::interval(identification_broadcast_interval);
     interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
 
+    let mut last_identification = Instant::now();
+
     loop {
         select! {
             maybe_message = subscription.next() => {
@@ -275,10 +300,17 @@
 
                 trace!(?message, "Cache received identify broadcast message");
 
+                if last_identification.elapsed() < MIN_CACHE_IDENTIFICATION_INTERVAL {
+                    // Skip too frequent identification requests
+                    continue;
+                }
+
+                last_identification = Instant::now();
                 send_identify_broadcast(nats_client, caches_details).await;
                 interval.reset();
             }
             _ = interval.tick().fuse() => {
+                last_identification = Instant::now();
                 trace!("Cache self-identification");
 
                 send_identify_broadcast(nats_client, caches_details).await;

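The guarded loop above mixes a periodic identification timer with on-demand responses to identify requests, skipping requests that arrive within MIN_CACHE_IDENTIFICATION_INTERVAL of the previous broadcast and resetting the periodic timer after an on-demand one. A self-contained sketch of the same rate-limiting pattern (the request channel and broadcast function are stand-ins for the crate's NATS plumbing):

use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;

const MIN_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);

// Stand-in for the real NATS identify broadcast.
async fn send_identify_broadcast() {
    println!("identify broadcast");
}

async fn identify_loop(mut identify_requests: mpsc::Receiver<()>, broadcast_interval: Duration) {
    let mut interval = tokio::time::interval(broadcast_interval);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    let mut last_identification = Instant::now();

    loop {
        tokio::select! {
            maybe_request = identify_requests.recv() => {
                if maybe_request.is_none() {
                    // Request channel closed, stop the loop
                    break;
                }
                if last_identification.elapsed() < MIN_IDENTIFICATION_INTERVAL {
                    // Skip too frequent identification requests
                    continue;
                }
                last_identification = Instant::now();
                send_identify_broadcast().await;
                // An on-demand broadcast also pushes the next periodic one back
                interval.reset();
            }
            _ = interval.tick() => {
                last_identification = Instant::now();
                send_identify_broadcast().await;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (requests, receiver) = mpsc::channel(8);
    // Simulate a burst of identify requests; at most one per second is answered.
    tokio::spawn(async move {
        for _ in 0..3 {
            let _ = requests.send(()).await;
        }
    });
    identify_loop(receiver, Duration::from_secs(30)).await;
}
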
74 changes: 46 additions & 28 deletions crates/subspace-farmer/src/cluster/controller.rs
@@ -518,39 +518,57 @@ pub async fn controller_service<NC, PG>(
     piece_getter: &PG,
     farmer_cache: &FarmerCache,
     instance: &str,
+    primary_instance: bool,
 ) -> anyhow::Result<()>
 where
     NC: NodeClient,
     PG: PieceGetter + Sync,
 {
-    select! {
-        result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = farmer_app_info_responder(nats_client, node_client).fuse() => {
-            result
-        },
-        result = segment_headers_responder(nats_client, node_client).fuse() => {
-            result
-        },
-        result = find_piece_responder(nats_client, farmer_cache).fuse() => {
-            result
-        },
-        result = piece_responder(nats_client, piece_getter).fuse() => {
-            result
-        },
+    if primary_instance {
+        select! {
+            result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = segment_headers_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = find_piece_responder(nats_client, farmer_cache).fuse() => {
+                result
+            },
+            result = piece_responder(nats_client, piece_getter).fuse() => {
+                result
+            },
+        }
+    } else {
+        select! {
+            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = segment_headers_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = find_piece_responder(nats_client, farmer_cache).fuse() => {
+                result
+            },
+            result = piece_responder(nats_client, piece_getter).fuse() => {
+                result
+            },
+        }
     }
 }
 