diff --git a/rust/src/read.rs b/rust/src/read.rs index cd6808c089..dc15b9a770 100644 --- a/rust/src/read.rs +++ b/rust/src/read.rs @@ -21,6 +21,9 @@ use byteorder::{ReadBytesExt, LE}; use crc32fast::hash as crc32; use enumset::{enum_set, EnumSet, EnumSetType}; use log::*; +use std::cmp::Ordering; +use std::collections::binary_heap::IntoIter; +use std::collections::BinaryHeap; use crate::{ io_utils::CountingCrcReader, @@ -758,6 +761,93 @@ impl<'a> Iterator for MessageStream<'a> { } } +#[derive(Eq, PartialEq)] +struct OrderedMessage<'a> { + message: Message<'a>, + reversed: bool, +} + +impl PartialOrd for OrderedMessage<'_> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for OrderedMessage<'_> { + fn cmp(&self, other: &Self) -> Ordering { + let mut cmp = self.message.log_time.cmp(&other.message.log_time); + + // default order is largest to smallest + match self.reversed { + true => cmp, + false => cmp.reverse(), + } + } +} + +pub struct OrderedMessageStream<'a> { + remaining_iter: MessageStream<'a>, + ordered_msgs: BinaryHeap>, + // should only be set if there's an error while reading from MessageStream into + // ordered iterator + next_remaining: Option>>, +} + +impl<'a> OrderedMessageStream<'a> { + pub fn new(buf: &'a [u8]) -> McapResult { + Self::new_with_options(buf, enum_set!(), false) + } + + pub fn new_with_options( + buf: &'a [u8], + options: EnumSet, + reversed: bool, + ) -> McapResult { + MessageStream::new_with_options(buf, options).map(|mut msg_stream| { + let mut heap = BinaryHeap::new(); + let mut next: Option> = None; + + loop { + match msg_stream.next() { + None => break, + Some(Ok(msg)) => { + heap.push(OrderedMessage { + message: msg, + reversed, + }); + } + Some(Err(e)) => { + next = Some(Err(e)); + break; + } + } + } + + OrderedMessageStream { + remaining_iter: msg_stream, + ordered_msgs: heap, + next_remaining: next, + } + }) + } +} + +impl<'a> Iterator for OrderedMessageStream<'a> { + type Item = McapResult>; + + fn next(&mut self) -> Option { + if let Some(msg) = self.ordered_msgs.pop() { + return Some(Ok(msg.message)); + } + + if self.next_remaining.is_some() { + return self.next_remaining.take(); + } + + self.remaining_iter.next() + } +} + const FOOTER_LEN: usize = 20 + 8 + 1; // 20 bytes + 8 byte len + 1 byte opcode /// Read the MCAP footer. @@ -1168,6 +1258,8 @@ reader!(u64); #[cfg(test)] mod test { use super::*; + use crate::records::MessageHeader; + use crate::Writer; // Can we read a file that's only magic? // (Probably considered malformed by the spec, but let's not panic on user input)