Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Rust ordered message stream implementation #1197

Closed
wants to merge 8 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions rust/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use byteorder::{ReadBytesExt, LE};
use crc32fast::hash as crc32;
use enumset::{enum_set, EnumSet, EnumSetType};
use log::*;
use std::cmp::Ordering;
use std::collections::binary_heap::IntoIter;
use std::collections::BinaryHeap;

use crate::{
io_utils::CountingCrcReader,
Expand Down Expand Up @@ -758,6 +761,93 @@ impl<'a> Iterator for MessageStream<'a> {
}
}

#[derive(Eq, PartialEq)]
struct OrderedMessage<'a> {
message: Message<'a>,
reversed: bool,
}

impl PartialOrd for OrderedMessage<'_> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for OrderedMessage<'_> {
fn cmp(&self, other: &Self) -> Ordering {
let mut cmp = self.message.log_time.cmp(&other.message.log_time);

// default order is largest to smallest
match self.reversed {
true => cmp,
false => cmp.reverse(),
}
}
}

pub struct OrderedMessageStream<'a> {
remaining_iter: MessageStream<'a>,
ordered_msgs: BinaryHeap<OrderedMessage<'static>>,
// should only be set if there's an error while reading from MessageStream into
// ordered iterator
next_remaining: Option<McapResult<Message<'static>>>,
}

impl<'a> OrderedMessageStream<'a> {
pub fn new(buf: &'a [u8]) -> McapResult<Self> {
Self::new_with_options(buf, enum_set!(), false)
}

pub fn new_with_options(
buf: &'a [u8],
options: EnumSet<Options>,
reversed: bool,
) -> McapResult<Self> {
MessageStream::new_with_options(buf, options).map(|mut msg_stream| {
let mut heap = BinaryHeap::new();
let mut next: Option<McapResult<Message>> = None;

loop {
match msg_stream.next() {
None => break,
Some(Ok(msg)) => {
heap.push(OrderedMessage {
message: msg,
reversed,
});
}
Some(Err(e)) => {
next = Some(Err(e));
break;
}
}
}

OrderedMessageStream {
remaining_iter: msg_stream,
ordered_msgs: heap,
next_remaining: next,
}
})
}
}

impl<'a> Iterator for OrderedMessageStream<'a> {
type Item = McapResult<Message<'static>>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(msg) = self.ordered_msgs.pop() {
return Some(Ok(msg.message));
}

if self.next_remaining.is_some() {
return self.next_remaining.take();
}

self.remaining_iter.next()
}
}

const FOOTER_LEN: usize = 20 + 8 + 1; // 20 bytes + 8 byte len + 1 byte opcode

/// Read the MCAP footer.
Expand Down Expand Up @@ -1168,6 +1258,8 @@ reader!(u64);
#[cfg(test)]
mod test {
use super::*;
use crate::records::MessageHeader;
use crate::Writer;

// Can we read a file that's only magic?
// (Probably considered malformed by the spec, but let's not panic on user input)
Expand Down
Loading