From a1f9bb0a09f0ba340355c884a86b0341ea356ef7 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 11 Jul 2024 04:56:11 +0300 Subject: [PATCH] Fix sending large messages with NATS client --- .../src/cluster/nats_client.rs | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index a2465d0588..a729299330 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -385,6 +385,7 @@ struct Inner { client: Client, request_retry_backoff_policy: ExponentialBackoff, approximate_max_message_size: usize, + max_message_size: usize, } /// NATS client wrapper that can be used to interact with other Subspace-specific clients @@ -438,6 +439,8 @@ impl NatsClient { request_retry_backoff_policy, // Allow up to 90%, the rest will be wrapper data structures, etc. approximate_max_message_size: max_payload * 9 / 10, + // Allow up to 90%, the rest will be wrapper data structures, etc. + max_message_size: max_payload, }; Ok(Self { @@ -728,8 +731,9 @@ impl NatsClient { return; } }; - let approximate_max_message_size = self.approximate_max_message_size(); - let max_responses_per_message = approximate_max_message_size / first_element.encoded_size(); + let max_message_size = self.inner.max_message_size; + let max_responses_per_message = + self.approximate_max_message_size() / first_element.encoded_size(); let ack_subject = format!("stream-response-ack.{}", Ulid::new()); let mut ack_subscription = match self.subscribe(ack_subject.clone()).await { @@ -788,11 +792,21 @@ impl NatsClient { } }; let encoded_response = response.encode(); + let encoded_response_len = encoded_response.len(); // When encoded response is too large, remove one of the responses from it and try // again - if encoded_response.len() > approximate_max_message_size { + if encoded_response_len > max_message_size { buffer = response.into(); if let Some(element) = buffer.pop_back() { + if buffer.is_empty() { + error!( + ?element, + encoded_response_len, + max_message_size, + "Element was too large to fit into NATS message, this is an \ + implementation bug" + ); + } overflow_buffer.push_front(element); continue; } else {