Skip to content

Commit

Permalink
Fix sending large messages with NATS client
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Jul 11, 2024
1 parent 1d36819 commit a1f9bb0
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions crates/subspace-farmer/src/cluster/nats_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ struct Inner {
client: Client,
request_retry_backoff_policy: ExponentialBackoff,
approximate_max_message_size: usize,
max_message_size: usize,
}

/// NATS client wrapper that can be used to interact with other Subspace-specific clients
Expand Down Expand Up @@ -438,6 +439,8 @@ impl NatsClient {
request_retry_backoff_policy,
// Allow up to 90%, the rest will be wrapper data structures, etc.
approximate_max_message_size: max_payload * 9 / 10,
// Allow up to 90%, the rest will be wrapper data structures, etc.
max_message_size: max_payload,
};

Ok(Self {
Expand Down Expand Up @@ -728,8 +731,9 @@ impl NatsClient {
return;
}
};
let approximate_max_message_size = self.approximate_max_message_size();
let max_responses_per_message = approximate_max_message_size / first_element.encoded_size();
let max_message_size = self.inner.max_message_size;
let max_responses_per_message =
self.approximate_max_message_size() / first_element.encoded_size();

let ack_subject = format!("stream-response-ack.{}", Ulid::new());
let mut ack_subscription = match self.subscribe(ack_subject.clone()).await {
Expand Down Expand Up @@ -788,11 +792,21 @@ impl NatsClient {
}
};
let encoded_response = response.encode();
let encoded_response_len = encoded_response.len();
// When encoded response is too large, remove one of the responses from it and try
// again
if encoded_response.len() > approximate_max_message_size {
if encoded_response_len > max_message_size {
buffer = response.into();
if let Some(element) = buffer.pop_back() {
if buffer.is_empty() {
error!(
?element,
encoded_response_len,
max_message_size,
"Element was too large to fit into NATS message, this is an \
implementation bug"
);
}
overflow_buffer.push_front(element);
continue;
} else {
Expand Down

0 comments on commit a1f9bb0

Please sign in to comment.