From 75ba3c0256ff2392554f1bb5532897bfd474154e Mon Sep 17 00:00:00 2001 From: James Smith Date: Fri, 9 Aug 2024 14:29:30 +1000 Subject: [PATCH 01/25] rust: add async under `tokio` feature flag --- rust/Cargo.toml | 11 +- rust/src/lib.rs | 4 + rust/src/read.rs | 4 +- rust/src/records.rs | 38 ++++ rust/src/tokio.rs | 2 + rust/src/tokio/lz4.rs | 132 +++++++++++++ rust/src/tokio/read.rs | 438 +++++++++++++++++++++++++++++++++++++++++ rust/src/write.rs | 10 +- 8 files changed, 631 insertions(+), 8 deletions(-) create mode 100644 rust/src/tokio.rs create mode 100644 rust/src/tokio/lz4.rs create mode 100644 rust/src/tokio/read.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index f2fa8ca0d2..015b8f5cad 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ] repository = "https://github.com/foxglove/mcap" documentation = "https://docs.rs/mcap" readme = "README.md" -version = "0.9.2" +version = "0.10.0" edition = "2021" license = "MIT" @@ -22,7 +22,10 @@ log = "0.4" num_cpus = "1.13" paste = "1.0" thiserror = "1.0" -lz4_flex = { version = "0.11.1", optional = true } +lz4 = { version = "1", optional = true } +async-compression = { version = "*", features = ["tokio", "zstd"], optional = true } +tokio = { version = "1", features = ["io-util"] , optional = true } +tokio-stream = { version = "*", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] zstd = { version = "0.11", features = ["wasm"], optional = true } @@ -33,7 +36,8 @@ zstd = { version = "0.11", features = ["zstdmt"], optional = true } [features] default = ["zstd", "lz4"] zstd = ["dep:zstd"] -lz4 = ["dep:lz4_flex"] +lz4 = ["dep:lz4"] +tokio = ["dep:async-compression", "dep:tokio", "dep:tokio-stream"] [dev-dependencies] anyhow = "1" @@ -48,6 +52,7 @@ serde = { version = "1.0.145", features = ["derive"] } serde_json = "1" simplelog = "0.12" tempfile = "3.3" +tokio = { version = "1", features = ["io-util", "macros", "rt"] } [[bench]] name = "reader" diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 9b2eb1b1c7..84df48a4ed 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -75,6 +75,8 @@ pub mod read; pub mod records; +#[cfg(feature = "tokio")] +pub mod tokio; pub mod write; mod io_utils; @@ -119,6 +121,8 @@ pub enum McapError { UnexpectedEof, #[error("Chunk ended in the middle of a record")] UnexpectedEoc, + #[error("Record with opcode {opcode:02X} has length {len}, need at least {expected} to parse")] + RecordTooShort { opcode: u8, len: u64, expected: u64 }, #[error("Message {0} referenced unknown channel {1}")] UnknownChannel(u32, u16), #[error("Channel `{0}` referenced unknown schema {1}")] diff --git a/rust/src/read.rs b/rust/src/read.rs index 72dad85fb2..e7d4d34e41 100644 --- a/rust/src/read.rs +++ b/rust/src/read.rs @@ -144,7 +144,7 @@ fn read_record_from_slice<'a>(buf: &mut &'a [u8]) -> McapResult McapResult> { +pub(crate) fn read_record(op: u8, body: &[u8]) -> McapResult> { macro_rules! record { ($b:ident) => {{ let mut cur = Cursor::new($b); @@ -278,7 +278,7 @@ impl<'a> ChunkReader<'a> { #[cfg(feature = "lz4")] "lz4" => ChunkDecompressor::Compressed(Some(CountingCrcReader::new(Box::new( - lz4_flex::frame::FrameDecoder::new(data), + lz4::Decoder::new(data)?, )))), #[cfg(not(feature = "lz4"))] diff --git a/rust/src/records.rs b/rust/src/records.rs index eeb842c495..0f373d0268 100644 --- a/rust/src/records.rs +++ b/rust/src/records.rs @@ -99,6 +99,44 @@ impl Record<'_> { Record::Unknown { opcode, .. 
} => *opcode, } } + + /// Moves this value into a fully-owned variant with no borrows. This should be free for + /// already-owned values. + pub fn into_owned(self) -> Record<'static> { + match self { + Record::Header(header) => Record::Header(header), + Record::Footer(footer) => Record::Footer(footer), + Record::Schema { header, data } => Record::Schema { + header, + data: Cow::Owned(data.into_owned()), + }, + Record::Channel(channel) => Record::Channel(channel), + Record::Message { header, data } => Record::Message { + header, + data: Cow::Owned(data.into_owned()), + }, + Record::Chunk { header, data } => Record::Chunk { + header, + data: Cow::Owned(data.into_owned()), + }, + Record::MessageIndex(index) => Record::MessageIndex(index), + Record::ChunkIndex(index) => Record::ChunkIndex(index), + Record::Attachment { header, data } => Record::Attachment { + header, + data: Cow::Owned(data.into_owned()), + }, + Record::AttachmentIndex(index) => Record::AttachmentIndex(index), + Record::Statistics(statistics) => Record::Statistics(statistics), + Record::Metadata(metadata) => Record::Metadata(metadata), + Record::MetadataIndex(index) => Record::MetadataIndex(index), + Record::SummaryOffset(offset) => Record::SummaryOffset(offset), + Record::DataEnd(end) => Record::DataEnd(end), + Record::Unknown { opcode, data } => Record::Unknown { + opcode: opcode, + data: Cow::Owned(data.into_owned()), + }, + } + } } #[binrw] diff --git a/rust/src/tokio.rs b/rust/src/tokio.rs new file mode 100644 index 0000000000..ab6bab2dc7 --- /dev/null +++ b/rust/src/tokio.rs @@ -0,0 +1,2 @@ +mod lz4; +pub mod read; diff --git a/rust/src/tokio/lz4.rs b/rust/src/tokio/lz4.rs new file mode 100644 index 0000000000..bb41316bd5 --- /dev/null +++ b/rust/src/tokio/lz4.rs @@ -0,0 +1,132 @@ +use lz4::liblz4::{ + check_error, LZ4FDecompressionContext, LZ4F_createDecompressionContext, LZ4F_decompress, + LZ4F_freeDecompressionContext, LZ4F_VERSION, +}; +use std::io::{Error, ErrorKind, Result}; +use std::ptr; +use tokio::io::{AsyncRead, ReadBuf}; + +const BUFFER_SIZE: usize = 32 * 1024; + +#[derive(Debug)] +struct DecoderContext { + c: LZ4FDecompressionContext, +} + +// An equivalent of the lz4::Decoder `std::io::Read` wrapper for `tokio::io::AsyncRead`. +// Code below is adapted from the https://github.com/bozaro/lz4-rs crate. +#[derive(Debug)] +pub struct Lz4Decoder { + c: DecoderContext, + r: R, + buf: Box<[u8]>, + pos: usize, + len: usize, + next: usize, +} + +impl Lz4Decoder { + /// Creates a new decoder which reads its input from the given + /// input stream. 
The input stream can be re-acquired by calling + /// `finish()` + pub fn new(r: R) -> Result> { + Ok(Lz4Decoder { + r, + c: DecoderContext::new()?, + buf: vec![0; BUFFER_SIZE].into_boxed_slice(), + pos: BUFFER_SIZE, + len: BUFFER_SIZE, + // Minimal LZ4 stream size + next: 11, + }) + } + + pub fn finish(self) -> (R, Result<()>) { + ( + self.r, + match self.next { + 0 => Ok(()), + _ => Err(Error::new( + ErrorKind::Interrupted, + "Finish runned before read end of compressed stream", + )), + }, + ) + } +} + +impl AsyncRead for Lz4Decoder { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + if self.next == 0 || buf.remaining() == 0 { + return std::task::Poll::Ready(Ok(())); + } + let mut written_len: usize = 0; + let mself = self.get_mut(); + while written_len == 0 { + if mself.pos >= mself.len { + let need = if mself.buf.len() < mself.next { + mself.buf.len() + } else { + mself.next + }; + { + let mut comp_buf = ReadBuf::new(&mut mself.buf[..need]); + let result = std::pin::pin!(&mut mself.r).poll_read(cx, &mut comp_buf); + match result { + std::task::Poll::Pending => return result, + std::task::Poll::Ready(Err(_)) => return result, + _ => {} + }; + mself.len = comp_buf.filled().len(); + } + if mself.len == 0 { + break; + } + mself.pos = 0; + mself.next -= mself.len; + } + while (written_len < buf.remaining()) && (mself.pos < mself.len) { + let mut src_size = mself.len - mself.pos; + let mut dst_size = buf.remaining() - written_len; + let len = check_error(unsafe { + LZ4F_decompress( + mself.c.c, + buf.initialize_unfilled().as_mut_ptr(), + &mut dst_size, + mself.buf[mself.pos..].as_ptr(), + &mut src_size, + ptr::null(), + ) + })?; + mself.pos += src_size as usize; + written_len += dst_size as usize; + buf.set_filled(written_len); + if len == 0 { + mself.next = 0; + return std::task::Poll::Ready(Ok(())); + } else if mself.next < len { + mself.next = len; + } + } + } + return std::task::Poll::Ready(Ok(())); + } +} + +impl DecoderContext { + fn new() -> Result { + let mut context = LZ4FDecompressionContext(ptr::null_mut()); + check_error(unsafe { LZ4F_createDecompressionContext(&mut context, LZ4F_VERSION) })?; + Ok(DecoderContext { c: context }) + } +} + +impl Drop for DecoderContext { + fn drop(&mut self) { + unsafe { LZ4F_freeDecompressionContext(self.c) }; + } +} diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs new file mode 100644 index 0000000000..c0325ff16f --- /dev/null +++ b/rust/src/tokio/read.rs @@ -0,0 +1,438 @@ +use byteorder::ByteOrder; +use std::future::Future; +use tokio::io::{AsyncRead, AsyncReadExt, BufReader, Take}; +use tokio_stream::Stream; + +use crate::tokio::lz4::Lz4Decoder; +use crate::{records, McapError, McapResult, MAGIC}; +use async_compression::tokio::bufread::ZstdDecoder; + +enum ReaderState { + Base(R), + UncompressedChunk(Take), + ZstdChunk(ZstdDecoder>>), + Lz4Chunk(Lz4Decoder>), + Empty, +} + +impl AsyncRead for ReaderState +where + R: AsyncRead + std::marker::Unpin, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match self.get_mut() { + ReaderState::Base(r) => std::pin::pin!(r).poll_read(cx, buf), + ReaderState::UncompressedChunk(r) => std::pin::pin!(r).poll_read(cx, buf), + ReaderState::ZstdChunk(r) => std::pin::pin!(r).poll_read(cx, buf), + ReaderState::Lz4Chunk(r) => std::pin::pin!(r).poll_read(cx, buf), + ReaderState::Empty => { + 
panic!("invariant: reader is only set to empty while swapping with another valid variant") + } + } + } +} +impl ReaderState +where + R: AsyncRead, +{ + pub fn into_inner(self) -> McapResult { + match self { + ReaderState::Base(reader) => Ok(reader), + ReaderState::UncompressedChunk(take) => Ok(take.into_inner()), + ReaderState::ZstdChunk(decoder) => Ok(decoder.into_inner().into_inner().into_inner()), + ReaderState::Lz4Chunk(decoder) => { + let (output, result) = decoder.finish(); + result?; + Ok(output.into_inner()) + } + ReaderState::Empty => { + panic!("invariant: reader is only set to empty while swapping with another valid variant") + } + } + } +} +/// Reads an MCAP file record-by-record, writing the raw record data into a caller-provided Vec. +pub struct RecordReader { + reader: ReaderState, + options: Options, + start_magic_seen: bool, + footer_seen: bool, + scratch: [u8; 9], +} + +#[derive(Default, Clone)] +pub struct Options { + /// If true, the reader will not expect the MCAP magic at the start of the stream. + skip_start_magic: bool, + /// If true, the reader will not expect the MCAP magic at the end of the stream. + skip_end_magic: bool, + // If true, the reader will yield entire chunk records. Otherwise, the reader will decompress + // and read into the chunk, yielding the records inside. + emit_chunks: bool, +} + +enum Cmd { + YieldRecord(u8), + EnterChunk(records::ChunkHeader), + ExitChunk, + Stop, +} + +impl RecordReader +where + R: AsyncRead + std::marker::Unpin, +{ + pub fn new(reader: R) -> Self { + Self::new_with_options(reader, &Options::default()) + } + + pub fn new_with_options(reader: R, options: &Options) -> Self { + Self { + reader: ReaderState::Base(reader), + options: options.clone(), + start_magic_seen: false, + footer_seen: false, + scratch: [0; 9], + } + } + + pub fn into_inner(self) -> McapResult { + self.reader.into_inner() + } + + /// Reads the next record from the input stream and copies the raw content into `data`. + /// Returns the record's opcode as a result. + pub async fn next_record(&mut self, data: &mut Vec) -> McapResult> { + loop { + let cmd = self.next_record_inner(data).await?; + match cmd { + Cmd::Stop => return Ok(None), + Cmd::YieldRecord(opcode) => return Ok(Some(opcode)), + Cmd::EnterChunk(header) => { + let mut rdr = ReaderState::Empty; + std::mem::swap(&mut rdr, &mut self.reader); + match header.compression.as_str() { + "zstd" => { + self.reader = ReaderState::ZstdChunk(ZstdDecoder::new(BufReader::new( + rdr.into_inner()?.take(header.compressed_size), + ))); + } + "lz4" => { + let decoder = + Lz4Decoder::new(rdr.into_inner()?.take(header.compressed_size))?; + self.reader = ReaderState::Lz4Chunk(decoder); + } + "" => { + self.reader = ReaderState::UncompressedChunk( + rdr.into_inner()?.take(header.compressed_size), + ); + } + _ => { + std::mem::swap(&mut rdr, &mut self.reader); + return Err(McapError::UnsupportedCompression( + header.compression.clone(), + )); + } + } + } + Cmd::ExitChunk => { + let mut rdr = ReaderState::Empty; + std::mem::swap(&mut rdr, &mut self.reader); + self.reader = ReaderState::Base(rdr.into_inner()?) 
+ } + }; + } + } + + async fn next_record_inner(&mut self, data: &mut Vec) -> McapResult { + if let ReaderState::Base(reader) = &mut self.reader { + if !self.start_magic_seen && !self.options.skip_start_magic { + reader.read_exact(&mut self.scratch[..MAGIC.len()]).await?; + if &self.scratch[..MAGIC.len()] != MAGIC { + return Err(McapError::BadMagic); + } + self.start_magic_seen = true; + } + if self.footer_seen && !self.options.skip_end_magic { + reader.read_exact(&mut self.scratch[..MAGIC.len()]).await?; + if &self.scratch[..MAGIC.len()] != MAGIC { + return Err(McapError::BadMagic); + } + return Ok(Cmd::Stop); + } + reader.read_exact(&mut self.scratch).await?; + let opcode = self.scratch[0]; + if opcode == records::op::FOOTER { + self.footer_seen = true; + } + let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..]); + if opcode == records::op::CHUNK && !self.options.emit_chunks { + let chunk_header = read_chunk_header(reader, data, record_len).await?; + return Ok(Cmd::EnterChunk(chunk_header)); + } + data.resize(record_len as usize, 0); + reader.read_exact(&mut data[..]).await?; + Ok(Cmd::YieldRecord(opcode)) + } else { + let len = self.reader.read(&mut self.scratch).await?; + if len == 0 { + return Ok(Cmd::ExitChunk); + } + if len != self.scratch.len() { + return Err(McapError::UnexpectedEof); + } + let opcode = self.scratch[0]; + let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..]); + data.resize(record_len as usize, 0); + self.reader.read_exact(&mut data[..]).await?; + Ok(Cmd::YieldRecord(opcode)) + } + } +} + +async fn read_chunk_header( + reader: &mut R, + scratch: &mut Vec, + record_len: u64, +) -> McapResult { + let mut header = records::ChunkHeader { + message_start_time: 0, + message_end_time: 0, + uncompressed_size: 0, + uncompressed_crc: 0, + compression: String::new(), + compressed_size: 0, + }; + if record_len < 40 { + return Err(McapError::RecordTooShort { + opcode: records::op::CHUNK, + len: record_len, + expected: 40, + }); + } + scratch.resize(32, 0); + reader.read_exact(&mut scratch[..]).await?; + header.message_start_time = byteorder::LittleEndian::read_u64(&scratch[0..8]); + header.message_end_time = byteorder::LittleEndian::read_u64(&scratch[8..16]); + header.uncompressed_size = byteorder::LittleEndian::read_u64(&scratch[16..24]); + header.uncompressed_crc = byteorder::LittleEndian::read_u32(&scratch[24..28]); + let compression_len = byteorder::LittleEndian::read_u32(&scratch[28..32]); + scratch.resize(compression_len as usize, 0); + if record_len < (40 + compression_len) as u64 { + return Err(McapError::RecordTooShort { + opcode: records::op::CHUNK, + len: record_len, + expected: (40 + compression_len) as u64, + }); + } + reader.read_exact(&mut scratch[..]).await?; + header.compression = match std::str::from_utf8(&scratch[..]) { + Ok(val) => val.to_owned(), + Err(err) => { + return Err(McapError::Parse(binrw::error::Error::Custom { + pos: 32, + err: Box::new(err), + })); + } + }; + scratch.resize(8, 0); + reader.read_exact(&mut scratch[..]).await?; + header.compressed_size = byteorder::LittleEndian::read_u64(&scratch[..]); + let available = record_len - (32 + compression_len as u64 + 8); + if available < header.compressed_size { + return Err(McapError::BadChunkLength { + header: header.compressed_size, + available, + }); + } + Ok(header) +} + +/// implements a Stream of owned `crate::record::Record` values. 
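+///
+/// A minimal usage sketch (hypothetical, not taken from the tests below; assumes
+/// `tokio_stream::StreamExt` is in scope for `next()` and that the bytes form a
+/// complete MCAP stream):
+///
+/// ```no_run
+/// use tokio_stream::StreamExt;
+///
+/// async fn print_opcodes(mcap_bytes: Vec<u8>) -> mcap::McapResult<()> {
+///     let mut records =
+///         mcap::tokio::read::LinearStream::new(std::io::Cursor::new(mcap_bytes));
+///     while let Some(record) = records.next().await {
+///         // each item is an owned record, independent of the internal read buffer
+///         println!("opcode: {:02X}", record?.opcode());
+///     }
+///     Ok(())
+/// }
+/// ```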
+pub struct LinearStream { + r: RecordReader, + buf: Vec, +} + +impl LinearStream { + /// Creates a new stream of records from a reader. + pub fn new(r: R) -> Self { + Self { + r: RecordReader::new(r), + buf: Vec::new(), + } + } + + pub fn into_inner(self) -> McapResult { + self.r.into_inner() + } +} + +impl Stream for LinearStream { + type Item = McapResult>; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // we do this swap maneuver in order to appease the borrow checker and also reuse one read + // buf across several records. + let mut buf = Vec::new(); + std::mem::swap(&mut buf, &mut self.buf); + let opcode = { + let res = std::pin::pin!((&mut self).r.next_record(&mut buf)).poll(cx); + match res { + std::task::Poll::Pending => { + std::mem::swap(&mut buf, &mut self.buf); + return std::task::Poll::Pending; + } + std::task::Poll::Ready(result) => match result { + Err(err) => { + std::mem::swap(&mut buf, &mut self.buf); + return std::task::Poll::Ready(Some(Err(err))); + } + Ok(None) => { + std::mem::swap(&mut buf, &mut self.buf); + return std::task::Poll::Ready(None); + } + Ok(Some(op)) => op, + }, + } + }; + let parse_res = crate::read::read_record(opcode, &buf[..]); + let result = std::task::Poll::Ready(Some(match parse_res { + Ok(record) => Ok(record.into_owned()), + Err(err) => Err(err), + })); + std::mem::swap(&mut buf, &mut self.buf); + return result; + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + use tokio_stream::StreamExt; + + use super::*; + #[tokio::test] + async fn test_record_reader() -> Result<(), McapError> { + for compression in [ + None, + Some(crate::Compression::Lz4), + Some(crate::Compression::Zstd), + ] { + let mut buf = std::io::Cursor::new(Vec::new()); + { + let mut writer = crate::WriteOptions::new() + .compression(compression) + .create(&mut buf)?; + let channel = std::sync::Arc::new(crate::Channel { + topic: "chat".to_owned(), + schema: None, + message_encoding: "json".to_owned(), + metadata: BTreeMap::new(), + }); + writer.add_channel(&channel)?; + writer.write(&crate::Message { + channel, + sequence: 0, + log_time: 0, + publish_time: 0, + data: (&[0, 1, 2]).into(), + })?; + writer.finish()?; + } + let mut reader = RecordReader::new(std::io::Cursor::new(buf.into_inner())); + let mut record = Vec::new(); + let mut opcodes: Vec = Vec::new(); + loop { + let opcode = reader.next_record(&mut record).await?; + if let Some(opcode) = opcode { + opcodes.push(opcode); + } else { + break; + } + } + assert_eq!( + opcodes.as_slice(), + [ + records::op::HEADER, + records::op::CHANNEL, + records::op::MESSAGE, + records::op::MESSAGE_INDEX, + records::op::DATA_END, + records::op::CHANNEL, + records::op::CHUNK_INDEX, + records::op::STATISTICS, + records::op::SUMMARY_OFFSET, + records::op::SUMMARY_OFFSET, + records::op::SUMMARY_OFFSET, + records::op::FOOTER, + ], + "reads opcodes from MCAP compressed with {:?}", + compression + ); + } + Ok(()) + } + #[tokio::test] + async fn test_linear_stream() -> Result<(), McapError> { + for compression in [ + None, + Some(crate::Compression::Lz4), + Some(crate::Compression::Zstd), + ] { + let mut buf = std::io::Cursor::new(Vec::new()); + { + let mut writer = crate::WriteOptions::new() + .compression(compression) + .create(&mut buf)?; + let channel = std::sync::Arc::new(crate::Channel { + topic: "chat".to_owned(), + schema: None, + message_encoding: "json".to_owned(), + metadata: BTreeMap::new(), + }); + writer.add_channel(&channel)?; + 
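                // write a single message so the resulting file contains a chunk
                // with content for the stream to decompress and decode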
writer.write(&crate::Message { + channel, + sequence: 0, + log_time: 0, + publish_time: 0, + data: (&[0, 1, 2]).into(), + })?; + writer.finish()?; + } + let mut reader = LinearStream::new(std::io::Cursor::new(buf.into_inner())); + let mut opcodes: Vec = Vec::new(); + while let Some(result) = reader.next().await { + let record = result?; + opcodes.push(record.opcode()) + } + assert_eq!( + opcodes.as_slice(), + [ + records::op::HEADER, + records::op::CHANNEL, + records::op::MESSAGE, + records::op::MESSAGE_INDEX, + records::op::DATA_END, + records::op::CHANNEL, + records::op::CHUNK_INDEX, + records::op::STATISTICS, + records::op::SUMMARY_OFFSET, + records::op::SUMMARY_OFFSET, + records::op::SUMMARY_OFFSET, + records::op::FOOTER, + ], + "reads records from MCAP compressed with {:?}", + compression + ); + } + Ok(()) + } +} diff --git a/rust/src/write.rs b/rust/src/write.rs index 32e47a0524..ef83587fb7 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -711,7 +711,7 @@ enum Compressor { #[cfg(feature = "zstd")] Zstd(zstd::Encoder<'static, W>), #[cfg(feature = "lz4")] - Lz4(lz4_flex::frame::FrameEncoder), + Lz4(lz4::Encoder), } impl Compressor { @@ -721,7 +721,11 @@ impl Compressor { #[cfg(feature = "zstd")] Compressor::Zstd(w) => w.finish()?, #[cfg(feature = "lz4")] - Compressor::Lz4(w) => w.finish()?, + Compressor::Lz4(w) => { + let (output, result) = w.finish(); + result?; + output + } }) } } @@ -797,7 +801,7 @@ impl ChunkWriter { Compressor::Zstd(enc) } #[cfg(feature = "lz4")] - Some(Compression::Lz4) => Compressor::Lz4(lz4_flex::frame::FrameEncoder::new(writer)), + Some(Compression::Lz4) => Compressor::Lz4(lz4::EncoderBuilder::new().build(writer)?), #[cfg(not(any(feature = "zstd", feature = "lz4")))] Some(_) => unreachable!("`Compression` is an empty enum that cannot be instantiated"), None => Compressor::Null(writer), From 71ba806157be0f63caf8eedfb86c7a7c1072586c Mon Sep 17 00:00:00 2001 From: James Smith Date: Wed, 14 Aug 2024 15:53:21 +1000 Subject: [PATCH 02/25] discard bytes after reading chunk --- rust/src/tokio/read.rs | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs index c0325ff16f..6b5ca14270 100644 --- a/rust/src/tokio/read.rs +++ b/rust/src/tokio/read.rs @@ -62,6 +62,8 @@ pub struct RecordReader { start_magic_seen: bool, footer_seen: bool, scratch: [u8; 9], + to_discard_after_chunk: usize, + discard_buf: Box<[u8]>, } #[derive(Default, Clone)] @@ -77,7 +79,10 @@ pub struct Options { enum Cmd { YieldRecord(u8), - EnterChunk(records::ChunkHeader), + EnterChunk { + header: records::ChunkHeader, + len: u64, + }, ExitChunk, Stop, } @@ -97,6 +102,8 @@ where start_magic_seen: false, footer_seen: false, scratch: [0; 9], + to_discard_after_chunk: 0, + discard_buf: vec![0; 1024].into_boxed_slice(), } } @@ -112,7 +119,7 @@ where match cmd { Cmd::Stop => return Ok(None), Cmd::YieldRecord(opcode) => return Ok(Some(opcode)), - Cmd::EnterChunk(header) => { + Cmd::EnterChunk { header, len } => { let mut rdr = ReaderState::Empty; std::mem::swap(&mut rdr, &mut self.reader); match header.compression.as_str() { @@ -138,11 +145,24 @@ where )); } } + self.to_discard_after_chunk = len as usize + - (40 + header.compression.len() + header.compressed_size as usize); } Cmd::ExitChunk => { let mut rdr = ReaderState::Empty; std::mem::swap(&mut rdr, &mut self.reader); - self.reader = ReaderState::Base(rdr.into_inner()?) 
+ self.reader = ReaderState::Base(rdr.into_inner()?); + while self.to_discard_after_chunk > 0 { + let to_read = if self.to_discard_after_chunk > self.discard_buf.len() { + self.discard_buf.len() + } else { + self.to_discard_after_chunk + }; + self.reader + .read_exact(&mut self.discard_buf[..to_read]) + .await?; + self.to_discard_after_chunk -= to_read; + } } }; } @@ -171,8 +191,11 @@ where } let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..]); if opcode == records::op::CHUNK && !self.options.emit_chunks { - let chunk_header = read_chunk_header(reader, data, record_len).await?; - return Ok(Cmd::EnterChunk(chunk_header)); + let header = read_chunk_header(reader, data, record_len).await?; + return Ok(Cmd::EnterChunk { + header, + len: record_len, + }); } data.resize(record_len as usize, 0); reader.read_exact(&mut data[..]).await?; From 07e657133b52442a7099cba68569be426dd2b6a8 Mon Sep 17 00:00:00 2001 From: James Smith Date: Wed, 14 Aug 2024 16:24:45 +1000 Subject: [PATCH 03/25] nicer namespace usage --- rust/src/tokio/lz4.rs | 27 ++++++++------- rust/src/tokio/read.rs | 76 +++++++++++++++++++----------------------- 2 files changed, 50 insertions(+), 53 deletions(-) diff --git a/rust/src/tokio/lz4.rs b/rust/src/tokio/lz4.rs index bb41316bd5..1145b588a4 100644 --- a/rust/src/tokio/lz4.rs +++ b/rust/src/tokio/lz4.rs @@ -1,9 +1,12 @@ +use std::io::{Error, ErrorKind, Result}; +use std::pin::{pin, Pin}; +use std::ptr; +use std::task::{Context, Poll}; + use lz4::liblz4::{ check_error, LZ4FDecompressionContext, LZ4F_createDecompressionContext, LZ4F_decompress, LZ4F_freeDecompressionContext, LZ4F_VERSION, }; -use std::io::{Error, ErrorKind, Result}; -use std::ptr; use tokio::io::{AsyncRead, ReadBuf}; const BUFFER_SIZE: usize = 32 * 1024; @@ -57,12 +60,12 @@ impl Lz4Decoder { impl AsyncRead for Lz4Decoder { fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { if self.next == 0 || buf.remaining() == 0 { - return std::task::Poll::Ready(Ok(())); + return Poll::Ready(Ok(())); } let mut written_len: usize = 0; let mself = self.get_mut(); @@ -75,10 +78,10 @@ impl AsyncRead for Lz4Decoder { }; { let mut comp_buf = ReadBuf::new(&mut mself.buf[..need]); - let result = std::pin::pin!(&mut mself.r).poll_read(cx, &mut comp_buf); + let result = pin!(&mut mself.r).poll_read(cx, &mut comp_buf); match result { - std::task::Poll::Pending => return result, - std::task::Poll::Ready(Err(_)) => return result, + Poll::Pending => return result, + Poll::Ready(Err(_)) => return result, _ => {} }; mself.len = comp_buf.filled().len(); @@ -107,13 +110,13 @@ impl AsyncRead for Lz4Decoder { buf.set_filled(written_len); if len == 0 { mself.next = 0; - return std::task::Poll::Ready(Ok(())); + return Poll::Ready(Ok(())); } else if mself.next < len { mself.next = len; } } } - return std::task::Poll::Ready(Ok(())); + return Poll::Ready(Ok(())); } } diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs index 6b5ca14270..3e67366ad2 100644 --- a/rust/src/tokio/read.rs +++ b/rust/src/tokio/read.rs @@ -1,11 +1,14 @@ -use byteorder::ByteOrder; use std::future::Future; -use tokio::io::{AsyncRead, AsyncReadExt, BufReader, Take}; +use std::pin::{pin, Pin}; +use std::task::{Context, Poll}; + +use async_compression::tokio::bufread::ZstdDecoder; +use byteorder::ByteOrder; +use tokio::io::{AsyncRead, AsyncReadExt, BufReader, ReadBuf, Take}; use 
tokio_stream::Stream; use crate::tokio::lz4::Lz4Decoder; use crate::{records, McapError, McapResult, MAGIC}; -use async_compression::tokio::bufread::ZstdDecoder; enum ReaderState { Base(R), @@ -20,15 +23,15 @@ where R: AsyncRead + std::marker::Unpin, { fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { match self.get_mut() { - ReaderState::Base(r) => std::pin::pin!(r).poll_read(cx, buf), - ReaderState::UncompressedChunk(r) => std::pin::pin!(r).poll_read(cx, buf), - ReaderState::ZstdChunk(r) => std::pin::pin!(r).poll_read(cx, buf), - ReaderState::Lz4Chunk(r) => std::pin::pin!(r).poll_read(cx, buf), + ReaderState::Base(r) => pin!(r).poll_read(cx, buf), + ReaderState::UncompressedChunk(r) => pin!(r).poll_read(cx, buf), + ReaderState::ZstdChunk(r) => pin!(r).poll_read(cx, buf), + ReaderState::Lz4Chunk(r) => pin!(r).poll_read(cx, buf), ReaderState::Empty => { panic!("invariant: reader is only set to empty while swapping with another valid variant") } @@ -61,9 +64,8 @@ pub struct RecordReader { options: Options, start_magic_seen: bool, footer_seen: bool, - scratch: [u8; 9], to_discard_after_chunk: usize, - discard_buf: Box<[u8]>, + scratch: Box<[u8]>, } #[derive(Default, Clone)] @@ -101,9 +103,8 @@ where options: options.clone(), start_magic_seen: false, footer_seen: false, - scratch: [0; 9], to_discard_after_chunk: 0, - discard_buf: vec![0; 1024].into_boxed_slice(), + scratch: vec![0; 1024].into_boxed_slice(), } } @@ -153,14 +154,12 @@ where std::mem::swap(&mut rdr, &mut self.reader); self.reader = ReaderState::Base(rdr.into_inner()?); while self.to_discard_after_chunk > 0 { - let to_read = if self.to_discard_after_chunk > self.discard_buf.len() { - self.discard_buf.len() + let to_read = if self.to_discard_after_chunk > self.scratch.len() { + self.scratch.len() } else { self.to_discard_after_chunk }; - self.reader - .read_exact(&mut self.discard_buf[..to_read]) - .await?; + self.reader.read_exact(&mut self.scratch[..to_read]).await?; self.to_discard_after_chunk -= to_read; } } @@ -184,12 +183,12 @@ where } return Ok(Cmd::Stop); } - reader.read_exact(&mut self.scratch).await?; + reader.read_exact(&mut self.scratch[..9]).await?; let opcode = self.scratch[0]; if opcode == records::op::FOOTER { self.footer_seen = true; } - let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..]); + let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..9]); if opcode == records::op::CHUNK && !self.options.emit_chunks { let header = read_chunk_header(reader, data, record_len).await?; return Ok(Cmd::EnterChunk { @@ -201,15 +200,15 @@ where reader.read_exact(&mut data[..]).await?; Ok(Cmd::YieldRecord(opcode)) } else { - let len = self.reader.read(&mut self.scratch).await?; + let len = self.reader.read(&mut self.scratch[..9]).await?; if len == 0 { return Ok(Cmd::ExitChunk); } - if len != self.scratch.len() { + if len != 9 { return Err(McapError::UnexpectedEof); } let opcode = self.scratch[0]; - let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..]); + let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..9]); data.resize(record_len as usize, 0); self.reader.read_exact(&mut data[..]).await?; Ok(Cmd::YieldRecord(opcode)) @@ -298,35 +297,35 @@ impl LinearStream { impl Stream for LinearStream { type Item = McapResult>; fn poll_next( - mut self: std::pin::Pin<&mut Self>, + mut self: 
Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { // we do this swap maneuver in order to appease the borrow checker and also reuse one read // buf across several records. let mut buf = Vec::new(); std::mem::swap(&mut buf, &mut self.buf); let opcode = { - let res = std::pin::pin!((&mut self).r.next_record(&mut buf)).poll(cx); + let res = pin!((&mut self).r.next_record(&mut buf)).poll(cx); match res { - std::task::Poll::Pending => { + Poll::Pending => { std::mem::swap(&mut buf, &mut self.buf); - return std::task::Poll::Pending; + return Poll::Pending; } - std::task::Poll::Ready(result) => match result { + Poll::Ready(result) => match result { Err(err) => { std::mem::swap(&mut buf, &mut self.buf); - return std::task::Poll::Ready(Some(Err(err))); + return Poll::Ready(Some(Err(err))); } Ok(None) => { std::mem::swap(&mut buf, &mut self.buf); - return std::task::Poll::Ready(None); + return Poll::Ready(None); } Ok(Some(op)) => op, }, } }; let parse_res = crate::read::read_record(opcode, &buf[..]); - let result = std::task::Poll::Ready(Some(match parse_res { + let result = Poll::Ready(Some(match parse_res { Ok(record) => Ok(record.into_owned()), Err(err) => Err(err), })); @@ -372,13 +371,8 @@ mod tests { let mut reader = RecordReader::new(std::io::Cursor::new(buf.into_inner())); let mut record = Vec::new(); let mut opcodes: Vec = Vec::new(); - loop { - let opcode = reader.next_record(&mut record).await?; - if let Some(opcode) = opcode { - opcodes.push(opcode); - } else { - break; - } + while let Some(opcode) = reader.next_record(&mut record).await? { + opcodes.push(opcode); } assert_eq!( opcodes.as_slice(), From 4dbb4cc70de82667352fa4a7de79117fed9b3950 Mon Sep 17 00:00:00 2001 From: James Smith Date: Wed, 14 Aug 2024 22:30:06 +1000 Subject: [PATCH 04/25] tweak attachment test --- rust/tests/attachment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/tests/attachment.rs b/rust/tests/attachment.rs index 76b6762777..dba9fadd90 100644 --- a/rust/tests/attachment.rs +++ b/rust/tests/attachment.rs @@ -66,7 +66,7 @@ fn round_trip() -> Result<()> { ..Default::default() }), attachment_indexes: vec![mcap::records::AttachmentIndex { - offset: 38, // Finicky - depends on the length of the library version string + offset: 39, // Finicky - depends on the length of the library version string length: 78, log_time: 2, create_time: 1, From 39b00f923f6f0986072eab6bae50d9196fcd8886 Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 15 Aug 2024 11:00:11 +1000 Subject: [PATCH 05/25] remove stream implementation --- rust/Cargo.toml | 3 +- rust/src/lib.rs | 2 +- rust/src/read.rs | 5 +- rust/src/tokio/read.rs | 121 +---------------------------------------- rust/tests/metadata.rs | 2 +- 5 files changed, 8 insertions(+), 125 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 015b8f5cad..0b70fb047c 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -25,7 +25,6 @@ thiserror = "1.0" lz4 = { version = "1", optional = true } async-compression = { version = "*", features = ["tokio", "zstd"], optional = true } tokio = { version = "1", features = ["io-util"] , optional = true } -tokio-stream = { version = "*", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] zstd = { version = "0.11", features = ["wasm"], optional = true } @@ -37,7 +36,7 @@ zstd = { version = "0.11", features = ["zstdmt"], optional = true } default = ["zstd", "lz4"] zstd = ["dep:zstd"] lz4 = ["dep:lz4"] -tokio = ["dep:async-compression", "dep:tokio", 
"dep:tokio-stream"] +tokio = ["dep:async-compression", "dep:tokio"] [dev-dependencies] anyhow = "1" diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 84df48a4ed..31303cdc7b 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -200,5 +200,5 @@ pub struct Attachment<'a> { pub data: Cow<'a, [u8]>, } -pub use read::{MessageStream, Summary}; +pub use read::{read_record, MessageStream, Summary}; pub use write::{WriteOptions, Writer}; diff --git a/rust/src/read.rs b/rust/src/read.rs index e7d4d34e41..b98cbd28c6 100644 --- a/rust/src/read.rs +++ b/rust/src/read.rs @@ -143,8 +143,9 @@ fn read_record_from_slice<'a>(buf: &mut &'a [u8]) -> McapResult McapResult> { +/// Given a records' opcode and data, parse into a Record. The resulting Record will contain +/// borrowed slices from `body`. +pub fn read_record(op: u8, body: &[u8]) -> McapResult> { macro_rules! record { ($b:ident) => {{ let mut cur = Cursor::new($b); diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs index 3e67366ad2..d44e87f34b 100644 --- a/rust/src/tokio/read.rs +++ b/rust/src/tokio/read.rs @@ -1,11 +1,9 @@ -use std::future::Future; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use async_compression::tokio::bufread::ZstdDecoder; use byteorder::ByteOrder; use tokio::io::{AsyncRead, AsyncReadExt, BufReader, ReadBuf, Take}; -use tokio_stream::Stream; use crate::tokio::lz4::Lz4Decoder; use crate::{records, McapError, McapResult, MAGIC}; @@ -274,70 +272,10 @@ async fn read_chunk_header( Ok(header) } -/// implements a Stream of owned `crate::record::Record` values. -pub struct LinearStream { - r: RecordReader, - buf: Vec, -} - -impl LinearStream { - /// Creates a new stream of records from a reader. - pub fn new(r: R) -> Self { - Self { - r: RecordReader::new(r), - buf: Vec::new(), - } - } - - pub fn into_inner(self) -> McapResult { - self.r.into_inner() - } -} - -impl Stream for LinearStream { - type Item = McapResult>; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // we do this swap maneuver in order to appease the borrow checker and also reuse one read - // buf across several records. - let mut buf = Vec::new(); - std::mem::swap(&mut buf, &mut self.buf); - let opcode = { - let res = pin!((&mut self).r.next_record(&mut buf)).poll(cx); - match res { - Poll::Pending => { - std::mem::swap(&mut buf, &mut self.buf); - return Poll::Pending; - } - Poll::Ready(result) => match result { - Err(err) => { - std::mem::swap(&mut buf, &mut self.buf); - return Poll::Ready(Some(Err(err))); - } - Ok(None) => { - std::mem::swap(&mut buf, &mut self.buf); - return Poll::Ready(None); - } - Ok(Some(op)) => op, - }, - } - }; - let parse_res = crate::read::read_record(opcode, &buf[..]); - let result = Poll::Ready(Some(match parse_res { - Ok(record) => Ok(record.into_owned()), - Err(err) => Err(err), - })); - std::mem::swap(&mut buf, &mut self.buf); - return result; - } -} - #[cfg(test)] mod tests { + use crate::read::read_record; use std::collections::BTreeMap; - use tokio_stream::StreamExt; use super::*; #[tokio::test] @@ -373,6 +311,7 @@ mod tests { let mut opcodes: Vec = Vec::new(); while let Some(opcode) = reader.next_record(&mut record).await? 
{ opcodes.push(opcode); + read_record(opcode, &record)?; } assert_eq!( opcodes.as_slice(), @@ -396,60 +335,4 @@ mod tests { } Ok(()) } - #[tokio::test] - async fn test_linear_stream() -> Result<(), McapError> { - for compression in [ - None, - Some(crate::Compression::Lz4), - Some(crate::Compression::Zstd), - ] { - let mut buf = std::io::Cursor::new(Vec::new()); - { - let mut writer = crate::WriteOptions::new() - .compression(compression) - .create(&mut buf)?; - let channel = std::sync::Arc::new(crate::Channel { - topic: "chat".to_owned(), - schema: None, - message_encoding: "json".to_owned(), - metadata: BTreeMap::new(), - }); - writer.add_channel(&channel)?; - writer.write(&crate::Message { - channel, - sequence: 0, - log_time: 0, - publish_time: 0, - data: (&[0, 1, 2]).into(), - })?; - writer.finish()?; - } - let mut reader = LinearStream::new(std::io::Cursor::new(buf.into_inner())); - let mut opcodes: Vec = Vec::new(); - while let Some(result) = reader.next().await { - let record = result?; - opcodes.push(record.opcode()) - } - assert_eq!( - opcodes.as_slice(), - [ - records::op::HEADER, - records::op::CHANNEL, - records::op::MESSAGE, - records::op::MESSAGE_INDEX, - records::op::DATA_END, - records::op::CHANNEL, - records::op::CHUNK_INDEX, - records::op::STATISTICS, - records::op::SUMMARY_OFFSET, - records::op::SUMMARY_OFFSET, - records::op::SUMMARY_OFFSET, - records::op::FOOTER, - ], - "reads records from MCAP compressed with {:?}", - compression - ); - } - Ok(()) - } } diff --git a/rust/tests/metadata.rs b/rust/tests/metadata.rs index 606d81e791..b928bf578c 100644 --- a/rust/tests/metadata.rs +++ b/rust/tests/metadata.rs @@ -56,7 +56,7 @@ fn round_trip() -> Result<()> { ..Default::default() }), metadata_indexes: vec![mcap::records::MetadataIndex { - offset: 38, // Finicky - depends on the length of the library version string + offset: 39, // Finicky - depends on the length of the library version string length: 41, name: String::from("myMetadata"), }], From db3131b762a0998416ab1c4e26d509392c167dc7 Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 15 Aug 2024 15:00:50 +1000 Subject: [PATCH 06/25] no longer depend on crate version string --- rust/tests/attachment.rs | 3 ++- rust/tests/metadata.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/tests/attachment.rs b/rust/tests/attachment.rs index dba9fadd90..57eb98b1eb 100644 --- a/rust/tests/attachment.rs +++ b/rust/tests/attachment.rs @@ -66,7 +66,8 @@ fn round_trip() -> Result<()> { ..Default::default() }), attachment_indexes: vec![mcap::records::AttachmentIndex { - offset: 39, // Finicky - depends on the length of the library version string + // offset depends on the length of the embedded library string, which includes the crate version + offset: 33 + (env!("CARGO_PKG_VERSION").len() as u64), length: 78, log_time: 2, create_time: 1, diff --git a/rust/tests/metadata.rs b/rust/tests/metadata.rs index b928bf578c..905ab98ff7 100644 --- a/rust/tests/metadata.rs +++ b/rust/tests/metadata.rs @@ -56,7 +56,8 @@ fn round_trip() -> Result<()> { ..Default::default() }), metadata_indexes: vec![mcap::records::MetadataIndex { - offset: 39, // Finicky - depends on the length of the library version string + // offset depends on the length of the embedded library string, which includes the crate version + offset: 33 + (env!("CARGO_PKG_VERSION").len() as u64), length: 41, name: String::from("myMetadata"), }], From 829c5db9fc094f26f0af64a3ba3b2ffb63af4f2c Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 15 Aug 2024 
20:39:25 +1000 Subject: [PATCH 07/25] add conformance test, rename to parse_record --- .github/workflows/ci.yml | 3 +- rust/Cargo.toml | 2 +- rust/examples/common/serialization.rs | 133 +++++++++++++++++ rust/examples/conformance_reader.rs | 137 +----------------- rust/examples/conformance_reader_async.rs | 37 +++++ rust/src/lib.rs | 2 +- rust/src/read.rs | 4 +- rust/src/tokio/read.rs | 4 +- .../runners/RustAsyncReaderTestRunner.ts | 22 +++ .../scripts/run-tests/runners/index.ts | 2 + 10 files changed, 208 insertions(+), 138 deletions(-) create mode 100644 rust/examples/common/serialization.rs create mode 100644 rust/examples/conformance_reader_async.rs create mode 100644 tests/conformance/scripts/run-tests/runners/RustAsyncReaderTestRunner.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e7e2083df1..28f6e5acd7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -160,7 +160,8 @@ jobs: with: toolchain: stable default: true - - run: cd rust && cargo build --example=conformance_reader + - run: cargo build --example=conformance_reader --example=conformance_reader_async --features=tokio + working-directory: rust - run: yarn install --immutable - run: yarn test:conformance:generate-inputs --verify - run: yarn test:conformance --runner rust- diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 0b70fb047c..34f3a74344 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -51,7 +51,7 @@ serde = { version = "1.0.145", features = ["derive"] } serde_json = "1" simplelog = "0.12" tempfile = "3.3" -tokio = { version = "1", features = ["io-util", "macros", "rt"] } +tokio = { version = "1", features = ["io-util", "macros", "rt", "fs"] } [[bench]] name = "reader" diff --git a/rust/examples/common/serialization.rs b/rust/examples/common/serialization.rs new file mode 100644 index 0000000000..73c1c7f9e4 --- /dev/null +++ b/rust/examples/common/serialization.rs @@ -0,0 +1,133 @@ +use mcap::records::Record; + +use std::collections::BTreeMap; + +use serde_json::{json, Value}; + +// We don't want to force Serde on users just for the sake of the conformance tests. +// (In what context would you want to serialize individual records of a MCAP?) +// Stamp out and stringify them ourselves: + +fn get_type(rec: &Record<'_>) -> &'static str { + match rec { + Record::Header(_) => "Header", + Record::Footer(_) => "Footer", + Record::Schema { .. } => "Schema", + Record::Channel(_) => "Channel", + Record::Message { .. } => "Message", + Record::Chunk { .. } => "Chunk", + Record::MessageIndex(_) => "MessageIndex", + Record::ChunkIndex(_) => "ChunkIndex", + Record::Attachment { .. } => "Attachment", + Record::AttachmentIndex(_) => "AttachmentIndex", + Record::Statistics(_) => "Statistics", + Record::Metadata(_) => "Metadata", + Record::MetadataIndex(_) => "MetadataIndex", + Record::SummaryOffset(_) => "SummaryOffset", + Record::DataEnd(_) => "DataEnd", + Record::Unknown { opcode, .. 
} => { + panic!("Unknown record in conformance test: (op {opcode})") + } + } +} + +fn get_fields(rec: &Record<'_>) -> Value { + fn b2s(bytes: &[u8]) -> Vec { + bytes.iter().map(|b| b.to_string()).collect() + } + fn m2s(map: &BTreeMap) -> BTreeMap { + map.iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + match rec { + Record::Header(h) => json!([["library", h.library], ["profile", h.profile]]), + Record::Footer(f) => json!([ + ["summary_crc", f.summary_crc.to_string()], + ["summary_offset_start", f.summary_offset_start.to_string()], + ["summary_start", f.summary_start.to_string()] + ]), + Record::Schema { header, data } => json!([ + ["data", b2s(data)], + ["encoding", header.encoding], + ["id", header.id.to_string()], + ["name", header.name] + ]), + Record::Channel(c) => json!([ + ["id", c.id.to_string()], + ["message_encoding", c.message_encoding], + ["metadata", c.metadata], + ["schema_id", c.schema_id.to_string()], + ["topic", c.topic] + ]), + Record::Message { header, data } => json!([ + ["channel_id", header.channel_id.to_string()], + ["data", b2s(data)], + ["log_time", header.log_time.to_string()], + ["publish_time", header.publish_time.to_string()], + ["sequence", header.sequence.to_string()] + ]), + Record::Chunk { .. } => unreachable!("Chunks are flattened"), + Record::MessageIndex(_) => unreachable!("MessageIndexes are skipped"), + Record::ChunkIndex(i) => json!([ + ["chunk_length", i.chunk_length.to_string()], + ["chunk_start_offset", i.chunk_start_offset.to_string()], + ["compressed_size", i.compressed_size.to_string()], + ["compression", i.compression], + ["message_end_time", i.message_end_time.to_string()], + ["message_index_length", i.message_index_length.to_string()], + ["message_index_offsets", m2s(&i.message_index_offsets)], + ["message_start_time", i.message_start_time.to_string()], + ["uncompressed_size", i.uncompressed_size.to_string()] + ]), + Record::Attachment { header, data } => json!([ + ["create_time", header.create_time.to_string()], + ["data", b2s(data)], + ["log_time", header.log_time.to_string()], + ["media_type", header.media_type], + ["name", header.name] + ]), + Record::AttachmentIndex(i) => json!([ + ["create_time", i.create_time.to_string()], + ["data_size", i.data_size.to_string()], + ["length", i.length.to_string()], + ["log_time", i.log_time.to_string()], + ["media_type", i.media_type], + ["name", i.name], + ["offset", i.offset.to_string()] + ]), + Record::Statistics(s) => json!([ + ["attachment_count", s.attachment_count.to_string()], + ["channel_count", s.channel_count.to_string()], + ["channel_message_counts", m2s(&s.channel_message_counts)], + ["chunk_count", s.chunk_count.to_string()], + ["message_count", s.message_count.to_string()], + ["message_end_time", s.message_end_time.to_string()], + ["message_start_time", s.message_start_time.to_string()], + ["metadata_count", s.metadata_count.to_string()], + ["schema_count", s.schema_count.to_string()] + ]), + Record::Metadata(m) => json!([["metadata", m.metadata], ["name", m.name]]), + Record::MetadataIndex(i) => json!([ + ["length", i.length.to_string()], + ["name", i.name], + ["offset", i.offset.to_string()] + ]), + Record::SummaryOffset(s) => json!([ + ["group_length", s.group_length.to_string()], + ["group_opcode", s.group_opcode.to_string()], + ["group_start", s.group_start.to_string()] + ]), + Record::DataEnd(d) => json!([["data_section_crc", d.data_section_crc.to_string()]]), + Record::Unknown { opcode, .. 
} => { + panic!("Unknown record in conformance test: (op {opcode})") + } + } +} + +pub fn as_json(view: &Record<'_>) -> Value { + let typename = get_type(view); + let fields = get_fields(view); + json!({"type": typename, "fields": fields}) +} diff --git a/rust/examples/conformance_reader.rs b/rust/examples/conformance_reader.rs index 941ee9386e..3bfa97ae19 100644 --- a/rust/examples/conformance_reader.rs +++ b/rust/examples/conformance_reader.rs @@ -1,136 +1,11 @@ -use mcap::records::Record; - -use std::{collections::BTreeMap, env, process}; +#[path = "common/serialization.rs"] +mod serialization; use serde_json::{json, Value}; -// We don't want to force Serde on users just for the sake of the conformance tests. -// (In what context would you want to serialize individual records of a MCAP?) -// Stamp out and stringify them ourselves: - -fn get_type(rec: &Record<'_>) -> &'static str { - match rec { - Record::Header(_) => "Header", - Record::Footer(_) => "Footer", - Record::Schema { .. } => "Schema", - Record::Channel(_) => "Channel", - Record::Message { .. } => "Message", - Record::Chunk { .. } => "Chunk", - Record::MessageIndex(_) => "MessageIndex", - Record::ChunkIndex(_) => "ChunkIndex", - Record::Attachment { .. } => "Attachment", - Record::AttachmentIndex(_) => "AttachmentIndex", - Record::Statistics(_) => "Statistics", - Record::Metadata(_) => "Metadata", - Record::MetadataIndex(_) => "MetadataIndex", - Record::SummaryOffset(_) => "SummaryOffset", - Record::DataEnd(_) => "DataEnd", - Record::Unknown { opcode, .. } => { - panic!("Unknown record in conformance test: (op {opcode})") - } - } -} - -fn get_fields(rec: &Record<'_>) -> Value { - fn b2s(bytes: &[u8]) -> Vec { - bytes.iter().map(|b| b.to_string()).collect() - } - fn m2s(map: &BTreeMap) -> BTreeMap { - map.iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect() - } - - match rec { - Record::Header(h) => json!([["library", h.library], ["profile", h.profile]]), - Record::Footer(f) => json!([ - ["summary_crc", f.summary_crc.to_string()], - ["summary_offset_start", f.summary_offset_start.to_string()], - ["summary_start", f.summary_start.to_string()] - ]), - Record::Schema { header, data } => json!([ - ["data", b2s(data)], - ["encoding", header.encoding], - ["id", header.id.to_string()], - ["name", header.name] - ]), - Record::Channel(c) => json!([ - ["id", c.id.to_string()], - ["message_encoding", c.message_encoding], - ["metadata", c.metadata], - ["schema_id", c.schema_id.to_string()], - ["topic", c.topic] - ]), - Record::Message { header, data } => json!([ - ["channel_id", header.channel_id.to_string()], - ["data", b2s(data)], - ["log_time", header.log_time.to_string()], - ["publish_time", header.publish_time.to_string()], - ["sequence", header.sequence.to_string()] - ]), - Record::Chunk { .. 
} => unreachable!("Chunks are flattened"), - Record::MessageIndex(_) => unreachable!("MessageIndexes are skipped"), - Record::ChunkIndex(i) => json!([ - ["chunk_length", i.chunk_length.to_string()], - ["chunk_start_offset", i.chunk_start_offset.to_string()], - ["compressed_size", i.compressed_size.to_string()], - ["compression", i.compression], - ["message_end_time", i.message_end_time.to_string()], - ["message_index_length", i.message_index_length.to_string()], - ["message_index_offsets", m2s(&i.message_index_offsets)], - ["message_start_time", i.message_start_time.to_string()], - ["uncompressed_size", i.uncompressed_size.to_string()] - ]), - Record::Attachment { header, data } => json!([ - ["create_time", header.create_time.to_string()], - ["data", b2s(data)], - ["log_time", header.log_time.to_string()], - ["media_type", header.media_type], - ["name", header.name] - ]), - Record::AttachmentIndex(i) => json!([ - ["create_time", i.create_time.to_string()], - ["data_size", i.data_size.to_string()], - ["length", i.length.to_string()], - ["log_time", i.log_time.to_string()], - ["media_type", i.media_type], - ["name", i.name], - ["offset", i.offset.to_string()] - ]), - Record::Statistics(s) => json!([ - ["attachment_count", s.attachment_count.to_string()], - ["channel_count", s.channel_count.to_string()], - ["channel_message_counts", m2s(&s.channel_message_counts)], - ["chunk_count", s.chunk_count.to_string()], - ["message_count", s.message_count.to_string()], - ["message_end_time", s.message_end_time.to_string()], - ["message_start_time", s.message_start_time.to_string()], - ["metadata_count", s.metadata_count.to_string()], - ["schema_count", s.schema_count.to_string()] - ]), - Record::Metadata(m) => json!([["metadata", m.metadata], ["name", m.name]]), - Record::MetadataIndex(i) => json!([ - ["length", i.length.to_string()], - ["name", i.name], - ["offset", i.offset.to_string()] - ]), - Record::SummaryOffset(s) => json!([ - ["group_length", s.group_length.to_string()], - ["group_opcode", s.group_opcode.to_string()], - ["group_start", s.group_start.to_string()] - ]), - Record::DataEnd(d) => json!([["data_section_crc", d.data_section_crc.to_string()]]), - Record::Unknown { opcode, .. 
} => { - panic!("Unknown record in conformance test: (op {opcode})") - } - } -} - -fn as_json(view: &Record<'_>) -> Value { - let typename = get_type(view); - let fields = get_fields(view); - json!({"type": typename, "fields": fields}) -} +use mcap::records::Record; +use std::env; +use std::process; pub fn main() { let args: Vec = env::args().collect(); @@ -143,7 +18,7 @@ pub fn main() { for rec in mcap::read::ChunkFlattener::new(&file).expect("Couldn't read file") { let r = rec.expect("failed to read next record"); if !matches!(r, Record::MessageIndex(_)) { - json_records.push(as_json(&r)); + json_records.push(serialization::as_json(&r)); } } let out = json!({ "records": json_records }); diff --git a/rust/examples/conformance_reader_async.rs b/rust/examples/conformance_reader_async.rs new file mode 100644 index 0000000000..f0e63ab868 --- /dev/null +++ b/rust/examples/conformance_reader_async.rs @@ -0,0 +1,37 @@ +#[path = "common/serialization.rs"] +mod serialization; + +use serde_json::{json, Value}; + +use serialization::as_json; +use std::env; +use std::process; +use tokio::fs::File; + +use tokio; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("Please supply an MCAP file as argument"); + process::exit(1); + } + let file = File::open(&args[1]).await.expect("couldn't open file"); + let mut reader = mcap::tokio::RecordReader::new(file); + + let mut json_records: Vec = vec![]; + let mut buf: Vec = Vec::new(); + while let Some(op) = reader + .next_record(&mut buf) + .await + .expect("failed to read next record") + { + if op != mcap::records::op::MESSAGE_INDEX { + let parsed = mcap::parse_record(op, &buf[..]).expect("failed to parse record"); + json_records.push(as_json(&parsed)); + } + } + let out = json!({ "records": json_records }); + print!("{}", serde_json::to_string_pretty(&out).unwrap()); +} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 31303cdc7b..bdb1f389d9 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -200,5 +200,5 @@ pub struct Attachment<'a> { pub data: Cow<'a, [u8]>, } -pub use read::{read_record, MessageStream, Summary}; +pub use read::{parse_record, MessageStream, Summary}; pub use write::{WriteOptions, Writer}; diff --git a/rust/src/read.rs b/rust/src/read.rs index b98cbd28c6..94d8d53607 100644 --- a/rust/src/read.rs +++ b/rust/src/read.rs @@ -136,7 +136,7 @@ fn read_record_from_slice<'a>(buf: &mut &'a [u8]) -> McapResult(buf: &mut &'a [u8]) -> McapResult McapResult> { +pub fn parse_record(op: u8, body: &[u8]) -> McapResult> { macro_rules! record { ($b:ident) => {{ let mut cur = Cursor::new($b); diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs index d44e87f34b..7a1fa25cd6 100644 --- a/rust/src/tokio/read.rs +++ b/rust/src/tokio/read.rs @@ -274,7 +274,7 @@ async fn read_chunk_header( #[cfg(test)] mod tests { - use crate::read::read_record; + use crate::read::parse_record; use std::collections::BTreeMap; use super::*; @@ -311,7 +311,7 @@ mod tests { let mut opcodes: Vec = Vec::new(); while let Some(opcode) = reader.next_record(&mut record).await? 
{ opcodes.push(opcode); - read_record(opcode, &record)?; + parse_record(opcode, &record)?; } assert_eq!( opcodes.as_slice(), diff --git a/tests/conformance/scripts/run-tests/runners/RustAsyncReaderTestRunner.ts b/tests/conformance/scripts/run-tests/runners/RustAsyncReaderTestRunner.ts new file mode 100644 index 0000000000..8418d62aed --- /dev/null +++ b/tests/conformance/scripts/run-tests/runners/RustAsyncReaderTestRunner.ts @@ -0,0 +1,22 @@ +import { exec } from "child_process"; +import { join } from "path"; +import { promisify } from "util"; +import { TestVariant } from "variants/types"; + +import { StreamedReadTestRunner } from "./TestRunner"; +import { StreamedReadTestResult } from "../types"; + +export default class RustAsyncReaderTestRunner extends StreamedReadTestRunner { + readonly name = "rust-async-streamed-reader"; + + async runReadTest(filePath: string): Promise { + const { stdout } = await promisify(exec)(`./conformance_reader_async ${filePath}`, { + cwd: join(__dirname, "../../../../../rust/target/debug/examples"), + }); + return JSON.parse(stdout.trim()) as StreamedReadTestResult; + } + + supportsVariant(_variant: TestVariant): boolean { + return true; + } +} diff --git a/tests/conformance/scripts/run-tests/runners/index.ts b/tests/conformance/scripts/run-tests/runners/index.ts index 3c76f02148..6af475da47 100644 --- a/tests/conformance/scripts/run-tests/runners/index.ts +++ b/tests/conformance/scripts/run-tests/runners/index.ts @@ -8,6 +8,7 @@ import KaitaiStructReaderTestRunner from "./KaitaiStructReaderTestRunner"; import PythonIndexedReaderTestRunner from "./PythonIndexedReaderTestRunner"; import PythonStreamedReaderTestRunner from "./PythonStreamedReaderTestRunner"; import PythonWriterTestRunner from "./PythonWriterTestRunner"; +import RustAsyncReaderTestRunner from "./RustAsyncReaderTestRunner"; import RustReaderTestRunner from "./RustReaderTestRunner"; import RustWriterTestRunner from "./RustWriterTestRunner"; import SwiftIndexedReaderTestRunner from "./SwiftIndexedReaderTestRunner"; @@ -31,6 +32,7 @@ const runners: readonly (IndexedReadTestRunner | StreamedReadTestRunner | WriteT new TypescriptIndexedReaderTestRunner(), new TypescriptStreamedReaderTestRunner(), new TypescriptWriterTestRunner(), + new RustAsyncReaderTestRunner(), new RustReaderTestRunner(), new RustWriterTestRunner(), new SwiftWriterTestRunner(), From 69bbdf6aaf7b9f204aebf3aa52614b2dfd8dd2fe Mon Sep 17 00:00:00 2001 From: James Smith Date: Thu, 15 Aug 2024 21:07:31 +1000 Subject: [PATCH 08/25] add default exports from tokio module --- rust/src/tokio.rs | 2 ++ rust/src/tokio/read.rs | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/rust/src/tokio.rs b/rust/src/tokio.rs index ab6bab2dc7..ed0ef0597c 100644 --- a/rust/src/tokio.rs +++ b/rust/src/tokio.rs @@ -1,2 +1,4 @@ mod lz4; pub mod read; + +pub use read::{RecordReader, RecordReaderOptions}; diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs index 7a1fa25cd6..01f5ce5e5f 100644 --- a/rust/src/tokio/read.rs +++ b/rust/src/tokio/read.rs @@ -59,7 +59,7 @@ where /// Reads an MCAP file record-by-record, writing the raw record data into a caller-provided Vec. 
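+///
+/// A minimal read-loop sketch (hypothetical; any `AsyncRead + Unpin` source works,
+/// `tokio::fs::File` is just one example):
+///
+/// ```no_run
+/// # async fn example(file: tokio::fs::File) -> mcap::McapResult<()> {
+/// let mut reader = mcap::tokio::RecordReader::new(file);
+/// let mut buf: Vec<u8> = Vec::new();
+/// while let Some(opcode) = reader.next_record(&mut buf).await? {
+///     // `buf` now holds the raw record body for `opcode`
+///     let record = mcap::parse_record(opcode, &buf)?;
+///     println!("read record with opcode {:02X}", record.opcode());
+/// }
+/// # Ok(())
+/// # }
+/// ```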
 pub struct RecordReader<R> { reader: ReaderState<R>, - options: Options, + options: RecordReaderOptions, start_magic_seen: bool, footer_seen: bool, to_discard_after_chunk: usize,
@@ -67,7 +67,7 @@ pub struct RecordReader<R> { }
 #[derive(Default, Clone)]
-pub struct Options {
+pub struct RecordReaderOptions {
 /// If true, the reader will not expect the MCAP magic at the start of the stream. skip_start_magic: bool, /// If true, the reader will not expect the MCAP magic at the end of the stream.
@@ -92,10 +92,10 @@ where R: AsyncRead + std::marker::Unpin, { pub fn new(reader: R) -> Self { - Self::new_with_options(reader, &Options::default()) + Self::new_with_options(reader, &RecordReaderOptions::default()) }
- pub fn new_with_options(reader: R, options: &Options) -> Self {
+ pub fn new_with_options(reader: R, options: &RecordReaderOptions) -> Self {
 Self { reader: ReaderState::Base(reader), options: options.clone(),
From d9b3e697350a8d26a8a865d7a0e87eeddac48e00 Mon Sep 17 00:00:00 2001
From: James Smith
Date: Thu, 15 Aug 2024 21:24:04 +1000
Subject: [PATCH 09/25] test all features
---
 .github/workflows/ci.yml | 7 +++++--
 rust/src/records.rs | 2 +-
 rust/src/tokio.rs | 1 +
 rust/src/tokio/lz4.rs | 6 +++---
 rust/src/tokio/read.rs | 24 ++++++++++++++++------
 rust/src/write.rs | 5 +----
 6 files changed, 29 insertions(+), 16 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 28f6e5acd7..e847c94ca0 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -496,8 +496,11 @@ jobs:
 - run: cargo clippy --no-default-features -- --no-deps
 - run: cargo clippy --no-default-features --features lz4 -- --no-deps
 - run: cargo clippy --no-default-features --features zstd -- --no-deps
+ - run: cargo clippy --no-default-features --features tokio -- --no-deps
+ - run: cargo clippy --no-default-features --features tokio,lz4 -- --no-deps
+ - run: cargo clippy --no-default-features --features tokio,zstd -- --no-deps
- - run: cargo build
+ - run: cargo build --all-features
- - run: cargo test
+ - run: cargo test --all-features
 - name: "publish to crates.io" if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/releases/rust/v') run: cargo publish --token ${{ secrets.RUST_CRATES_IO_TOKEN }}
diff --git a/rust/src/records.rs b/rust/src/records.rs
index 0f373d0268..0ac5d7f72a 100644
--- a/rust/src/records.rs
+++ b/rust/src/records.rs
@@ -132,7 +132,7 @@ impl Record<'_> { Record::SummaryOffset(offset) => Record::SummaryOffset(offset), Record::DataEnd(end) => Record::DataEnd(end), Record::Unknown { opcode, data } => Record::Unknown { - opcode: opcode, + opcode, data: Cow::Owned(data.into_owned()), }, } }
diff --git a/rust/src/tokio.rs b/rust/src/tokio.rs
index ed0ef0597c..25a9bb5712 100644
--- a/rust/src/tokio.rs
+++ b/rust/src/tokio.rs
@@ -1,3 +1,4 @@
+#[cfg(feature = "lz4")]
 mod lz4;
 pub mod read;
 pub use read::{RecordReader, RecordReaderOptions};
diff --git a/rust/src/tokio/lz4.rs b/rust/src/tokio/lz4.rs
index 1145b588a4..14b116bcbf 100644
--- a/rust/src/tokio/lz4.rs
+++ b/rust/src/tokio/lz4.rs
@@ -105,8 +105,8 @@ impl<R: AsyncRead + Unpin> AsyncRead for Lz4Decoder<R> { ptr::null(), ) })?; - mself.pos += src_size as usize; - written_len += dst_size as usize; + mself.pos += src_size; + written_len += dst_size; buf.set_filled(written_len); if len == 0 { mself.next = 0;
@@ -116,7 +116,7 @@ } } } - return Poll::Ready(Ok(())); + Poll::Ready(Ok(())) } }
diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs
index 01f5ce5e5f..d03ea03b36 100644
--- a/rust/src/tokio/read.rs
+++ b/rust/src/tokio/read.rs
@@ -1,17 +1,21 @@
 use std::pin::{pin, Pin}; use std::task::{Context, Poll};
+#[cfg(feature = "zstd")]
 use async_compression::tokio::bufread::ZstdDecoder; use byteorder::ByteOrder;
-use tokio::io::{AsyncRead, AsyncReadExt, BufReader, ReadBuf, Take};
+use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take};
+#[cfg(feature = "lz4")]
 use crate::tokio::lz4::Lz4Decoder; use crate::{records, McapError, McapResult, MAGIC};
 enum ReaderState<R> { Base(R), UncompressedChunk(Take<R>), - ZstdChunk(ZstdDecoder<BufReader<Take<R>>>), + #[cfg(feature = "zstd")] + ZstdChunk(ZstdDecoder<tokio::io::BufReader<Take<R>>>), + #[cfg(feature = "lz4")] Lz4Chunk(Lz4Decoder<Take<R>>), Empty, }
@@ -28,7 +32,9 @@ where match self.get_mut() { ReaderState::Base(r) => pin!(r).poll_read(cx, buf), ReaderState::UncompressedChunk(r) => pin!(r).poll_read(cx, buf), + #[cfg(feature = "zstd")] ReaderState::ZstdChunk(r) => pin!(r).poll_read(cx, buf), + #[cfg(feature = "lz4")] ReaderState::Lz4Chunk(r) => pin!(r).poll_read(cx, buf), ReaderState::Empty => { panic!("invariant: reader is only set to empty while swapping with another valid variant")
@@ -44,7 +50,9 @@ where match self { ReaderState::Base(reader) => Ok(reader), ReaderState::UncompressedChunk(take) => Ok(take.into_inner()), + #[cfg(feature = "zstd")] ReaderState::ZstdChunk(decoder) => Ok(decoder.into_inner().into_inner().into_inner()), + #[cfg(feature = "lz4")] ReaderState::Lz4Chunk(decoder) => { let (output, result) = decoder.finish(); result?;
@@ -122,11 +130,15 @@ where let mut rdr = ReaderState::Empty; std::mem::swap(&mut rdr, &mut self.reader); match header.compression.as_str() { + #[cfg(feature = "zstd")] "zstd" => { - self.reader = ReaderState::ZstdChunk(ZstdDecoder::new(BufReader::new( - rdr.into_inner()?.take(header.compressed_size), - ))); + self.reader = ReaderState::ZstdChunk(ZstdDecoder::new( + tokio::io::BufReader::new( + rdr.into_inner()?.take(header.compressed_size), + ), + )); } + #[cfg(feature = "lz4")] "lz4" => { let decoder = Lz4Decoder::new(rdr.into_inner()?.take(header.compressed_size))?;
@@ -282,8 +294,8 @@ mod tests { async fn test_record_reader() -> Result<(), McapError> { for compression in [ None, - Some(crate::Compression::Lz4), Some(crate::Compression::Zstd), + Some(crate::Compression::Lz4), ] { let mut buf = std::io::Cursor::new(Vec::new()); {
diff --git a/rust/src/write.rs b/rust/src/write.rs
index ef83587fb7..3be18f9282 100644
--- a/rust/src/write.rs
+++ b/rust/src/write.rs
@@ -153,10 +153,7 @@ impl WriteOptions { /// If `None`, chunks will not be automatically closed and the user must call `flush()` to /// begin a new chunk. pub fn chunk_size(self, chunk_size: Option<u64>) -> Self { - Self { - chunk_size: chunk_size, - ..self - } + Self { chunk_size, ..self } }
 /// specifies whether to use chunks for storing messages.
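At this point in the series the async reader is reachable as `mcap::tokio::RecordReader` via the re-exports above, and every compression backend is behind a feature flag. The following is a minimal consumer sketch, not part of the patch series itself: the file name and tokio feature selection are illustrative assumptions, and it reflects the API as of this patch, where `next_record` still returns `McapResult<Option<u8>>` (a later patch in this series transposes that return type).

```rust
// Hypothetical downstream usage of the `tokio` feature added in this series.
// Assumes mcap is built with `features = ["tokio"]` and that the consumer's
// tokio dependency enables the "fs", "macros", and "rt" features.
use mcap::tokio::{RecordReader, RecordReaderOptions};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open("in.mcap").await?;
    // Default options expect a complete MCAP stream, including both magic bytes.
    let mut reader = RecordReader::new_with_options(file, &RecordReaderOptions::default());
    let mut buf: Vec<u8> = Vec::new();
    // next_record copies the raw record body into `buf` and yields its opcode.
    while let Some(opcode) = reader.next_record(&mut buf).await? {
        let record = mcap::parse_record(opcode, &buf)?;
        println!("opcode {:02X}, {} bytes", record.opcode(), buf.len());
    }
    Ok(())
}
```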
From 6b2d0af95cd162431c599af6538d78e20a33689b Mon Sep 17 00:00:00 2001
From: James Smith
Date: Thu, 15 Aug 2024 21:31:38 +1000
Subject: [PATCH 10/25] zstd is fully optional
---
 rust/Cargo.toml | 4 ++--
 rust/src/tokio/read.rs | 2 ++
 2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 34f3a74344..c24282ccff 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -23,7 +23,7 @@ num_cpus = "1.13" paste = "1.0" thiserror = "1.0" lz4 = { version = "1", optional = true }
-async-compression = { version = "*", features = ["tokio", "zstd"], optional = true }
+async-compression = { version = "*", features = ["tokio"], optional = true }
 tokio = { version = "1", features = ["io-util"] , optional = true }
 [target.'cfg(target_arch = "wasm32")'.dependencies] zstd = { version = "0.11", features = ["wasm"], optional = true }
@@ -34,7 +34,7 @@ zstd = { version = "0.11", features = ["zstdmt"], optional = true } [features] default = ["zstd", "lz4"]
-zstd = ["dep:zstd"]
+zstd = ["dep:zstd", "async-compression/zstd"]
 lz4 = ["dep:lz4"]
 tokio = ["dep:async-compression", "dep:tokio"]
diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs
index d03ea03b36..6d2181ae36 100644
--- a/rust/src/tokio/read.rs
+++ b/rust/src/tokio/read.rs
@@ -294,7 +294,9 @@ mod tests { async fn test_record_reader() -> Result<(), McapError> { for compression in [ None, + #[cfg(feature = "zstd")] Some(crate::Compression::Zstd), + #[cfg(feature = "lz4")] Some(crate::Compression::Lz4), ] { let mut buf = std::io::Cursor::new(Vec::new());
From 5e726d5620deeaa6d47fe641214ed1b079980fee Mon Sep 17 00:00:00 2001
From: James Smith
Date: Thu, 15 Aug 2024 21:44:04 +1000
Subject: [PATCH 11/25] transpose result of read_record
---
 rust/examples/conformance_reader_async.rs | 11 ++--
 rust/src/tokio/read.rs | 66 +++++++++++++++--------
 2 files changed, 48 insertions(+), 29 deletions(-)
diff --git a/rust/examples/conformance_reader_async.rs b/rust/examples/conformance_reader_async.rs
index f0e63ab868..ba8dfb94a8 100644
--- a/rust/examples/conformance_reader_async.rs
+++ b/rust/examples/conformance_reader_async.rs
@@ -22,13 +22,10 @@ async fn main() {
 let mut json_records: Vec<Value> = vec![];
 let mut buf: Vec<u8> = Vec::new();
- while let Some(op) = reader - .next_record(&mut buf) - .await - .expect("failed to read next record") - { - if op != mcap::records::op::MESSAGE_INDEX { - let parsed = mcap::parse_record(op, &buf[..]).expect("failed to parse record");
+ while let Some(opcode) = reader.next_record(&mut buf).await { + let opcode = opcode.expect("failed to read next record"); + if opcode != mcap::records::op::MESSAGE_INDEX { + let parsed = mcap::parse_record(opcode, &buf[..]).expect("failed to parse record");
 json_records.push(as_json(&parsed)); } }
diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs
index 6d2181ae36..c460bd43c4 100644
--- a/rust/src/tokio/read.rs
+++ b/rust/src/tokio/read.rs
@@ -120,56 +120,77 @@ where
 /// Reads the next record from the input stream and copies the raw content into `data`.
 /// Returns the record's opcode as a result.
- pub async fn next_record(&mut self, data: &mut Vec<u8>) -> McapResult<Option<u8>> {
+ pub async fn next_record(&mut self, data: &mut Vec<u8>) -> Option<McapResult<u8>> {
 loop { - let cmd = self.next_record_inner(data).await?; + let cmd = match self.next_record_inner(data).await { + Ok(cmd) => cmd, + Err(err) => return Some(Err(err)), + }; match cmd { - Cmd::Stop => return Ok(None), - Cmd::YieldRecord(opcode) => return Ok(Some(opcode)), + Cmd::Stop => return None, + Cmd::YieldRecord(opcode) => return Some(Ok(opcode)), Cmd::EnterChunk { header, len } => { - let mut rdr = ReaderState::Empty; - std::mem::swap(&mut rdr, &mut self.reader); + let mut reader_state = ReaderState::Empty; + std::mem::swap(&mut reader_state, &mut self.reader); match header.compression.as_str() { #[cfg(feature = "zstd")] "zstd" => { + let reader = match reader_state.into_inner() { + Ok(reader) => reader, + Err(err) => return Some(Err(err)), + }; self.reader = ReaderState::ZstdChunk(ZstdDecoder::new( - tokio::io::BufReader::new( - rdr.into_inner()?.take(header.compressed_size), - ), + tokio::io::BufReader::new(reader.take(header.compressed_size)), )); } #[cfg(feature = "lz4")] "lz4" => { - let decoder = - Lz4Decoder::new(rdr.into_inner()?.take(header.compressed_size))?; + let reader = match reader_state.into_inner() { + Ok(reader) => reader, + Err(err) => return Some(Err(err)), + }; + let decoder = match Lz4Decoder::new(reader.take(header.compressed_size)) + { + Ok(decoder) => decoder, + Err(err) => return Some(Err(err.into())), + }; self.reader = ReaderState::Lz4Chunk(decoder); } "" => { - self.reader = ReaderState::UncompressedChunk( - rdr.into_inner()?.take(header.compressed_size), - ); + let reader = match reader_state.into_inner() { + Ok(reader) => reader, + Err(err) => return Some(Err(err)), + }; + self.reader = + ReaderState::UncompressedChunk(reader.take(header.compressed_size)); } _ => { - std::mem::swap(&mut rdr, &mut self.reader); - return Err(McapError::UnsupportedCompression( + std::mem::swap(&mut reader_state, &mut self.reader); + return Some(Err(McapError::UnsupportedCompression( header.compression.clone(), - )); + ))); } } self.to_discard_after_chunk = len as usize - (40 + header.compression.len() + header.compressed_size as usize); } Cmd::ExitChunk => { - let mut rdr = ReaderState::Empty; - std::mem::swap(&mut rdr, &mut self.reader); - self.reader = ReaderState::Base(rdr.into_inner()?); + let mut reader_state = ReaderState::Empty; + std::mem::swap(&mut reader_state, &mut self.reader); + self.reader = ReaderState::Base(match reader_state.into_inner() { + Ok(reader) => reader, + Err(err) => return Some(Err(err)), + }); while self.to_discard_after_chunk > 0 { let to_read = if self.to_discard_after_chunk > self.scratch.len() { self.scratch.len() } else { self.to_discard_after_chunk }; - if let Err(err) = self.reader.read_exact(&mut self.scratch[..to_read]).await + { + return Some(Err(err.into())); + } self.to_discard_after_chunk -= to_read; } }
@@ -323,7 +344,8 @@ mod tests { let mut reader = RecordReader::new(std::io::Cursor::new(buf.into_inner())); let mut record = Vec::new(); let mut opcodes: Vec<u8> = Vec::new(); - while let Some(opcode) = reader.next_record(&mut record).await?
{ + while let Some(opcode) = reader.next_record(&mut record).await { + let opcode = opcode?; opcodes.push(opcode); parse_record(opcode, &record)?; } From fcc6a4731212cbeeaeb6e553a81027626ccb4bfc Mon Sep 17 00:00:00 2001 From: James Smith Date: Fri, 16 Aug 2024 13:47:59 +1000 Subject: [PATCH 12/25] do not fail on subslices if skipping end magic --- rust/src/tokio/read.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs index c460bd43c4..a0faeb169c 100644 --- a/rust/src/tokio/read.rs +++ b/rust/src/tokio/read.rs @@ -214,7 +214,13 @@ where } return Ok(Cmd::Stop); } - reader.read_exact(&mut self.scratch[..9]).await?; + let readlen = reader.read(&mut self.scratch[..9]).await?; + if readlen == 0 && self.options.skip_end_magic { + return Ok(Cmd::Stop); + } + if readlen != 9 { + return Err(McapError::UnexpectedEof); + } let opcode = self.scratch[0]; if opcode == records::op::FOOTER { self.footer_seen = true; From 93d1260419e99a29a7daae440a93a2734075052d Mon Sep 17 00:00:00 2001 From: James Smith Date: Fri, 16 Aug 2024 14:05:46 +1000 Subject: [PATCH 13/25] Add benchmark(s) --- rust/Cargo.toml | 2 +- rust/benches/reader.rs | 47 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index c24282ccff..cc89075c8a 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -43,7 +43,7 @@ anyhow = "1" atty = "0.2" camino = "1.0" clap = { version = "3.2", features = ["derive"]} -criterion = "0.5.1" +criterion = { version = "0.5.1", features = ["async_tokio"] } itertools = "0.10" memmap = "0.7" rayon = "1.5" diff --git a/rust/benches/reader.rs b/rust/benches/reader.rs index d234ccf6d3..80648b7447 100644 --- a/rust/benches/reader.rs +++ b/rust/benches/reader.rs @@ -1,5 +1,5 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use mcap::{Channel, Message, MessageStream, Schema}; +use mcap::{parse_record, Channel, Message, MessageStream, Schema}; use std::borrow::Cow; use std::io::Cursor; use std::sync::Arc; @@ -79,6 +79,51 @@ fn bench_read_messages(c: &mut Criterion) { } }); }); + #[cfg(feature = "tokio")] + { + use mcap::tokio::read::RecordReader; + use tokio::runtime::Builder; + + let rt = Builder::new_current_thread().build().unwrap(); + group.bench_function("AsyncMessageStream_1M_uncompressed", |b| { + b.to_async(&rt).iter(|| async { + let mut reader = RecordReader::new(Cursor::new(&mcap_data_uncompressed)); + let mut record = Vec::new(); + let mut count = 0; + while let Some(result) = reader.next_record(&mut record).await { + count += 1; + std::hint::black_box(parse_record(result.unwrap(), &record).unwrap()); + } + assert_eq!(count, N + 118); + }); + }); + + group.bench_function("AsyncMessageStream_1M_zstd", |b| { + b.to_async(&rt).iter(|| async { + let mut reader = RecordReader::new(Cursor::new(&mcap_data_zstd)); + let mut record = Vec::new(); + let mut count = 0; + while let Some(result) = reader.next_record(&mut record).await { + count += 1; + std::hint::black_box(parse_record(result.unwrap(), &record).unwrap()); + } + assert_eq!(count, N + 118); + }); + }); + + group.bench_function("AsyncMessageStream_1M_lz4", |b| { + b.to_async(&rt).iter(|| async { + let mut reader = RecordReader::new(Cursor::new(&mcap_data_lz4)); + let mut record = Vec::new(); + let mut count = 0; + while let Some(result) = reader.next_record(&mut record).await { + count += 1; + std::hint::black_box(parse_record(result.unwrap(), &record).unwrap()); + } + 
assert_eq!(count, N + 118); + }); + }); + } group.finish(); }
From 86f362dd9581f67422abcde7f50d76146a6afed2 Mon Sep 17 00:00:00 2001
From: James Smith
Date: Fri, 16 Aug 2024 15:05:02 +1000
Subject: [PATCH 14/25] fix lz4
---
 rust/src/tokio/lz4.rs | 3 ++-
 rust/src/tokio/read.rs | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/rust/src/tokio/lz4.rs b/rust/src/tokio/lz4.rs
index 14b116bcbf..c6920fcec6 100644
--- a/rust/src/tokio/lz4.rs
+++ b/rust/src/tokio/lz4.rs
@@ -95,6 +95,7 @@ impl<R: AsyncRead + Unpin> AsyncRead for Lz4Decoder<R> { while (written_len < buf.remaining()) && (mself.pos < mself.len) { let mut src_size = mself.len - mself.pos; let mut dst_size = buf.remaining() - written_len; + let prev_filled = buf.filled().len(); let len = check_error(unsafe { LZ4F_decompress( mself.c.c, buf.initialize_unfilled().as_mut_ptr(), &mut dst_size, mself.buf[mself.pos..].as_ptr(), &mut src_size, ptr::null(), ) })?; mself.pos += src_size; written_len += dst_size; - buf.set_filled(written_len); + buf.set_filled(prev_filled + written_len); if len == 0 { mself.next = 0; return Poll::Ready(Ok(())); } else if mself.next < len { mself.next = len; } } }
diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs
index a0faeb169c..928956d070 100644
--- a/rust/src/tokio/read.rs
+++ b/rust/src/tokio/read.rs
@@ -377,4 +377,43 @@ mod tests { } Ok(()) }
+ #[cfg(feature = "lz4")] + #[tokio::test] + async fn test_lz4_decompression() -> Result<(), McapError> { + let mut buf = std::io::Cursor::new(Vec::new()); + { + let mut writer = crate::WriteOptions::new() + .compression(Some(crate::Compression::Lz4)) + .create(&mut buf)?; + let channel = std::sync::Arc::new(crate::Channel { + topic: "chat".to_owned(), + schema: None, + message_encoding: "json".to_owned(), + metadata: BTreeMap::new(), + }); + let data: Vec<u8> = vec![0; 1024]; + writer.add_channel(&channel)?; + for n in 0..10000 { + { + writer.write(&crate::Message { + channel: channel.clone(), + log_time: n, + publish_time: n, + sequence: n as u32, + data: std::borrow::Cow::Owned(data.clone()), + })?; + } + } + writer.finish()?; + } + let mut reader = RecordReader::new(std::io::Cursor::new(buf.into_inner())); + let mut record = Vec::new(); + let mut opcodes: Vec<u8> = Vec::new(); + while let Some(opcode) = reader.next_record(&mut record).await { + let opcode = opcode?; + opcodes.push(opcode); + parse_record(opcode, &record)?; + } + Ok(()) + } }
From 8d25bc984276c1e9b6a37261c8f462da1264c9df Mon Sep 17 00:00:00 2001
From: James Smith
Date: Fri, 16 Aug 2024 15:55:04 +1000
Subject: [PATCH 15/25] add read_exact_or_zero
---
 rust/src/tokio.rs | 2 ++
 rust/src/tokio/read.rs | 28 +++++++++++++---------------
 rust/src/tokio/read_exact_or_zero.rs | 26 ++++++++++++++++++++++++++
 3 files changed, 41 insertions(+), 15 deletions(-)
 create mode 100644 rust/src/tokio/read_exact_or_zero.rs
diff --git a/rust/src/tokio.rs b/rust/src/tokio.rs
index 25a9bb5712..bd65368fed 100644
--- a/rust/src/tokio.rs
+++ b/rust/src/tokio.rs
@@ -1,5 +1,7 @@
 #[cfg(feature = "lz4")]
 mod lz4;
 pub mod read;
+mod read_exact_or_zero;
 pub use read::{RecordReader, RecordReaderOptions};
+use read_exact_or_zero::read_exact_or_zero;
diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs
index 928956d070..36ea90efd8 100644
--- a/rust/src/tokio/read.rs
+++ b/rust/src/tokio/read.rs
@@ -8,6 +8,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take};
 #[cfg(feature = "lz4")]
 use crate::tokio::lz4::Lz4Decoder;
+use crate::tokio::read_exact_or_zero;
 use crate::{records, McapError, McapResult, MAGIC};
 enum ReaderState<R> {
@@ -187,11 +188,10 @@ where } else { self.to_discard_after_chunk }; - if let
Err(err) = self.reader.read_exact(&mut self.scratch[..to_read]).await - { - return Some(Err(err.into())); - } - self.to_discard_after_chunk -= to_read; + match self.reader.read(&mut self.scratch[..to_read]).await { + Ok(n) => self.to_discard_after_chunk -= n, + Err(err) => return Some(Err(err.into())), + }; } } };
@@ -214,12 +214,13 @@ where } return Ok(Cmd::Stop); } - let readlen = reader.read(&mut self.scratch[..9]).await?; - if readlen == 0 && self.options.skip_end_magic { - return Ok(Cmd::Stop); - } - if readlen != 9 { - return Err(McapError::UnexpectedEof); + let readlen = read_exact_or_zero(reader, &mut self.scratch[..9]).await?; + if readlen == 0 { + if self.options.skip_end_magic { + return Ok(Cmd::Stop); + } else { + return Err(McapError::UnexpectedEof); + } } let opcode = self.scratch[0]; if opcode == records::op::FOOTER { self.footer_seen = true;
@@ -237,13 +238,10 @@ where reader.read_exact(&mut data[..]).await?; Ok(Cmd::YieldRecord(opcode)) } else { - let len = self.reader.read(&mut self.scratch[..9]).await?; + let len = read_exact_or_zero(&mut self.reader, &mut self.scratch[..9]).await?; if len == 0 { return Ok(Cmd::ExitChunk); } - if len != 9 { - return Err(McapError::UnexpectedEof); - } let opcode = self.scratch[0]; let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..9]); data.resize(record_len as usize, 0); self.reader.read_exact(&mut data[..]).await?; Ok(Cmd::YieldRecord(opcode))
diff --git a/rust/src/tokio/read_exact_or_zero.rs b/rust/src/tokio/read_exact_or_zero.rs
new file mode 100644
index 0000000000..864d458a81
--- /dev/null
+++ b/rust/src/tokio/read_exact_or_zero.rs
@@ -0,0 +1,26 @@
+use tokio::io::{AsyncRead, AsyncReadExt};
+
+/// read from `r` into `buf` until `buf` is completely full or EOF is reached.
+/// If R was already at EOF, this is not considered an error.
+/// If R was not at EOF, but EOF came before the end of the buffer, this is considered an error.
+/// This is useful for cases where we expect either another full record or EOF.
+pub(crate) async fn read_exact_or_zero<R: AsyncRead + Unpin>( + r: &mut R, + buf: &mut [u8], +) -> Result<usize, std::io::Error> { + let mut pos: usize = 0; + loop { + let readlen = r.read(&mut buf[pos..]).await?; + if readlen == 0 { + if pos != 0 { + return Err(std::io::ErrorKind::UnexpectedEof.into()); + } else { + return Ok(0); + } + } + pos += readlen; + if pos == buf.len() { + return Ok(pos); + } + } +}
From 2e792a761e81ffba62c739ac7b6c0deea4526c27 Mon Sep 17 00:00:00 2001
From: James Smith
Date: Fri, 16 Aug 2024 15:56:25 +1000
Subject: [PATCH 16/25] remove constraint from lz4 decoder constructor fns
---
 rust/src/tokio/lz4.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/rust/src/tokio/lz4.rs b/rust/src/tokio/lz4.rs
index c6920fcec6..82315c6f01 100644
--- a/rust/src/tokio/lz4.rs
+++ b/rust/src/tokio/lz4.rs
@@ -28,7 +28,7 @@ pub struct Lz4Decoder<R> { next: usize, }
-impl<R: AsyncRead> Lz4Decoder<R> {
+impl<R> Lz4Decoder<R> {
 /// Creates a new decoder which reads its input from the given /// input stream. The input stream can be re-acquired by calling /// `finish()`
From 38434b003423550656d1cdd21793067335c7e3f8 Mon Sep 17 00:00:00 2001
From: James Smith
Date: Sat, 17 Aug 2024 13:18:23 +1000
Subject: [PATCH 17/25] reword docstring
---
 rust/src/tokio.rs | 1 +
 rust/src/tokio/read.rs | 27 ++++++++++++++++++++++-----
 rust/src/tokio/read_exact_or_zero.rs | 9 +++++----
 3 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/rust/src/tokio.rs b/rust/src/tokio.rs
index bd65368fed..cf3d68ebd5 100644
--- a/rust/src/tokio.rs
+++ b/rust/src/tokio.rs
@@ -1,3 +1,4 @@
+//!
Read MCAP data from a stream asynchronously
 #[cfg(feature = "lz4")]
 mod lz4;
 pub mod read;
diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs
index 36ea90efd8..f71f5c518f 100644
--- a/rust/src/tokio/read.rs
+++ b/rust/src/tokio/read.rs
@@ -65,7 +65,24 @@ where } } }
+
 /// Reads an MCAP file record-by-record, writing the raw record data into a caller-provided Vec.
+/// ```no_run
+/// use std::fs;
+///
+/// use tokio::fs::File;
+///
+/// async fn read_it() {
+///     let file = File::open("in.mcap").await.expect("couldn't open file");
+///     let mut record_buf: Vec<u8> = Vec::new();
+///     let mut reader = mcap::tokio::RecordReader::new(file);
+///     while let Some(result) = reader.next_record(&mut record_buf).await {
+///         let opcode = result.expect("couldn't read next record");
+///         let raw_record = mcap::parse_record(opcode, &record_buf[..]).expect("couldn't parse");
+///         // do something with the record...
+///     }
+/// }
+/// ```
 pub struct RecordReader<R> { reader: ReaderState<R>, options: RecordReaderOptions,
@@ -78,12 +95,12 @@ pub struct RecordReader<R> {
 #[derive(Default, Clone)]
 pub struct RecordReaderOptions {
 /// If true, the reader will not expect the MCAP magic at the start of the stream.
- skip_start_magic: bool,
+ pub skip_start_magic: bool,
 /// If true, the reader will not expect the MCAP magic at the end of the stream.
- skip_end_magic: bool,
+ pub skip_end_magic: bool,
- // If true, the reader will yield entire chunk records. Otherwise, the reader will decompress
- // and read into the chunk, yielding the records inside.
- emit_chunks: bool,
+ /// If true, the reader will yield entire chunk records. Otherwise, the reader will decompress
+ /// and read into the chunk, yielding the records inside.
+ pub emit_chunks: bool,
 }
diff --git a/rust/src/tokio/read_exact_or_zero.rs b/rust/src/tokio/read_exact_or_zero.rs
index 864d458a81..20343f9d5f 100644
--- a/rust/src/tokio/read_exact_or_zero.rs
+++ b/rust/src/tokio/read_exact_or_zero.rs
@@ -1,9 +1,10 @@
 use tokio::io::{AsyncRead, AsyncReadExt};
-/// read from `r` into `buf` until `buf` is completely full or EOF is reached.
-/// If R was already at EOF, this is not considered an error.
-/// If R was not at EOF, but EOF came before the end of the buffer, this is considered an error.
-/// This is useful for cases where we expect either another full record or EOF.
+/// read up to buf.len() bytes from `r` into `buf`. This repeatedly calls read() on `r` until
+/// either the buffer is full or EOF is reached. If either 0 or buf.len() bytes were read before
+/// EOF, Ok(n) is returned. If EOF is reached after more than 0 bytes but before buf.len(),
+/// Err(UnexpectedEOF) is returned.
+/// This is useful for cases where we expect to read either a whole MCAP record or EOF.
 pub(crate) async fn read_exact_or_zero<R: AsyncRead + Unpin>( r: &mut R, buf: &mut [u8],
From 1056c7cf24b72f11eb5a2b60fbb882ad2d0a1eb4 Mon Sep 17 00:00:00 2001
From: James Smith
Date: Tue, 20 Aug 2024 14:52:44 +1000
Subject: [PATCH 18/25] remove benches (until we have a messagestream impl)
---
 rust/benches/reader.rs | 47 +-----------------------------------------
 1 file changed, 1 insertion(+), 46 deletions(-)
diff --git a/rust/benches/reader.rs b/rust/benches/reader.rs
index 80648b7447..d234ccf6d3 100644
--- a/rust/benches/reader.rs
+++ b/rust/benches/reader.rs
@@ -1,5 +1,5 @@
 use criterion::{criterion_group, criterion_main, Criterion};
-use mcap::{parse_record, Channel, Message, MessageStream, Schema};
+use mcap::{Channel, Message, MessageStream, Schema};
 use std::borrow::Cow;
 use std::io::Cursor;
 use std::sync::Arc;
@@ -79,51 +79,6 @@ fn bench_read_messages(c: &mut Criterion) { } }); });
- #[cfg(feature = "tokio")] - { - use mcap::tokio::read::RecordReader; - use tokio::runtime::Builder; - - let rt = Builder::new_current_thread().build().unwrap(); - group.bench_function("AsyncMessageStream_1M_uncompressed", |b| { - b.to_async(&rt).iter(|| async { - let mut reader = RecordReader::new(Cursor::new(&mcap_data_uncompressed)); - let mut record = Vec::new(); - let mut count = 0; - while let Some(result) = reader.next_record(&mut record).await { - count += 1; - std::hint::black_box(parse_record(result.unwrap(), &record).unwrap()); - } - assert_eq!(count, N + 118); - }); - }); - - group.bench_function("AsyncMessageStream_1M_zstd", |b| { - b.to_async(&rt).iter(|| async { - let mut reader = RecordReader::new(Cursor::new(&mcap_data_zstd)); - let mut record = Vec::new(); - let mut count = 0; - while let Some(result) = reader.next_record(&mut record).await { - count += 1; - std::hint::black_box(parse_record(result.unwrap(), &record).unwrap()); - } - assert_eq!(count, N + 118); - }); - }); - - group.bench_function("AsyncMessageStream_1M_lz4", |b| { - b.to_async(&rt).iter(|| async { - let mut reader = RecordReader::new(Cursor::new(&mcap_data_lz4)); - let mut record = Vec::new(); - let mut count = 0; - while let Some(result) = reader.next_record(&mut record).await { - count += 1; - std::hint::black_box(parse_record(result.unwrap(), &record).unwrap()); - } - assert_eq!(count, N + 118); - }); - }); - }
 group.finish(); }
From 31877c666deca1b6bf56c84e81604614cb6881a6 Mon Sep 17 00:00:00 2001
From: James Smith
Date: Tue, 20 Aug 2024 15:32:43 +1000
Subject: [PATCH 19/25] test read_exact_or_zero
---
 rust/src/tokio/lz4.rs | 4 +-
 rust/src/tokio/read_exact_or_zero.rs | 78 +++++++++++++++++++++++++++-
 2 files changed, 79 insertions(+), 3 deletions(-)
diff --git a/rust/src/tokio/lz4.rs b/rust/src/tokio/lz4.rs
index 82315c6f01..3203a424e6 100644
--- a/rust/src/tokio/lz4.rs
+++ b/rust/src/tokio/lz4.rs
@@ -16,8 +16,8 @@ struct DecoderContext { c: LZ4FDecompressionContext, }
-// An equivalent of the lz4::Decoder `std::io::Read` wrapper for `tokio::io::AsyncRead`.
-// Code below is adapted from the https://github.com/bozaro/lz4-rs crate.
+// An adaptation of the [`lz4::Decoder`] [`std::io::Read`] impl, but for [`tokio::io::AsyncRead`].
+// Code below is adapted from the [lz4](https://github.com/bozaro/lz4-rs) crate source.
 #[derive(Debug)]
 pub struct Lz4Decoder<R> { c: DecoderContext, r: R,
diff --git a/rust/src/tokio/read_exact_or_zero.rs b/rust/src/tokio/read_exact_or_zero.rs
index 20343f9d5f..b3e0708e0e 100644
--- a/rust/src/tokio/read_exact_or_zero.rs
+++ b/rust/src/tokio/read_exact_or_zero.rs
@@ -1,6 +1,6 @@
 use tokio::io::{AsyncRead, AsyncReadExt};
-/// read up to buf.len() bytes from `r` into `buf`. This repeatedly calls read() on `r` until
+/// read up to `buf.len()` bytes from `r` into `buf`. This repeatedly calls read() on `r` until
 /// either the buffer is full or EOF is reached. If either 0 or buf.len() bytes were read before
 /// EOF, Ok(n) is returned. If EOF is reached after more than 0 bytes but before buf.len(),
 /// Err(UnexpectedEOF) is returned.
@@ -25,3 +25,79 @@ pub(crate) async fn read_exact_or_zero<R: AsyncRead + Unpin>(
+
+#[cfg(test)] +mod tests { + + use super::*; + use std::cmp::min; + + struct ZeroReader { + remaining: usize, + max_read_len: usize, + } + + impl AsyncRead for ZeroReader { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + let max_read_len = self.as_ref().max_read_len; + let remaining = self.as_ref().remaining; + if remaining == 0 { + return std::task::Poll::Ready(Ok(())); + } + let to_fill = min(min(remaining, buf.remaining()), max_read_len); + buf.initialize_unfilled_to(to_fill).fill(0); + buf.set_filled(to_fill); + self.as_mut().remaining -= to_fill; + return std::task::Poll::Ready(Ok(())); + } + }
+ #[tokio::test] + async fn test_full_read_is_not_error() { + let mut r = ZeroReader { + remaining: 10, + max_read_len: 10, + }; + let mut buf: Vec<u8> = vec![1; 10]; + let result = read_exact_or_zero(&mut r, &mut buf).await; + assert_eq!(result.ok(), Some(10)); + assert_eq!(&buf[..], &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + }
+
+ #[tokio::test] + async fn test_eof_is_not_error() { + let mut r = ZeroReader { + remaining: 0, + max_read_len: 10, + }; + let mut buf: Vec<u8> = vec![1; 10]; + let result = read_exact_or_zero(&mut r, &mut buf).await; + assert_eq!(result.ok(), Some(0)); + assert_eq!(&buf[..], &[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + }
+ #[tokio::test] + async fn test_repeated_read_calls() { + let mut r = ZeroReader { + remaining: 10, + max_read_len: 1, + }; + let mut buf: Vec<u8> = vec![1; 10]; + let result = read_exact_or_zero(&mut r, &mut buf).await; + assert_eq!(result.ok(), Some(10)); + assert_eq!(&buf[..], &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + }
+ #[tokio::test] + async fn test_partial_read_is_error() { + let mut r = ZeroReader { + remaining: 4, + max_read_len: 2, + }; + let mut buf: Vec<u8> = vec![1; 10]; + let result = read_exact_or_zero(&mut r, &mut buf).await; + assert!(result.is_err()); + assert_eq!(&buf[..], &[0, 0, 0, 0, 1, 1, 1, 1, 1, 1]); + } +}
From cc53dbd9b9c08be0a2e7c35077f20861c64e5db7 Mon Sep 17 00:00:00 2001
From: James Smith
Date: Wed, 11 Sep 2024 15:01:41 +1000
Subject: [PATCH 20/25] lz4: clarify and comment on async reader
---
 rust/src/tokio/lz4.rs | 82 ++++++++++++++++++++++------------------------
 1 file changed, 46 insertions(+), 36 deletions(-)
diff --git a/rust/src/tokio/lz4.rs b/rust/src/tokio/lz4.rs
index 3203a424e6..05a0f65a9e 100644
--- a/rust/src/tokio/lz4.rs
+++ b/rust/src/tokio/lz4.rs
@@ -17,14 +17,14 @@ struct DecoderContext { }
 // An adaptation of the [`lz4::Decoder`] [`std::io::Read`] impl, but for [`tokio::io::AsyncRead`].
-// Code below is adapted from the [lz4](https://github.com/bozaro/lz4-rs) crate source.
+// Code below is adapted from the [lz4](https://github.com/10XGenomics/lz4-rs) crate source.
 #[derive(Debug)]
 pub struct Lz4Decoder<R> { c: DecoderContext, r: R,
- buf: Box<[u8]>,
- pos: usize,
- len: usize,
+ input_buf: Box<[u8]>,
+ unread_input_start: usize,
+ unread_input_end: usize,
 next: usize, }
@@ -36,9 +36,9 @@ impl<R> Lz4Decoder<R> { Ok(Lz4Decoder { r, c: DecoderContext::new()?,
- buf: vec![0; BUFFER_SIZE].into_boxed_slice(),
- pos: BUFFER_SIZE,
- len: BUFFER_SIZE,
+ input_buf: vec![0; BUFFER_SIZE].into_boxed_slice(),
+ unread_input_start: BUFFER_SIZE,
+ unread_input_end: BUFFER_SIZE,
 // Minimal LZ4 stream size next: 11, })
@@ -51,7 +51,7 @@ impl<R> Lz4Decoder<R> { 0 => Ok(()), _ => Err(Error::new( ErrorKind::Interrupted,
- "Finish runned before read end of compressed stream",
+ "Finish called before end of compressed stream",
 )), }, )
@@ -62,58 +62,68 @@ impl<R: AsyncRead + Unpin> AsyncRead for Lz4Decoder<R> { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
+ output_buf: &mut ReadBuf<'_>,
 ) -> Poll<Result<()>> {
- if self.next == 0 || buf.remaining() == 0 {
+ // There's nothing left to read.
+ if self.next == 0 || output_buf.remaining() == 0 {
 return Poll::Ready(Ok(())); } let mut written_len: usize = 0;
- let mself = self.get_mut();
+ let this = self.get_mut();
 while written_len == 0 {
- if mself.pos >= mself.len { - let need = if mself.buf.len() < mself.next { - mself.buf.len() - } else { - mself.next - };
+ // this reader buffers input data until it has enough to present to the lz4 frame decoder.
+ // if there's nothing unread, request more data from the reader.
+ if this.unread_input_start >= this.unread_input_end {
+ // request a full BUFFER_SIZE or the amount requested by the lz4 frame decoder,
+ // whichever is less.
+ let need = std::cmp::min(BUFFER_SIZE, this.next);
+ // try reading more input data. If it's not ready, return and try again later.
+ // NOTE: we don't need to save this stack frame as a future and re-enter it later
+ // because the only frame-local state `written_len` has not been modified and can be
+ // discarded.
 {
- let mut comp_buf = ReadBuf::new(&mut mself.buf[..need]);
- let result = pin!(&mut mself.r).poll_read(cx, &mut comp_buf);
+ let mut input_buf = ReadBuf::new(&mut this.input_buf[..need]);
+ let result = pin!(&mut this.r).poll_read(cx, &mut input_buf);
 match result { Poll::Pending => return result, Poll::Ready(Err(_)) => return result, _ => {} };
- mself.len = comp_buf.filled().len();
+ this.unread_input_start = 0;
+ this.unread_input_end = input_buf.filled().len();
+ this.next -= this.unread_input_end;
 }
- if mself.len == 0 { - break;
+ // The read succeeded. If zero bytes were read, we're at the end of the stream.
+ if this.unread_input_end == 0 { + return Poll::Ready(Ok(()));
 }
- mself.pos = 0; - mself.next -= mself.len; }
- while (written_len < buf.remaining()) && (mself.pos < mself.len) { - let mut src_size = mself.len - mself.pos; - let mut dst_size = buf.remaining() - written_len; - let prev_filled = buf.filled().len();
+ // feed bytes from our input buffer into the decompressor, writing into the output
+ // buffer until either the output buffer is full or the input buffer is consumed.
+ while (written_len < output_buf.remaining()) + && (this.unread_input_start < this.unread_input_end) + { + let mut src_size = this.unread_input_end - this.unread_input_start; + let mut dst_size = output_buf.remaining() - written_len; + let prev_filled = output_buf.filled().len(); let len = check_error(unsafe { LZ4F_decompress( - mself.c.c, - buf.initialize_unfilled().as_mut_ptr(), + this.c.c, + output_buf.initialize_unfilled().as_mut_ptr(), &mut dst_size, - mself.buf[mself.pos..].as_ptr(), + this.input_buf[this.unread_input_start..].as_ptr(), &mut src_size, ptr::null(), ) })?; - mself.pos += src_size; + this.unread_input_start += src_size; written_len += dst_size; - buf.set_filled(prev_filled + written_len); + output_buf.set_filled(prev_filled + written_len); if len == 0 { - mself.next = 0; + this.next = 0; return Poll::Ready(Ok(())); - } else if mself.next < len { - mself.next = len; + } else if this.next < len { + this.next = len; } } } From c9a4179d4452598527eda069d1d68a2c330b5191 Mon Sep 17 00:00:00 2001 From: James Smith Date: Wed, 11 Sep 2024 15:10:24 +1000 Subject: [PATCH 21/25] tokio/read.rs: do not save opcodes in test --- rust/src/tokio/read.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs index f71f5c518f..b06cea9bb8 100644 --- a/rust/src/tokio/read.rs +++ b/rust/src/tokio/read.rs @@ -423,11 +423,8 @@ mod tests { } let mut reader = RecordReader::new(std::io::Cursor::new(buf.into_inner())); let mut record = Vec::new(); - let mut opcodes: Vec = Vec::new(); while let Some(opcode) = reader.next_record(&mut record).await { - let opcode = opcode?; - opcodes.push(opcode); - parse_record(opcode, &record)?; + parse_record(opcode?, &record)?; } Ok(()) } From 986022f48842954514eb9c7a909f691e3c4a07f3 Mon Sep 17 00:00:00 2001 From: James Smith Date: Fri, 13 Sep 2024 21:58:18 +1000 Subject: [PATCH 22/25] update to lz4 v1.27 --- rust/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index cc89075c8a..4e5d980aa3 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -22,7 +22,7 @@ log = "0.4" num_cpus = "1.13" paste = "1.0" thiserror = "1.0" -lz4 = { version = "1", optional = true } +lz4 = { version = "1.27", optional = true } async-compression = { version = "*", features = ["tokio"], optional = true } tokio = { version = "1", features = ["io-util"] , optional = true } From 6854c862228c84acd2a96d7ac14165ae7f0908ed Mon Sep 17 00:00:00 2001 From: James Smith Date: Fri, 13 Sep 2024 22:00:24 +1000 Subject: [PATCH 23/25] check builds for wasm32 --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e847c94ca0..e4c9ae282c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -491,6 +491,7 @@ jobs: toolchain: stable default: true components: "rustfmt, clippy" + - run: rustup target add wasm32-unknown-unknown - run: cargo fmt --all -- --check - run: cargo clippy -- --no-deps - run: cargo clippy --no-default-features -- --no-deps @@ -501,6 +502,8 @@ jobs: - run: cargo clippy --no-default-features --features tokio,zstd -- --no-deps - run: cargo build --all-features - run: cargo test --all-features + - run: cargo build --all-features --target wasm32-unknown-unknown + - run: cargo check --all-features --target wasm32-unknown-unknown - name: "publish to crates.io" if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/releases/rust/v') run: cargo publish --token 
${{ secrets.RUST_CRATES_IO_TOKEN }} From c5b48e233eafd26e0f6c4cbad5b62d8dc3fbc217 Mon Sep 17 00:00:00 2001 From: James Smith Date: Sun, 15 Sep 2024 21:06:00 +1000 Subject: [PATCH 24/25] add a non-full read test --- rust/src/tokio/read_exact_or_zero.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/tokio/read_exact_or_zero.rs b/rust/src/tokio/read_exact_or_zero.rs index b3e0708e0e..cc4eb902d5 100644 --- a/rust/src/tokio/read_exact_or_zero.rs +++ b/rust/src/tokio/read_exact_or_zero.rs @@ -82,7 +82,7 @@ mod tests { async fn test_repeated_read_calls() { let mut r = ZeroReader { remaining: 10, - max_read_len: 1, + max_read_len: 4, }; let mut buf: Vec = vec![1; 10]; let result = read_exact_or_zero(&mut r, &mut buf).await; From 77d542bc7d48817d94dd9f929db2f0537ac22b66 Mon Sep 17 00:00:00 2001 From: James Smith Date: Sun, 15 Sep 2024 21:26:01 +1000 Subject: [PATCH 25/25] remove byteorder dep --- rust/Cargo.toml | 4 ++++ rust/src/tokio/read.rs | 21 ++++++++++++--------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 4e5d980aa3..0cd54d80ed 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -61,3 +61,7 @@ harness = false opt-level = 3 debug = true lto = true + +[[example]] +name = "conformance_reader_async" +required-features = ["tokio"] diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs index b06cea9bb8..50a1384250 100644 --- a/rust/src/tokio/read.rs +++ b/rust/src/tokio/read.rs @@ -3,7 +3,7 @@ use std::task::{Context, Poll}; #[cfg(feature = "zstd")] use async_compression::tokio::bufread::ZstdDecoder; -use byteorder::ByteOrder; +use binrw::BinReaderExt; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take}; #[cfg(feature = "lz4")] @@ -243,7 +243,7 @@ where if opcode == records::op::FOOTER { self.footer_seen = true; } - let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..9]); + let record_len = u64::from_le_bytes(self.scratch[1..9].try_into().unwrap()); if opcode == records::op::CHUNK && !self.options.emit_chunks { let header = read_chunk_header(reader, data, record_len).await?; return Ok(Cmd::EnterChunk { @@ -260,7 +260,7 @@ where return Ok(Cmd::ExitChunk); } let opcode = self.scratch[0]; - let record_len = byteorder::LittleEndian::read_u64(&self.scratch[1..9]); + let record_len = u64::from_le_bytes(self.scratch[1..9].try_into().unwrap()); data.resize(record_len as usize, 0); self.reader.read_exact(&mut data[..]).await?; Ok(Cmd::YieldRecord(opcode)) @@ -290,11 +290,14 @@ async fn read_chunk_header( } scratch.resize(32, 0); reader.read_exact(&mut scratch[..]).await?; - header.message_start_time = byteorder::LittleEndian::read_u64(&scratch[0..8]); - header.message_end_time = byteorder::LittleEndian::read_u64(&scratch[8..16]); - header.uncompressed_size = byteorder::LittleEndian::read_u64(&scratch[16..24]); - header.uncompressed_crc = byteorder::LittleEndian::read_u32(&scratch[24..28]); - let compression_len = byteorder::LittleEndian::read_u32(&scratch[28..32]); + let compression_len: u32 = { + let mut cursor = std::io::Cursor::new(&scratch); + header.message_start_time = cursor.read_le()?; + header.message_end_time = cursor.read_le()?; + header.uncompressed_size = cursor.read_le()?; + header.uncompressed_crc = cursor.read_le()?; + cursor.read_le()? 
+ }; scratch.resize(compression_len as usize, 0); if record_len < (40 + compression_len) as u64 { return Err(McapError::RecordTooShort { @@ -315,7 +318,7 @@ async fn read_chunk_header( }; scratch.resize(8, 0); reader.read_exact(&mut scratch[..]).await?; - header.compressed_size = byteorder::LittleEndian::read_u64(&scratch[..]); + header.compressed_size = u64::from_le_bytes(scratch[..].try_into().unwrap()); let available = record_len - (32 + compression_len as u64 + 8); if available < header.compressed_size { return Err(McapError::BadChunkLength {