Skip to content

Commit

Permalink
Producer: Escape counter for Incomplete errors.
Browse files Browse the repository at this point in the history
Count incomplete errors and skip bytes if they produced multiple times
in raw to avoid filling the internal buffers without being able to
parse the data or skip it.
  • Loading branch information
AmmarAbouZor committed Feb 14, 2025
1 parent fcdbb1c commit cba4a67
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions application/apps/indexer/sources/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ use tokio_stream::Stream;
pub type SdeSender = UnboundedSender<SdeMsg>;
pub type SdeReceiver = UnboundedReceiver<SdeMsg>;

/// The maximum number of allowed continuous incomplete errors before skipping
/// some bytes in the internal buffer.
const MAX_INCOMPLETE_ATTEMPTS: usize = 3;

enum Next {
Read((usize, usize, usize)),
Sde(Option<SdeMsg>),
Expand Down Expand Up @@ -111,6 +115,10 @@ impl<T: LogMessage, P: Parser<T>, D: ByteSource> MessageProducer<T, P, D> {
break 'outer self.load().await.unwrap_or((0, 0, 0));
};
};

let mut incomplete_count = 0;
let mut incomplete_consume = 0;

// 1. buffer loaded? if not, fill buffer with frame data
// 2. try to parse message from buffer
// 3a. if message, pop it of the buffer and deliver
Expand Down Expand Up @@ -175,6 +183,28 @@ impl<T: LogMessage, P: Parser<T>, D: ByteSource> MessageProducer<T, P, D> {
}
Err(ParserError::Incomplete) => {
trace!("not enough bytes to parse a message");

match incomplete_count {
// Mark consume amount on first incomplete error only to avoid skipping too
// much bytes.
0 => incomplete_consume = available,

// Skip the bytes from first incomplete attempt and retry parsing.
MAX_INCOMPLETE_ATTEMPTS => {
self.byte_source.consume(incomplete_consume);
skipped_bytes += incomplete_consume;
available = self.byte_source.len();

// Reset incomplete variables.
incomplete_consume = 0;
incomplete_count = 0;

continue;
}
_ => {}
}
incomplete_count += 1;

let (newly_loaded, _available_bytes, skipped) = self.load().await?;

// Stop if there is no new available bytes.
Expand Down

0 comments on commit cba4a67

Please sign in to comment.