Skip to content

Commit

Permalink
Improve farming cluster logging (#2883)
Browse files (browse the repository at this point in the history)
  • Loading branch information
nazar-pc authored Jun 27, 2024
2 parents 24365ec + a0b4d6f commit f6ed626
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 15 deletions.
61 changes: 46 additions & 15 deletions crates/subspace-farmer/src/cluster/nats_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ pub struct StreamResponseSubscriber<Response> {
#[deref]
#[deref_mut]
subscriber: Subscriber,
response_subject: String,
buffered_responses: Option<GenericStreamResponses<Response>>,
next_index: u32,
acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>,
Expand Down Expand Up @@ -217,6 +218,7 @@ where
actual_index = %responses.index(),
expected_index = %*projected.next_index,
message_type = %type_name::<Response>(),
response_subject = %projected.response_subject,
"Received unexpected response stream index, aborting stream"
);

Expand All @@ -235,6 +237,7 @@ where
%error,
%index,
message_type = %type_name::<Response>(),
response_subject = %projected.response_subject,
%ack_subject,
"Failed to send acknowledgement for stream response"
);
Expand All @@ -252,6 +255,7 @@ where
warn!(
%error,
response_type = %type_name::<Response>(),
response_subject = %projected.response_subject,
message = %hex::encode(message.payload),
"Failed to decode stream response"
);
Expand All @@ -267,26 +271,45 @@ where
}

impl<Response> StreamResponseSubscriber<Response> {
fn new(subscriber: Subscriber, nats_client: NatsClient) -> Self {
fn new(subscriber: Subscriber, response_subject: String, nats_client: NatsClient) -> Self {
let (acknowledgement_sender, mut acknowledgement_receiver) =
mpsc::unbounded::<(String, u32)>();

let background_task = AsyncJoinOnDrop::new(
tokio::spawn(async move {
while let Some((subject, index)) = acknowledgement_receiver.next().await {
if let Err(error) = nats_client
.publish(subject.clone(), index.to_le_bytes().to_vec().into())
.await
{
warn!(%error, %subject, %index, "Failed to send acknowledgement");
return;
tokio::spawn({
let response_subject = response_subject.clone();

async move {
while let Some((subject, index)) = acknowledgement_receiver.next().await {
warn!(
%subject,
%index,
%response_subject,
%index,
"Sending stream response acknowledgement"
);
if let Err(error) = nats_client
.publish(subject.clone(), index.to_le_bytes().to_vec().into())
.await
{
warn!(
%error,
%subject,
%index,
%response_subject,
%index,
"Failed to send stream response acknowledgement"
);
return;
}
}
}
}),
true,
);

Self {
response_subject,
subscriber,
buffered_responses: None,
next_index: 0,
Expand Down Expand Up @@ -631,17 +654,25 @@ impl NatsClient {
.client
.subscribe(stream_request.response_subject.clone())
.await?;
debug!(request_type = %type_name::<Request>(), ?subscriber, "Stream request subscription");

let stream_request_subject = subject_with_instance(Request::SUBJECT, instance);
debug!(
request_type = %type_name::<Request>(),
%stream_request_subject,
?subscriber,
"Stream request subscription"
);

self.inner
.client
.publish(
subject_with_instance(Request::SUBJECT, instance),
stream_request.encode().into(),
)
.publish(stream_request_subject, stream_request.encode().into())
.await?;

Ok(StreamResponseSubscriber::new(subscriber, self.clone()))
Ok(StreamResponseSubscriber::new(
subscriber,
stream_request.response_subject,
self.clone(),
))
}

/// Helper method to send responses to requests initiated with [`Self::stream_request`]
Expand Down
6 changes: 6 additions & 0 deletions crates/subspace-farmer/src/cluster/plotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,12 @@ where
PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
PS::Error: Error,
{
if !matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
trace!(?response, "Processing plotting response notification");
} else {
trace!("Processing plotting response notification (sector chunk)");
}

match response {
ClusterSectorPlottingProgress::Occupied => {
debug!(%free_instance, "Instance was occupied, retrying #2");
Expand Down

0 comments on commit f6ed626

Please sign in to comment.