diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs index 9264783faad..2704b4f8467 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs @@ -96,7 +96,7 @@ pub(super) async fn maintain_caches( (Box::pin(ready(())) as Pin>>).fuse(); let cache_identify_subscription = pin!(nats_client - .subscribe_to_broadcasts::(None, None) + .subscribe_to_broadcasts::(Some(cache_group), None) .await .map_err(|error| anyhow!("Failed to subscribe to cache identify broadcast: {error}"))?); diff --git a/crates/subspace-farmer/src/cluster/cache.rs b/crates/subspace-farmer/src/cluster/cache.rs index 5453742669b..63ae8f5ecbf 100644 --- a/crates/subspace-farmer/src/cluster/cache.rs +++ b/crates/subspace-farmer/src/cluster/cache.rs @@ -37,6 +37,7 @@ pub struct ClusterCacheIdentifyBroadcast { } impl GenericBroadcast for ClusterCacheIdentifyBroadcast { + /// `*` here stands for cache group const SUBJECT: &'static str = "subspace.cache.*.identify"; } @@ -274,10 +275,7 @@ where C: PieceCache, { let mut subscription = nats_client - .subscribe_to_broadcasts::( - Some(cache_group), - Some(cache_group.to_string()), - ) + .subscribe_to_broadcasts::(Some(cache_group), None) .await .map_err(|error| { anyhow!("Failed to subscribe to cache identify broadcast requests: {error}") @@ -306,14 +304,14 @@ where } last_identification = Instant::now(); - send_identify_broadcast(nats_client, caches_details).await; + send_identify_broadcast(nats_client, caches_details, cache_group).await; interval.reset(); } _ = interval.tick().fuse() => { last_identification = Instant::now(); trace!("Cache self-identification"); - send_identify_broadcast(nats_client, caches_details).await; + send_identify_broadcast(nats_client, caches_details, cache_group).await; } } } @@ -324,6 +322,7 @@ where async fn send_identify_broadcast( nats_client: &NatsClient, caches_details: &[CacheDetails<'_, C>], + cache_group: &str, ) where C: PieceCache, { @@ -336,7 +335,7 @@ async fn send_identify_broadcast( cache_id: cache.cache_id, max_num_elements: cache.cache.max_num_elements(), }, - &cache.cache_id_string, + cache_group, ) .await {