From c67c6328eff14193c21762466912ef890255a54a Mon Sep 17 00:00:00 2001
From: james-rms
Date: Sun, 15 Sep 2024 21:35:22 +1000
Subject: [PATCH] rust: add async reading functionality (#1211)

### Changelog

- rust: Switches the LZ4 compression dependency from `lz4_flex` to `lz4-rs`, moving from a pure-Rust LZ4 implementation to C bindings. I believe this is worthwhile because `lz4_flex` does not support LZ4 "high compression mode". The practical reason for the switch in this PR is that `lz4_flex` does not expose interfaces that make it easy to build an `AsyncRead` adapter, while `lz4-rs` does.
- rust: Adds structs for reading MCAP data asynchronously from a linear stream.

### Docs

- Check the generated Rust docs during review.

### Description

Adds an async `RecordReader` implementation for reading MCAP data as a stream. It is gated behind an optional feature named `tokio`. I chose this feature flag and module name because the functionality is tied heavily into the Tokio ecosystem; if we later rebuild this to be async-executor-agnostic, that can live under a new module and feature flag.
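Roughly, usage looks like the following minimal sketch, adapted from the `conformance_reader_async` example and doc comments added in this PR (the file name is a placeholder, and the `tokio` feature must be enabled):

```rust
use tokio::fs::File;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Open the file with tokio's async File and wrap it in the new reader.
    let file = File::open("in.mcap").await.expect("couldn't open file");
    let mut reader = mcap::tokio::RecordReader::new(file);

    // next_record() copies each record body into the caller-provided buffer and
    // yields its opcode; chunk records are decompressed and flattened transparently.
    let mut buf: Vec<u8> = Vec::new();
    while let Some(result) = reader.next_record(&mut buf).await {
        let opcode = result.expect("failed to read next record");
        let record = mcap::parse_record(opcode, &buf[..]).expect("failed to parse record");
        // ... do something with `record`
    }
}
```

Note that the parsed `Record` borrows from `buf`; the new `Record::into_owned()` can be used to detach it if it needs to outlive the next call to `next_record()`.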
--- .github/workflows/ci.yml | 13 +- rust/Cargo.toml | 18 +- rust/examples/common/serialization.rs | 133 ++++++ rust/examples/conformance_reader.rs | 137 +----- rust/examples/conformance_reader_async.rs | 34 ++ rust/src/lib.rs | 6 +- rust/src/read.rs | 9 +- rust/src/records.rs | 38 ++ rust/src/tokio.rs | 8 + rust/src/tokio/lz4.rs | 146 ++++++ rust/src/tokio/read.rs | 434 ++++++++++++++++++ rust/src/tokio/read_exact_or_zero.rs | 103 +++++ rust/src/write.rs | 15 +- rust/tests/attachment.rs | 3 +- rust/tests/metadata.rs | 3 +- .../runners/RustAsyncReaderTestRunner.ts | 22 + .../scripts/run-tests/runners/index.ts | 2 + 17 files changed, 971 insertions(+), 153 deletions(-) create mode 100644 rust/examples/common/serialization.rs create mode 100644 rust/examples/conformance_reader_async.rs create mode 100644 rust/src/tokio.rs create mode 100644 rust/src/tokio/lz4.rs create mode 100644 rust/src/tokio/read.rs create mode 100644 rust/src/tokio/read_exact_or_zero.rs create mode 100644 tests/conformance/scripts/run-tests/runners/RustAsyncReaderTestRunner.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e7e2083df1..e4c9ae282c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -160,7 +160,8 @@ jobs: with: toolchain: stable default: true - - run: cd rust && cargo build --example=conformance_reader + - run: cargo build --example=conformance_reader --example=conformance_reader_async --features=tokio + working-directory: rust - run: yarn install --immutable - run: yarn test:conformance:generate-inputs --verify - run: yarn test:conformance --runner rust- @@ -490,13 +491,19 @@ jobs: toolchain: stable default: true components: "rustfmt, clippy" + - run: rustup target add wasm32-unknown-unknown - run: cargo fmt --all -- --check - run: cargo clippy -- --no-deps - run: cargo clippy --no-default-features -- --no-deps - run: cargo clippy --no-default-features --features lz4 -- --no-deps - run: cargo clippy --no-default-features --features zstd -- --no-deps - - run: cargo build - - run: cargo test + - run: cargo clippy --no-default-features --features tokio -- --no-deps + - run: cargo clippy --no-default-features --features tokio,lz4 -- --no-deps + - run: cargo clippy --no-default-features --features tokio,zstd -- --no-deps + - run: cargo build --all-features + - run: cargo test --all-features + - run: cargo build --all-features --target wasm32-unknown-unknown + - run: cargo check --all-features --target wasm32-unknown-unknown - name: "publish to crates.io" if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/releases/rust/v') run: cargo publish --token ${{ secrets.RUST_CRATES_IO_TOKEN }} diff --git a/rust/Cargo.toml b/rust/Cargo.toml index f2fa8ca0d2..0cd54d80ed 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ] repository = "https://github.com/foxglove/mcap" documentation = "https://docs.rs/mcap" readme = "README.md" -version = "0.9.2" +version = "0.10.0" edition = "2021" license = "MIT" @@ -22,7 +22,9 @@ log = "0.4" num_cpus = "1.13" paste = "1.0" thiserror = "1.0" -lz4_flex = { version = "0.11.1", optional = true } +lz4 = { version = "1.27", optional = true } +async-compression = { version = "*", features = ["tokio"], optional = true } +tokio = { version = "1", features = ["io-util"] , optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] zstd = { version = "0.11", features = ["wasm"], optional = true } @@ -32,15 +34,16 @@ zstd = { version = "0.11", features = 
["zstdmt"], optional = true } [features] default = ["zstd", "lz4"] -zstd = ["dep:zstd"] -lz4 = ["dep:lz4_flex"] +zstd = ["dep:zstd", "async-compression/zstd"] +lz4 = ["dep:lz4"] +tokio = ["dep:async-compression", "dep:tokio"] [dev-dependencies] anyhow = "1" atty = "0.2" camino = "1.0" clap = { version = "3.2", features = ["derive"]} -criterion = "0.5.1" +criterion = { version = "0.5.1", features = ["async_tokio"] } itertools = "0.10" memmap = "0.7" rayon = "1.5" @@ -48,6 +51,7 @@ serde = { version = "1.0.145", features = ["derive"] } serde_json = "1" simplelog = "0.12" tempfile = "3.3" +tokio = { version = "1", features = ["io-util", "macros", "rt", "fs"] } [[bench]] name = "reader" @@ -57,3 +61,7 @@ harness = false opt-level = 3 debug = true lto = true + +[[example]] +name = "conformance_reader_async" +required-features = ["tokio"] diff --git a/rust/examples/common/serialization.rs b/rust/examples/common/serialization.rs new file mode 100644 index 0000000000..73c1c7f9e4 --- /dev/null +++ b/rust/examples/common/serialization.rs @@ -0,0 +1,133 @@ +use mcap::records::Record; + +use std::collections::BTreeMap; + +use serde_json::{json, Value}; + +// We don't want to force Serde on users just for the sake of the conformance tests. +// (In what context would you want to serialize individual records of a MCAP?) +// Stamp out and stringify them ourselves: + +fn get_type(rec: &Record<'_>) -> &'static str { + match rec { + Record::Header(_) => "Header", + Record::Footer(_) => "Footer", + Record::Schema { .. } => "Schema", + Record::Channel(_) => "Channel", + Record::Message { .. } => "Message", + Record::Chunk { .. } => "Chunk", + Record::MessageIndex(_) => "MessageIndex", + Record::ChunkIndex(_) => "ChunkIndex", + Record::Attachment { .. } => "Attachment", + Record::AttachmentIndex(_) => "AttachmentIndex", + Record::Statistics(_) => "Statistics", + Record::Metadata(_) => "Metadata", + Record::MetadataIndex(_) => "MetadataIndex", + Record::SummaryOffset(_) => "SummaryOffset", + Record::DataEnd(_) => "DataEnd", + Record::Unknown { opcode, .. } => { + panic!("Unknown record in conformance test: (op {opcode})") + } + } +} + +fn get_fields(rec: &Record<'_>) -> Value { + fn b2s(bytes: &[u8]) -> Vec { + bytes.iter().map(|b| b.to_string()).collect() + } + fn m2s(map: &BTreeMap) -> BTreeMap { + map.iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + match rec { + Record::Header(h) => json!([["library", h.library], ["profile", h.profile]]), + Record::Footer(f) => json!([ + ["summary_crc", f.summary_crc.to_string()], + ["summary_offset_start", f.summary_offset_start.to_string()], + ["summary_start", f.summary_start.to_string()] + ]), + Record::Schema { header, data } => json!([ + ["data", b2s(data)], + ["encoding", header.encoding], + ["id", header.id.to_string()], + ["name", header.name] + ]), + Record::Channel(c) => json!([ + ["id", c.id.to_string()], + ["message_encoding", c.message_encoding], + ["metadata", c.metadata], + ["schema_id", c.schema_id.to_string()], + ["topic", c.topic] + ]), + Record::Message { header, data } => json!([ + ["channel_id", header.channel_id.to_string()], + ["data", b2s(data)], + ["log_time", header.log_time.to_string()], + ["publish_time", header.publish_time.to_string()], + ["sequence", header.sequence.to_string()] + ]), + Record::Chunk { .. 
} => unreachable!("Chunks are flattened"), + Record::MessageIndex(_) => unreachable!("MessageIndexes are skipped"), + Record::ChunkIndex(i) => json!([ + ["chunk_length", i.chunk_length.to_string()], + ["chunk_start_offset", i.chunk_start_offset.to_string()], + ["compressed_size", i.compressed_size.to_string()], + ["compression", i.compression], + ["message_end_time", i.message_end_time.to_string()], + ["message_index_length", i.message_index_length.to_string()], + ["message_index_offsets", m2s(&i.message_index_offsets)], + ["message_start_time", i.message_start_time.to_string()], + ["uncompressed_size", i.uncompressed_size.to_string()] + ]), + Record::Attachment { header, data } => json!([ + ["create_time", header.create_time.to_string()], + ["data", b2s(data)], + ["log_time", header.log_time.to_string()], + ["media_type", header.media_type], + ["name", header.name] + ]), + Record::AttachmentIndex(i) => json!([ + ["create_time", i.create_time.to_string()], + ["data_size", i.data_size.to_string()], + ["length", i.length.to_string()], + ["log_time", i.log_time.to_string()], + ["media_type", i.media_type], + ["name", i.name], + ["offset", i.offset.to_string()] + ]), + Record::Statistics(s) => json!([ + ["attachment_count", s.attachment_count.to_string()], + ["channel_count", s.channel_count.to_string()], + ["channel_message_counts", m2s(&s.channel_message_counts)], + ["chunk_count", s.chunk_count.to_string()], + ["message_count", s.message_count.to_string()], + ["message_end_time", s.message_end_time.to_string()], + ["message_start_time", s.message_start_time.to_string()], + ["metadata_count", s.metadata_count.to_string()], + ["schema_count", s.schema_count.to_string()] + ]), + Record::Metadata(m) => json!([["metadata", m.metadata], ["name", m.name]]), + Record::MetadataIndex(i) => json!([ + ["length", i.length.to_string()], + ["name", i.name], + ["offset", i.offset.to_string()] + ]), + Record::SummaryOffset(s) => json!([ + ["group_length", s.group_length.to_string()], + ["group_opcode", s.group_opcode.to_string()], + ["group_start", s.group_start.to_string()] + ]), + Record::DataEnd(d) => json!([["data_section_crc", d.data_section_crc.to_string()]]), + Record::Unknown { opcode, .. } => { + panic!("Unknown record in conformance test: (op {opcode})") + } + } +} + +pub fn as_json(view: &Record<'_>) -> Value { + let typename = get_type(view); + let fields = get_fields(view); + json!({"type": typename, "fields": fields}) +} diff --git a/rust/examples/conformance_reader.rs b/rust/examples/conformance_reader.rs index 941ee9386e..3bfa97ae19 100644 --- a/rust/examples/conformance_reader.rs +++ b/rust/examples/conformance_reader.rs @@ -1,136 +1,11 @@ -use mcap::records::Record; - -use std::{collections::BTreeMap, env, process}; +#[path = "common/serialization.rs"] +mod serialization; use serde_json::{json, Value}; -// We don't want to force Serde on users just for the sake of the conformance tests. -// (In what context would you want to serialize individual records of a MCAP?) -// Stamp out and stringify them ourselves: - -fn get_type(rec: &Record<'_>) -> &'static str { - match rec { - Record::Header(_) => "Header", - Record::Footer(_) => "Footer", - Record::Schema { .. } => "Schema", - Record::Channel(_) => "Channel", - Record::Message { .. } => "Message", - Record::Chunk { .. } => "Chunk", - Record::MessageIndex(_) => "MessageIndex", - Record::ChunkIndex(_) => "ChunkIndex", - Record::Attachment { .. 
} => "Attachment", - Record::AttachmentIndex(_) => "AttachmentIndex", - Record::Statistics(_) => "Statistics", - Record::Metadata(_) => "Metadata", - Record::MetadataIndex(_) => "MetadataIndex", - Record::SummaryOffset(_) => "SummaryOffset", - Record::DataEnd(_) => "DataEnd", - Record::Unknown { opcode, .. } => { - panic!("Unknown record in conformance test: (op {opcode})") - } - } -} - -fn get_fields(rec: &Record<'_>) -> Value { - fn b2s(bytes: &[u8]) -> Vec { - bytes.iter().map(|b| b.to_string()).collect() - } - fn m2s(map: &BTreeMap) -> BTreeMap { - map.iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect() - } - - match rec { - Record::Header(h) => json!([["library", h.library], ["profile", h.profile]]), - Record::Footer(f) => json!([ - ["summary_crc", f.summary_crc.to_string()], - ["summary_offset_start", f.summary_offset_start.to_string()], - ["summary_start", f.summary_start.to_string()] - ]), - Record::Schema { header, data } => json!([ - ["data", b2s(data)], - ["encoding", header.encoding], - ["id", header.id.to_string()], - ["name", header.name] - ]), - Record::Channel(c) => json!([ - ["id", c.id.to_string()], - ["message_encoding", c.message_encoding], - ["metadata", c.metadata], - ["schema_id", c.schema_id.to_string()], - ["topic", c.topic] - ]), - Record::Message { header, data } => json!([ - ["channel_id", header.channel_id.to_string()], - ["data", b2s(data)], - ["log_time", header.log_time.to_string()], - ["publish_time", header.publish_time.to_string()], - ["sequence", header.sequence.to_string()] - ]), - Record::Chunk { .. } => unreachable!("Chunks are flattened"), - Record::MessageIndex(_) => unreachable!("MessageIndexes are skipped"), - Record::ChunkIndex(i) => json!([ - ["chunk_length", i.chunk_length.to_string()], - ["chunk_start_offset", i.chunk_start_offset.to_string()], - ["compressed_size", i.compressed_size.to_string()], - ["compression", i.compression], - ["message_end_time", i.message_end_time.to_string()], - ["message_index_length", i.message_index_length.to_string()], - ["message_index_offsets", m2s(&i.message_index_offsets)], - ["message_start_time", i.message_start_time.to_string()], - ["uncompressed_size", i.uncompressed_size.to_string()] - ]), - Record::Attachment { header, data } => json!([ - ["create_time", header.create_time.to_string()], - ["data", b2s(data)], - ["log_time", header.log_time.to_string()], - ["media_type", header.media_type], - ["name", header.name] - ]), - Record::AttachmentIndex(i) => json!([ - ["create_time", i.create_time.to_string()], - ["data_size", i.data_size.to_string()], - ["length", i.length.to_string()], - ["log_time", i.log_time.to_string()], - ["media_type", i.media_type], - ["name", i.name], - ["offset", i.offset.to_string()] - ]), - Record::Statistics(s) => json!([ - ["attachment_count", s.attachment_count.to_string()], - ["channel_count", s.channel_count.to_string()], - ["channel_message_counts", m2s(&s.channel_message_counts)], - ["chunk_count", s.chunk_count.to_string()], - ["message_count", s.message_count.to_string()], - ["message_end_time", s.message_end_time.to_string()], - ["message_start_time", s.message_start_time.to_string()], - ["metadata_count", s.metadata_count.to_string()], - ["schema_count", s.schema_count.to_string()] - ]), - Record::Metadata(m) => json!([["metadata", m.metadata], ["name", m.name]]), - Record::MetadataIndex(i) => json!([ - ["length", i.length.to_string()], - ["name", i.name], - ["offset", i.offset.to_string()] - ]), - Record::SummaryOffset(s) => json!([ - ["group_length", 
s.group_length.to_string()], - ["group_opcode", s.group_opcode.to_string()], - ["group_start", s.group_start.to_string()] - ]), - Record::DataEnd(d) => json!([["data_section_crc", d.data_section_crc.to_string()]]), - Record::Unknown { opcode, .. } => { - panic!("Unknown record in conformance test: (op {opcode})") - } - } -} - -fn as_json(view: &Record<'_>) -> Value { - let typename = get_type(view); - let fields = get_fields(view); - json!({"type": typename, "fields": fields}) -} +use mcap::records::Record; +use std::env; +use std::process; pub fn main() { let args: Vec = env::args().collect(); @@ -143,7 +18,7 @@ pub fn main() { for rec in mcap::read::ChunkFlattener::new(&file).expect("Couldn't read file") { let r = rec.expect("failed to read next record"); if !matches!(r, Record::MessageIndex(_)) { - json_records.push(as_json(&r)); + json_records.push(serialization::as_json(&r)); } } let out = json!({ "records": json_records }); diff --git a/rust/examples/conformance_reader_async.rs b/rust/examples/conformance_reader_async.rs new file mode 100644 index 0000000000..ba8dfb94a8 --- /dev/null +++ b/rust/examples/conformance_reader_async.rs @@ -0,0 +1,34 @@ +#[path = "common/serialization.rs"] +mod serialization; + +use serde_json::{json, Value}; + +use serialization::as_json; +use std::env; +use std::process; +use tokio::fs::File; + +use tokio; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("Please supply an MCAP file as argument"); + process::exit(1); + } + let file = File::open(&args[1]).await.expect("couldn't open file"); + let mut reader = mcap::tokio::RecordReader::new(file); + + let mut json_records: Vec = vec![]; + let mut buf: Vec = Vec::new(); + while let Some(opcode) = reader.next_record(&mut buf).await { + let opcode = opcode.expect("failed to read next record"); + if opcode != mcap::records::op::MESSAGE_INDEX { + let parsed = mcap::parse_record(opcode, &buf[..]).expect("failed to parse record"); + json_records.push(as_json(&parsed)); + } + } + let out = json!({ "records": json_records }); + print!("{}", serde_json::to_string_pretty(&out).unwrap()); +} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 9b2eb1b1c7..bdb1f389d9 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -75,6 +75,8 @@ pub mod read; pub mod records; +#[cfg(feature = "tokio")] +pub mod tokio; pub mod write; mod io_utils; @@ -119,6 +121,8 @@ pub enum McapError { UnexpectedEof, #[error("Chunk ended in the middle of a record")] UnexpectedEoc, + #[error("Record with opcode {opcode:02X} has length {len}, need at least {expected} to parse")] + RecordTooShort { opcode: u8, len: u64, expected: u64 }, #[error("Message {0} referenced unknown channel {1}")] UnknownChannel(u32, u16), #[error("Channel `{0}` referenced unknown schema {1}")] @@ -196,5 +200,5 @@ pub struct Attachment<'a> { pub data: Cow<'a, [u8]>, } -pub use read::{MessageStream, Summary}; +pub use read::{parse_record, MessageStream, Summary}; pub use write::{WriteOptions, Writer}; diff --git a/rust/src/read.rs b/rust/src/read.rs index 72dad85fb2..94d8d53607 100644 --- a/rust/src/read.rs +++ b/rust/src/read.rs @@ -136,15 +136,16 @@ fn read_record_from_slice<'a>(buf: &mut &'a [u8]) -> McapResult McapResult> { +/// Given a records' opcode and data, parse into a Record. The resulting Record will contain +/// borrowed slices from `body`. +pub fn parse_record(op: u8, body: &[u8]) -> McapResult> { macro_rules! 
record { ($b:ident) => {{ let mut cur = Cursor::new($b); @@ -278,7 +279,7 @@ impl<'a> ChunkReader<'a> { #[cfg(feature = "lz4")] "lz4" => ChunkDecompressor::Compressed(Some(CountingCrcReader::new(Box::new( - lz4_flex::frame::FrameDecoder::new(data), + lz4::Decoder::new(data)?, )))), #[cfg(not(feature = "lz4"))] diff --git a/rust/src/records.rs b/rust/src/records.rs index eeb842c495..0ac5d7f72a 100644 --- a/rust/src/records.rs +++ b/rust/src/records.rs @@ -99,6 +99,44 @@ impl Record<'_> { Record::Unknown { opcode, .. } => *opcode, } } + + /// Moves this value into a fully-owned variant with no borrows. This should be free for + /// already-owned values. + pub fn into_owned(self) -> Record<'static> { + match self { + Record::Header(header) => Record::Header(header), + Record::Footer(footer) => Record::Footer(footer), + Record::Schema { header, data } => Record::Schema { + header, + data: Cow::Owned(data.into_owned()), + }, + Record::Channel(channel) => Record::Channel(channel), + Record::Message { header, data } => Record::Message { + header, + data: Cow::Owned(data.into_owned()), + }, + Record::Chunk { header, data } => Record::Chunk { + header, + data: Cow::Owned(data.into_owned()), + }, + Record::MessageIndex(index) => Record::MessageIndex(index), + Record::ChunkIndex(index) => Record::ChunkIndex(index), + Record::Attachment { header, data } => Record::Attachment { + header, + data: Cow::Owned(data.into_owned()), + }, + Record::AttachmentIndex(index) => Record::AttachmentIndex(index), + Record::Statistics(statistics) => Record::Statistics(statistics), + Record::Metadata(metadata) => Record::Metadata(metadata), + Record::MetadataIndex(index) => Record::MetadataIndex(index), + Record::SummaryOffset(offset) => Record::SummaryOffset(offset), + Record::DataEnd(end) => Record::DataEnd(end), + Record::Unknown { opcode, data } => Record::Unknown { + opcode, + data: Cow::Owned(data.into_owned()), + }, + } + } } #[binrw] diff --git a/rust/src/tokio.rs b/rust/src/tokio.rs new file mode 100644 index 0000000000..cf3d68ebd5 --- /dev/null +++ b/rust/src/tokio.rs @@ -0,0 +1,8 @@ +//! Read MCAP data from a stream asynchronously +#[cfg(feature = "lz4")] +mod lz4; +pub mod read; +mod read_exact_or_zero; + +pub use read::{RecordReader, RecordReaderOptions}; +use read_exact_or_zero::read_exact_or_zero; diff --git a/rust/src/tokio/lz4.rs b/rust/src/tokio/lz4.rs new file mode 100644 index 0000000000..05a0f65a9e --- /dev/null +++ b/rust/src/tokio/lz4.rs @@ -0,0 +1,146 @@ +use std::io::{Error, ErrorKind, Result}; +use std::pin::{pin, Pin}; +use std::ptr; +use std::task::{Context, Poll}; + +use lz4::liblz4::{ + check_error, LZ4FDecompressionContext, LZ4F_createDecompressionContext, LZ4F_decompress, + LZ4F_freeDecompressionContext, LZ4F_VERSION, +}; +use tokio::io::{AsyncRead, ReadBuf}; + +const BUFFER_SIZE: usize = 32 * 1024; + +#[derive(Debug)] +struct DecoderContext { + c: LZ4FDecompressionContext, +} + +// An adaptation of the [`lz4::Decoder`] [`std::io::Read`] impl, but for [`tokio::io::AsyncRead`]. +// Code below is adapted from the [lz4](https://github.com/10XGenomics/lz4-rs) crate source. +#[derive(Debug)] +pub struct Lz4Decoder { + c: DecoderContext, + r: R, + input_buf: Box<[u8]>, + unread_input_start: usize, + unread_input_end: usize, + next: usize, +} + +impl Lz4Decoder { + /// Creates a new decoder which reads its input from the given + /// input stream. 
The input stream can be re-acquired by calling + /// `finish()` + pub fn new(r: R) -> Result> { + Ok(Lz4Decoder { + r, + c: DecoderContext::new()?, + input_buf: vec![0; BUFFER_SIZE].into_boxed_slice(), + unread_input_start: BUFFER_SIZE, + unread_input_end: BUFFER_SIZE, + // Minimal LZ4 stream size + next: 11, + }) + } + + pub fn finish(self) -> (R, Result<()>) { + ( + self.r, + match self.next { + 0 => Ok(()), + _ => Err(Error::new( + ErrorKind::Interrupted, + "Finish called before end of compressed stream", + )), + }, + ) + } +} + +impl AsyncRead for Lz4Decoder { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + output_buf: &mut ReadBuf<'_>, + ) -> Poll> { + // Thre's nothing left to read. + if self.next == 0 || output_buf.remaining() == 0 { + return Poll::Ready(Ok(())); + } + let mut written_len: usize = 0; + let this = self.get_mut(); + while written_len == 0 { + // this reader buffers input data until it has enough to present to the lz4 frame decoder. + // if there's nothing unread, request more data from the reader. + if this.unread_input_start >= this.unread_input_end { + // request a full BUFFER_SIZE or the amount requested by the lz4 frame decoder, + // whichever is less. + let need = std::cmp::min(BUFFER_SIZE, this.next); + // try reading more input data. If it's not ready, return and try again later. + // NOTE: we don't need to save this stack frame as a future and re-enter it later + // because the only frame-local state `written_len` has not been modified and can be + // discarded. + { + let mut input_buf = ReadBuf::new(&mut this.input_buf[..need]); + let result = pin!(&mut this.r).poll_read(cx, &mut input_buf); + match result { + Poll::Pending => return result, + Poll::Ready(Err(_)) => return result, + _ => {} + }; + this.unread_input_start = 0; + this.unread_input_end = input_buf.filled().len(); + this.next -= this.unread_input_end; + } + // The read succeeded. If zero bytes were read, we're at the end of the stream. + if this.unread_input_end == 0 { + return Poll::Ready(Ok(())); + } + } + // feed bytes from our input buffer into the compressor, writing into the output + // buffer until either the output buffer is full or the input buffer is consumed. 
+ while (written_len < output_buf.remaining()) + && (this.unread_input_start < this.unread_input_end) + { + let mut src_size = this.unread_input_end - this.unread_input_start; + let mut dst_size = output_buf.remaining() - written_len; + let prev_filled = output_buf.filled().len(); + let len = check_error(unsafe { + LZ4F_decompress( + this.c.c, + output_buf.initialize_unfilled().as_mut_ptr(), + &mut dst_size, + this.input_buf[this.unread_input_start..].as_ptr(), + &mut src_size, + ptr::null(), + ) + })?; + this.unread_input_start += src_size; + written_len += dst_size; + output_buf.set_filled(prev_filled + written_len); + if len == 0 { + this.next = 0; + return Poll::Ready(Ok(())); + } else if this.next < len { + this.next = len; + } + } + } + Poll::Ready(Ok(())) + } +} + +impl DecoderContext { + fn new() -> Result { + let mut context = LZ4FDecompressionContext(ptr::null_mut()); + check_error(unsafe { LZ4F_createDecompressionContext(&mut context, LZ4F_VERSION) })?; + Ok(DecoderContext { c: context }) + } +} + +impl Drop for DecoderContext { + fn drop(&mut self) { + unsafe { LZ4F_freeDecompressionContext(self.c) }; + } +} diff --git a/rust/src/tokio/read.rs b/rust/src/tokio/read.rs new file mode 100644 index 0000000000..50a1384250 --- /dev/null +++ b/rust/src/tokio/read.rs @@ -0,0 +1,434 @@ +use std::pin::{pin, Pin}; +use std::task::{Context, Poll}; + +#[cfg(feature = "zstd")] +use async_compression::tokio::bufread::ZstdDecoder; +use binrw::BinReaderExt; +use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take}; + +#[cfg(feature = "lz4")] +use crate::tokio::lz4::Lz4Decoder; +use crate::tokio::read_exact_or_zero; +use crate::{records, McapError, McapResult, MAGIC}; + +enum ReaderState { + Base(R), + UncompressedChunk(Take), + #[cfg(feature = "zstd")] + ZstdChunk(ZstdDecoder>>), + #[cfg(feature = "lz4")] + Lz4Chunk(Lz4Decoder>), + Empty, +} + +impl AsyncRead for ReaderState +where + R: AsyncRead + std::marker::Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.get_mut() { + ReaderState::Base(r) => pin!(r).poll_read(cx, buf), + ReaderState::UncompressedChunk(r) => pin!(r).poll_read(cx, buf), + #[cfg(feature = "zstd")] + ReaderState::ZstdChunk(r) => pin!(r).poll_read(cx, buf), + #[cfg(feature = "lz4")] + ReaderState::Lz4Chunk(r) => pin!(r).poll_read(cx, buf), + ReaderState::Empty => { + panic!("invariant: reader is only set to empty while swapping with another valid variant") + } + } + } +} +impl ReaderState +where + R: AsyncRead, +{ + pub fn into_inner(self) -> McapResult { + match self { + ReaderState::Base(reader) => Ok(reader), + ReaderState::UncompressedChunk(take) => Ok(take.into_inner()), + #[cfg(feature = "zstd")] + ReaderState::ZstdChunk(decoder) => Ok(decoder.into_inner().into_inner().into_inner()), + #[cfg(feature = "lz4")] + ReaderState::Lz4Chunk(decoder) => { + let (output, result) = decoder.finish(); + result?; + Ok(output.into_inner()) + } + ReaderState::Empty => { + panic!("invariant: reader is only set to empty while swapping with another valid variant") + } + } + } +} + +/// Reads an MCAP file record-by-record, writing the raw record data into a caller-provided Vec. 
+/// ```no_run +/// use std::fs; +/// +/// use tokio::fs::File; +/// +/// async fn read_it() { +/// let file = File::open("in.mcap").await.expect("couldn't open file"); +/// let mut record_buf: Vec = Vec::new(); +/// let mut reader = mcap::tokio::RecordReader::new(file); +/// while let Some(result) = reader.next_record(&mut record_buf).await { +/// let opcode = result.expect("couldn't read next record"); +/// let raw_record = mcap::parse_record(opcode, &record_buf[..]).expect("couldn't parse"); +/// // do something with the record... +/// } +/// } +/// ``` +pub struct RecordReader { + reader: ReaderState, + options: RecordReaderOptions, + start_magic_seen: bool, + footer_seen: bool, + to_discard_after_chunk: usize, + scratch: Box<[u8]>, +} + +#[derive(Default, Clone)] +pub struct RecordReaderOptions { + /// If true, the reader will not expect the MCAP magic at the start of the stream. + pub skip_start_magic: bool, + /// If true, the reader will not expect the MCAP magic at the end of the stream. + pub skip_end_magic: bool, + /// If true, the reader will yield entire chunk records. Otherwise, the reader will decompress + /// and read into the chunk, yielding the records inside. + pub emit_chunks: bool, +} + +enum Cmd { + YieldRecord(u8), + EnterChunk { + header: records::ChunkHeader, + len: u64, + }, + ExitChunk, + Stop, +} + +impl RecordReader +where + R: AsyncRead + std::marker::Unpin, +{ + pub fn new(reader: R) -> Self { + Self::new_with_options(reader, &RecordReaderOptions::default()) + } + + pub fn new_with_options(reader: R, options: &RecordReaderOptions) -> Self { + Self { + reader: ReaderState::Base(reader), + options: options.clone(), + start_magic_seen: false, + footer_seen: false, + to_discard_after_chunk: 0, + scratch: vec![0; 1024].into_boxed_slice(), + } + } + + pub fn into_inner(self) -> McapResult { + self.reader.into_inner() + } + + /// Reads the next record from the input stream and copies the raw content into `data`. + /// Returns the record's opcode as a result. 
+ pub async fn next_record(&mut self, data: &mut Vec) -> Option> { + loop { + let cmd = match self.next_record_inner(data).await { + Ok(cmd) => cmd, + Err(err) => return Some(Err(err)), + }; + match cmd { + Cmd::Stop => return None, + Cmd::YieldRecord(opcode) => return Some(Ok(opcode)), + Cmd::EnterChunk { header, len } => { + let mut reader_state = ReaderState::Empty; + std::mem::swap(&mut reader_state, &mut self.reader); + match header.compression.as_str() { + #[cfg(feature = "zstd")] + "zstd" => { + let reader = match reader_state.into_inner() { + Ok(reader) => reader, + Err(err) => return Some(Err(err)), + }; + self.reader = ReaderState::ZstdChunk(ZstdDecoder::new( + tokio::io::BufReader::new(reader.take(header.compressed_size)), + )); + } + #[cfg(feature = "lz4")] + "lz4" => { + let reader = match reader_state.into_inner() { + Ok(reader) => reader, + Err(err) => return Some(Err(err)), + }; + let decoder = match Lz4Decoder::new(reader.take(header.compressed_size)) + { + Ok(decoder) => decoder, + Err(err) => return Some(Err(err.into())), + }; + self.reader = ReaderState::Lz4Chunk(decoder); + } + "" => { + let reader = match reader_state.into_inner() { + Ok(reader) => reader, + Err(err) => return Some(Err(err)), + }; + self.reader = + ReaderState::UncompressedChunk(reader.take(header.compressed_size)); + } + _ => { + std::mem::swap(&mut reader_state, &mut self.reader); + return Some(Err(McapError::UnsupportedCompression( + header.compression.clone(), + ))); + } + } + self.to_discard_after_chunk = len as usize + - (40 + header.compression.len() + header.compressed_size as usize); + } + Cmd::ExitChunk => { + let mut reader_state = ReaderState::Empty; + std::mem::swap(&mut reader_state, &mut self.reader); + self.reader = ReaderState::Base(match reader_state.into_inner() { + Ok(reader) => reader, + Err(err) => return Some(Err(err)), + }); + while self.to_discard_after_chunk > 0 { + let to_read = if self.to_discard_after_chunk > self.scratch.len() { + self.scratch.len() + } else { + self.to_discard_after_chunk + }; + match self.reader.read(&mut self.scratch[..to_read]).await { + Ok(n) => self.to_discard_after_chunk -= n, + Err(err) => return Some(Err(err.into())), + }; + } + } + }; + } + } + + async fn next_record_inner(&mut self, data: &mut Vec) -> McapResult { + if let ReaderState::Base(reader) = &mut self.reader { + if !self.start_magic_seen && !self.options.skip_start_magic { + reader.read_exact(&mut self.scratch[..MAGIC.len()]).await?; + if &self.scratch[..MAGIC.len()] != MAGIC { + return Err(McapError::BadMagic); + } + self.start_magic_seen = true; + } + if self.footer_seen && !self.options.skip_end_magic { + reader.read_exact(&mut self.scratch[..MAGIC.len()]).await?; + if &self.scratch[..MAGIC.len()] != MAGIC { + return Err(McapError::BadMagic); + } + return Ok(Cmd::Stop); + } + let readlen = read_exact_or_zero(reader, &mut self.scratch[..9]).await?; + if readlen == 0 { + if self.options.skip_end_magic { + return Ok(Cmd::Stop); + } else { + return Err(McapError::UnexpectedEof); + } + } + let opcode = self.scratch[0]; + if opcode == records::op::FOOTER { + self.footer_seen = true; + } + let record_len = u64::from_le_bytes(self.scratch[1..9].try_into().unwrap()); + if opcode == records::op::CHUNK && !self.options.emit_chunks { + let header = read_chunk_header(reader, data, record_len).await?; + return Ok(Cmd::EnterChunk { + header, + len: record_len, + }); + } + data.resize(record_len as usize, 0); + reader.read_exact(&mut data[..]).await?; + Ok(Cmd::YieldRecord(opcode)) + } else { + let 
len = read_exact_or_zero(&mut self.reader, &mut self.scratch[..9]).await?; + if len == 0 { + return Ok(Cmd::ExitChunk); + } + let opcode = self.scratch[0]; + let record_len = u64::from_le_bytes(self.scratch[1..9].try_into().unwrap()); + data.resize(record_len as usize, 0); + self.reader.read_exact(&mut data[..]).await?; + Ok(Cmd::YieldRecord(opcode)) + } + } +} + +async fn read_chunk_header( + reader: &mut R, + scratch: &mut Vec, + record_len: u64, +) -> McapResult { + let mut header = records::ChunkHeader { + message_start_time: 0, + message_end_time: 0, + uncompressed_size: 0, + uncompressed_crc: 0, + compression: String::new(), + compressed_size: 0, + }; + if record_len < 40 { + return Err(McapError::RecordTooShort { + opcode: records::op::CHUNK, + len: record_len, + expected: 40, + }); + } + scratch.resize(32, 0); + reader.read_exact(&mut scratch[..]).await?; + let compression_len: u32 = { + let mut cursor = std::io::Cursor::new(&scratch); + header.message_start_time = cursor.read_le()?; + header.message_end_time = cursor.read_le()?; + header.uncompressed_size = cursor.read_le()?; + header.uncompressed_crc = cursor.read_le()?; + cursor.read_le()? + }; + scratch.resize(compression_len as usize, 0); + if record_len < (40 + compression_len) as u64 { + return Err(McapError::RecordTooShort { + opcode: records::op::CHUNK, + len: record_len, + expected: (40 + compression_len) as u64, + }); + } + reader.read_exact(&mut scratch[..]).await?; + header.compression = match std::str::from_utf8(&scratch[..]) { + Ok(val) => val.to_owned(), + Err(err) => { + return Err(McapError::Parse(binrw::error::Error::Custom { + pos: 32, + err: Box::new(err), + })); + } + }; + scratch.resize(8, 0); + reader.read_exact(&mut scratch[..]).await?; + header.compressed_size = u64::from_le_bytes(scratch[..].try_into().unwrap()); + let available = record_len - (32 + compression_len as u64 + 8); + if available < header.compressed_size { + return Err(McapError::BadChunkLength { + header: header.compressed_size, + available, + }); + } + Ok(header) +} + +#[cfg(test)] +mod tests { + use crate::read::parse_record; + use std::collections::BTreeMap; + + use super::*; + #[tokio::test] + async fn test_record_reader() -> Result<(), McapError> { + for compression in [ + None, + #[cfg(feature = "zstd")] + Some(crate::Compression::Zstd), + #[cfg(feature = "lz4")] + Some(crate::Compression::Lz4), + ] { + let mut buf = std::io::Cursor::new(Vec::new()); + { + let mut writer = crate::WriteOptions::new() + .compression(compression) + .create(&mut buf)?; + let channel = std::sync::Arc::new(crate::Channel { + topic: "chat".to_owned(), + schema: None, + message_encoding: "json".to_owned(), + metadata: BTreeMap::new(), + }); + writer.add_channel(&channel)?; + writer.write(&crate::Message { + channel, + sequence: 0, + log_time: 0, + publish_time: 0, + data: (&[0, 1, 2]).into(), + })?; + writer.finish()?; + } + let mut reader = RecordReader::new(std::io::Cursor::new(buf.into_inner())); + let mut record = Vec::new(); + let mut opcodes: Vec = Vec::new(); + while let Some(opcode) = reader.next_record(&mut record).await { + let opcode = opcode?; + opcodes.push(opcode); + parse_record(opcode, &record)?; + } + assert_eq!( + opcodes.as_slice(), + [ + records::op::HEADER, + records::op::CHANNEL, + records::op::MESSAGE, + records::op::MESSAGE_INDEX, + records::op::DATA_END, + records::op::CHANNEL, + records::op::CHUNK_INDEX, + records::op::STATISTICS, + records::op::SUMMARY_OFFSET, + records::op::SUMMARY_OFFSET, + records::op::SUMMARY_OFFSET, + 
records::op::FOOTER, + ], + "reads opcodes from MCAP compressed with {:?}", + compression + ); + } + Ok(()) + } + #[cfg(feature = "lz4")] + #[tokio::test] + async fn test_lz4_decompression() -> Result<(), McapError> { + let mut buf = std::io::Cursor::new(Vec::new()); + { + let mut writer = crate::WriteOptions::new() + .compression(Some(crate::Compression::Lz4)) + .create(&mut buf)?; + let channel = std::sync::Arc::new(crate::Channel { + topic: "chat".to_owned(), + schema: None, + message_encoding: "json".to_owned(), + metadata: BTreeMap::new(), + }); + let data: Vec = vec![0; 1024]; + writer.add_channel(&channel)?; + for n in 0..10000 { + { + writer.write(&crate::Message { + channel: channel.clone(), + log_time: n, + publish_time: n, + sequence: n as u32, + data: std::borrow::Cow::Owned(data.clone()), + })?; + } + } + writer.finish()?; + } + let mut reader = RecordReader::new(std::io::Cursor::new(buf.into_inner())); + let mut record = Vec::new(); + while let Some(opcode) = reader.next_record(&mut record).await { + parse_record(opcode?, &record)?; + } + Ok(()) + } +} diff --git a/rust/src/tokio/read_exact_or_zero.rs b/rust/src/tokio/read_exact_or_zero.rs new file mode 100644 index 0000000000..cc4eb902d5 --- /dev/null +++ b/rust/src/tokio/read_exact_or_zero.rs @@ -0,0 +1,103 @@ +use tokio::io::{AsyncRead, AsyncReadExt}; + +/// read up to `buf.len()` bytes from `r` into `buf`. This repeatedly calls read() on `r` until +/// either the buffer is full or EOF is reached. If either 0 or buf.len() bytes were read before +/// EOF, Ok(n) is returned. If EOF is reached after 0 bytes but before buf.len(), Err(UnexpectedEOF) +/// is returned. +/// This is useful for cases where we expect either to read either a whole MCAP record or EOF. +pub(crate) async fn read_exact_or_zero( + r: &mut R, + buf: &mut [u8], +) -> Result { + let mut pos: usize = 0; + loop { + let readlen = r.read(&mut buf[pos..]).await?; + if readlen == 0 { + if pos != 0 { + return Err(std::io::ErrorKind::UnexpectedEof.into()); + } else { + return Ok(0); + } + } + pos += readlen; + if pos == buf.len() { + return Ok(pos); + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use std::cmp::min; + + struct ZeroReader { + remaining: usize, + max_read_len: usize, + } + + impl AsyncRead for ZeroReader { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let max_read_len = self.as_ref().max_read_len; + let remaining = self.as_ref().remaining; + if remaining == 0 { + return std::task::Poll::Ready(Ok(())); + } + let to_fill = min(min(remaining, buf.remaining()), max_read_len); + buf.initialize_unfilled_to(to_fill).fill(0); + buf.set_filled(to_fill); + self.as_mut().remaining -= to_fill; + return std::task::Poll::Ready(Ok(())); + } + } + #[tokio::test] + async fn test_full_read_is_not_error() { + let mut r = ZeroReader { + remaining: 10, + max_read_len: 10, + }; + let mut buf: Vec = vec![1; 10]; + let result = read_exact_or_zero(&mut r, &mut buf).await; + assert_eq!(result.ok(), Some(10)); + assert_eq!(&buf[..], &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + } + + #[tokio::test] + async fn test_eof_is_not_error() { + let mut r = ZeroReader { + remaining: 0, + max_read_len: 10, + }; + let mut buf: Vec = vec![1; 10]; + let result = read_exact_or_zero(&mut r, &mut buf).await; + assert_eq!(result.ok(), Some(0)); + assert_eq!(&buf[..], &[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + } + #[tokio::test] + async fn test_repeated_read_calls() { + let mut r = ZeroReader { + 
remaining: 10, + max_read_len: 4, + }; + let mut buf: Vec = vec![1; 10]; + let result = read_exact_or_zero(&mut r, &mut buf).await; + assert_eq!(result.ok(), Some(10)); + assert_eq!(&buf[..], &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + } + #[tokio::test] + async fn test_partial_read_is_error() { + let mut r = ZeroReader { + remaining: 4, + max_read_len: 2, + }; + let mut buf: Vec = vec![1; 10]; + let result = read_exact_or_zero(&mut r, &mut buf).await; + assert!(!result.is_ok()); + assert_eq!(&buf[..], &[0, 0, 0, 0, 1, 1, 1, 1, 1, 1]); + } +} diff --git a/rust/src/write.rs b/rust/src/write.rs index 32e47a0524..3be18f9282 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -153,10 +153,7 @@ impl WriteOptions { /// If `None`, chunks will not be automatically closed and the user must call `flush()` to /// begin a new chunk. pub fn chunk_size(self, chunk_size: Option) -> Self { - Self { - chunk_size: chunk_size, - ..self - } + Self { chunk_size, ..self } } /// specifies whether to use chunks for storing messages. @@ -711,7 +708,7 @@ enum Compressor { #[cfg(feature = "zstd")] Zstd(zstd::Encoder<'static, W>), #[cfg(feature = "lz4")] - Lz4(lz4_flex::frame::FrameEncoder), + Lz4(lz4::Encoder), } impl Compressor { @@ -721,7 +718,11 @@ impl Compressor { #[cfg(feature = "zstd")] Compressor::Zstd(w) => w.finish()?, #[cfg(feature = "lz4")] - Compressor::Lz4(w) => w.finish()?, + Compressor::Lz4(w) => { + let (output, result) = w.finish(); + result?; + output + } }) } } @@ -797,7 +798,7 @@ impl ChunkWriter { Compressor::Zstd(enc) } #[cfg(feature = "lz4")] - Some(Compression::Lz4) => Compressor::Lz4(lz4_flex::frame::FrameEncoder::new(writer)), + Some(Compression::Lz4) => Compressor::Lz4(lz4::EncoderBuilder::new().build(writer)?), #[cfg(not(any(feature = "zstd", feature = "lz4")))] Some(_) => unreachable!("`Compression` is an empty enum that cannot be instantiated"), None => Compressor::Null(writer), diff --git a/rust/tests/attachment.rs b/rust/tests/attachment.rs index 76b6762777..57eb98b1eb 100644 --- a/rust/tests/attachment.rs +++ b/rust/tests/attachment.rs @@ -66,7 +66,8 @@ fn round_trip() -> Result<()> { ..Default::default() }), attachment_indexes: vec![mcap::records::AttachmentIndex { - offset: 38, // Finicky - depends on the length of the library version string + // offset depends on the length of the embedded library string, which includes the crate version + offset: 33 + (env!("CARGO_PKG_VERSION").len() as u64), length: 78, log_time: 2, create_time: 1, diff --git a/rust/tests/metadata.rs b/rust/tests/metadata.rs index 606d81e791..905ab98ff7 100644 --- a/rust/tests/metadata.rs +++ b/rust/tests/metadata.rs @@ -56,7 +56,8 @@ fn round_trip() -> Result<()> { ..Default::default() }), metadata_indexes: vec![mcap::records::MetadataIndex { - offset: 38, // Finicky - depends on the length of the library version string + // offset depends on the length of the embedded library string, which includes the crate version + offset: 33 + (env!("CARGO_PKG_VERSION").len() as u64), length: 41, name: String::from("myMetadata"), }], diff --git a/tests/conformance/scripts/run-tests/runners/RustAsyncReaderTestRunner.ts b/tests/conformance/scripts/run-tests/runners/RustAsyncReaderTestRunner.ts new file mode 100644 index 0000000000..8418d62aed --- /dev/null +++ b/tests/conformance/scripts/run-tests/runners/RustAsyncReaderTestRunner.ts @@ -0,0 +1,22 @@ +import { exec } from "child_process"; +import { join } from "path"; +import { promisify } from "util"; +import { TestVariant } from "variants/types"; + +import { 
StreamedReadTestRunner } from "./TestRunner"; +import { StreamedReadTestResult } from "../types"; + +export default class RustAsyncReaderTestRunner extends StreamedReadTestRunner { + readonly name = "rust-async-streamed-reader"; + + async runReadTest(filePath: string): Promise { + const { stdout } = await promisify(exec)(`./conformance_reader_async ${filePath}`, { + cwd: join(__dirname, "../../../../../rust/target/debug/examples"), + }); + return JSON.parse(stdout.trim()) as StreamedReadTestResult; + } + + supportsVariant(_variant: TestVariant): boolean { + return true; + } +} diff --git a/tests/conformance/scripts/run-tests/runners/index.ts b/tests/conformance/scripts/run-tests/runners/index.ts index 3c76f02148..6af475da47 100644 --- a/tests/conformance/scripts/run-tests/runners/index.ts +++ b/tests/conformance/scripts/run-tests/runners/index.ts @@ -8,6 +8,7 @@ import KaitaiStructReaderTestRunner from "./KaitaiStructReaderTestRunner"; import PythonIndexedReaderTestRunner from "./PythonIndexedReaderTestRunner"; import PythonStreamedReaderTestRunner from "./PythonStreamedReaderTestRunner"; import PythonWriterTestRunner from "./PythonWriterTestRunner"; +import RustAsyncReaderTestRunner from "./RustAsyncReaderTestRunner"; import RustReaderTestRunner from "./RustReaderTestRunner"; import RustWriterTestRunner from "./RustWriterTestRunner"; import SwiftIndexedReaderTestRunner from "./SwiftIndexedReaderTestRunner"; @@ -31,6 +32,7 @@ const runners: readonly (IndexedReadTestRunner | StreamedReadTestRunner | WriteT new TypescriptIndexedReaderTestRunner(), new TypescriptStreamedReaderTestRunner(), new TypescriptWriterTestRunner(), + new RustAsyncReaderTestRunner(), new RustReaderTestRunner(), new RustWriterTestRunner(), new SwiftWriterTestRunner(),