diff --git a/application/apps/indexer/sources/src/producer.rs b/application/apps/indexer/sources/src/producer.rs index a7f51aae9a..4c9f9b04c1 100644 --- a/application/apps/indexer/sources/src/producer.rs +++ b/application/apps/indexer/sources/src/producer.rs @@ -15,6 +15,10 @@ use tokio_stream::Stream; pub type SdeSender = UnboundedSender; pub type SdeReceiver = UnboundedReceiver; +/// The maximum number of allowed continuous incomplete errors before skipping +/// some bytes in the internal buffer. +const MAX_INCOMPLETE_ATTEMPTS: usize = 3; + enum Next { Read((usize, usize, usize)), Sde(Option), @@ -111,6 +115,10 @@ impl, D: ByteSource> MessageProducer { break 'outer self.load().await.unwrap_or((0, 0, 0)); }; }; + + let mut incomplete_count = 0; + let mut incomplete_consume = 0; + // 1. buffer loaded? if not, fill buffer with frame data // 2. try to parse message from buffer // 3a. if message, pop it of the buffer and deliver @@ -175,6 +183,28 @@ impl, D: ByteSource> MessageProducer { } Err(ParserError::Incomplete) => { trace!("not enough bytes to parse a message"); + + match incomplete_count { + // Mark consume amount on first incomplete error only to avoid skipping too + // much bytes. + 0 => incomplete_consume = available, + + // Skip the bytes from first incomplete attempt and retry parsing. + MAX_INCOMPLETE_ATTEMPTS => { + self.byte_source.consume(incomplete_consume); + skipped_bytes += incomplete_consume; + available = self.byte_source.len(); + + // Reset incomplete variables. + incomplete_consume = 0; + incomplete_count = 0; + + continue; + } + _ => {} + } + incomplete_count += 1; + let (newly_loaded, _available_bytes, skipped) = self.load().await?; // Stop if there is no new available bytes.