Skip to content

Commit

Permalink
strip out tokio impl
Browse files Browse the repository at this point in the history
  • Loading branch information
james-rms committed Sep 26, 2024
1 parent 3b2cf84 commit d8f5325
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 540 deletions.
36 changes: 30 additions & 6 deletions rust/src/sans_io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,18 @@ enum ReadingFrom {
Chunk(ChunkState),
}

pub struct LinearReader {
#[derive(Default, Clone)]
pub struct RecordReaderOptions {
/// If true, the reader will not expect the MCAP magic at the start of the stream.
pub skip_start_magic: bool,
/// If true, the reader will not expect the MCAP magic at the end of the stream.
pub skip_end_magic: bool,
/// If true, the reader will yield entire chunk records. Otherwise, the reader will decompress
/// and read into the chunk, yielding the records inside.
pub emit_chunks: bool,
}

pub struct RecordReader {
currently_reading: CurrentlyReading,
from: ReadingFrom,
uncompressed_data_start: usize,
Expand All @@ -45,11 +56,16 @@ pub struct LinearReader {
compressed_data_end: usize,
compressed_data: Vec<u8>,
decompressors: HashMap<String, Box<dyn Decompressor>>,
options: RecordReaderOptions,
}

impl LinearReader {
impl RecordReader {
pub fn new() -> Self {
LinearReader {
Self::new_with_options(RecordReaderOptions::default())
}

pub fn new_with_options(options: RecordReaderOptions) -> Self {
RecordReader {
currently_reading: CurrentlyReading::StartMagic,
from: ReadingFrom::File,
uncompressed_data: Vec::new(),
Expand All @@ -59,6 +75,7 @@ impl LinearReader {
compressed_data_start: 0,
compressed_data_end: 0,
decompressors: HashMap::new(),
options: options,
}
}
fn get_decompressor(&mut self, name: &str) -> McapResult<Box<dyn Decompressor>> {
Expand Down Expand Up @@ -95,6 +112,10 @@ impl LinearReader {

match self.currently_reading {
CurrentlyReading::StartMagic => {
if self.options.skip_start_magic {
self.currently_reading = CurrentlyReading::RecordOpcodeLength;
continue;
}
let input = match self.consume(MAGIC.len())? {
BufOrRemainder::Buf(input) => input,
BufOrRemainder::Remainder(want) => return self.request(want),
Expand All @@ -105,6 +126,9 @@ impl LinearReader {
self.currently_reading = CurrentlyReading::RecordOpcodeLength;
}
CurrentlyReading::EndMagic => {
if self.options.skip_end_magic {
return Ok(ReadState::Finished);
}
let input = match self.consume(MAGIC.len())? {
BufOrRemainder::Buf(input) => input,
BufOrRemainder::Remainder(want) => return self.request(want),
Expand All @@ -122,7 +146,7 @@ impl LinearReader {
};
let opcode = input[0];
let record_length: u64 = u64::from_le_bytes(input[1..9].try_into().unwrap());
if opcode == op::CHUNK {
if opcode == op::CHUNK && !self.options.emit_chunks {
self.currently_reading = CurrentlyReading::ChunkHeader { record_length };
} else {
self.currently_reading = CurrentlyReading::RecordContent {
Expand Down Expand Up @@ -341,7 +365,7 @@ mod tests {
})?;
writer.finish()?;
}
let mut reader = LinearReader::new();
let mut reader = RecordReader::new();
let mut cursor = std::io::Cursor::new(buf.into_inner());
let mut opcodes: Vec<u8> = Vec::new();
let mut iter_count = 0;
Expand Down Expand Up @@ -402,7 +426,7 @@ mod tests {
})?;
writer.finish()?;
}
let mut reader = LinearReader::new();
let mut reader = RecordReader::new();
let mut cursor = std::io::Cursor::new(buf.into_inner());
let mut opcodes: Vec<u8> = Vec::new();
let mut iter_count = 0;
Expand Down
4 changes: 0 additions & 4 deletions rust/src/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
//! Read MCAP data from a stream asynchronously
#[cfg(feature = "lz4")]
mod lz4;
pub mod read;
mod read_exact_or_zero;

pub use read::{RecordReader, RecordReaderOptions};
use read_exact_or_zero::read_exact_or_zero;
146 changes: 0 additions & 146 deletions rust/src/tokio/lz4.rs

This file was deleted.

Loading

0 comments on commit d8f5325

Please sign in to comment.