rust: add async reading functionality (#1211)
### Changelog
- rust: Switches the LZ4 compression dependency from `lz4_flex` to
`lz4-rs`, moving from a pure-Rust LZ4 implementation to C bindings. I
believe this is worthwhile because `lz4_flex` does not support LZ4's
"high compression mode". The practical reason for the switch in this PR
is that `lz4_flex` does not expose interfaces that make it easy to build
an AsyncRead adapter, while `lz4-rs` does.
- rust: Adds structs to read MCAP data asynchronously in a linear
stream.

### Docs

- Check the generated Rust docs as part of review.


### Description
Adds an async `RecordReader` implementation for reading MCAP data
asynchronously. This is an optional feature, named `tokio`. I chose this
feature flag name and this module name because the functionality is
tied heavily into the Tokio ecosystem. If at some point we rebuild this
to be async-executor-agnostic, we can add that functionality under a new
module and feature flag name.
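
For illustration, here is a minimal sketch of reading a stream with the new async reader. The `mcap::tokio::RecordReader` path, its `new` constructor, and a `next_record(&mut Vec<u8>)` method that resolves to the record's opcode are assumptions based on this description; check the generated docs for the actual API:

```rust
// Hypothetical usage sketch; names and signatures here are assumptions based
// on this PR's description, not the crate's documented API. Requires the
// optional `tokio` feature.
use tokio::{fs::File, io::BufReader};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any AsyncRead source works; a buffered file is the common case.
    let file = BufReader::new(File::open("demo.mcap").await?);
    let mut reader = mcap::tokio::RecordReader::new(file);

    // Records arrive in linear stream order; the byte buffer is reused
    // across reads to avoid a fresh allocation per record.
    let mut buf: Vec<u8> = Vec::new();
    while let Some(result) = reader.next_record(&mut buf).await {
        let opcode = result?;
        println!("record opcode {opcode:#04x}, {} bytes", buf.len());
    }
    Ok(())
}
```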

james-rms authored Sep 15, 2024
1 parent 6a2fe35 commit c67c632
Showing 17 changed files with 971 additions and 153 deletions.
13 changes: 10 additions & 3 deletions .github/workflows/ci.yml
@@ -160,7 +160,8 @@ jobs:
        with:
          toolchain: stable
          default: true
      - run: cd rust && cargo build --example=conformance_reader
      - run: cargo build --example=conformance_reader --example=conformance_reader_async --features=tokio
        working-directory: rust
      - run: yarn install --immutable
      - run: yarn test:conformance:generate-inputs --verify
      - run: yarn test:conformance --runner rust-
@@ -490,13 +491,19 @@ jobs:
          toolchain: stable
          default: true
          components: "rustfmt, clippy"
      - run: rustup target add wasm32-unknown-unknown
      - run: cargo fmt --all -- --check
      - run: cargo clippy -- --no-deps
      - run: cargo clippy --no-default-features -- --no-deps
      - run: cargo clippy --no-default-features --features lz4 -- --no-deps
      - run: cargo clippy --no-default-features --features zstd -- --no-deps
      - run: cargo build
      - run: cargo test
      - run: cargo clippy --no-default-features --features tokio -- --no-deps
      - run: cargo clippy --no-default-features --features tokio,lz4 -- --no-deps
      - run: cargo clippy --no-default-features --features tokio,zstd -- --no-deps
      - run: cargo build --all-features
      - run: cargo test --all-features
      - run: cargo build --all-features --target wasm32-unknown-unknown
      - run: cargo check --all-features --target wasm32-unknown-unknown
      - name: "publish to crates.io"
        if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/releases/rust/v')
        run: cargo publish --token ${{ secrets.RUST_CRATES_IO_TOKEN }}
18 changes: 13 additions & 5 deletions rust/Cargo.toml
@@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ]
repository = "https://github.com/foxglove/mcap"
documentation = "https://docs.rs/mcap"
readme = "README.md"
version = "0.9.2"
version = "0.10.0"
edition = "2021"
license = "MIT"

@@ -22,7 +22,9 @@ log = "0.4"
num_cpus = "1.13"
paste = "1.0"
thiserror = "1.0"
lz4_flex = { version = "0.11.1", optional = true }
lz4 = { version = "1.27", optional = true }
async-compression = { version = "*", features = ["tokio"], optional = true }
tokio = { version = "1", features = ["io-util"] , optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
zstd = { version = "0.11", features = ["wasm"], optional = true }
@@ -32,22 +34,24 @@ zstd = { version = "0.11", features = ["zstdmt"], optional = true }

[features]
default = ["zstd", "lz4"]
zstd = ["dep:zstd"]
lz4 = ["dep:lz4_flex"]
zstd = ["dep:zstd", "async-compression/zstd"]
lz4 = ["dep:lz4"]
tokio = ["dep:async-compression", "dep:tokio"]

[dev-dependencies]
anyhow = "1"
atty = "0.2"
camino = "1.0"
clap = { version = "3.2", features = ["derive"]}
criterion = "0.5.1"
criterion = { version = "0.5.1", features = ["async_tokio"] }
itertools = "0.10"
memmap = "0.7"
rayon = "1.5"
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1"
simplelog = "0.12"
tempfile = "3.3"
tokio = { version = "1", features = ["io-util", "macros", "rt", "fs"] }

[[bench]]
name = "reader"
@@ -57,3 +61,7 @@ harness = false
opt-level = 3
debug = true
lto = true

[[example]]
name = "conformance_reader_async"
required-features = ["tokio"]
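
Downstream crates would opt in via the new feature flag; a sketch of what that might look like in a consumer's `Cargo.toml` (illustrative, not part of this diff):

```toml
[dependencies]
# `tokio` enables the async reader; the default `lz4`/`zstd` features stay on.
mcap = { version = "0.10", features = ["tokio"] }
tokio = { version = "1", features = ["fs", "io-util", "macros", "rt"] }
```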
133 changes: 133 additions & 0 deletions rust/examples/common/serialization.rs
@@ -0,0 +1,133 @@
use mcap::records::Record;

use std::collections::BTreeMap;

use serde_json::{json, Value};

// We don't want to force Serde on users just for the sake of the conformance tests.
// (In what context would you want to serialize individual records of a MCAP?)
// Stamp out and stringify them ourselves:

fn get_type(rec: &Record<'_>) -> &'static str {
    match rec {
        Record::Header(_) => "Header",
        Record::Footer(_) => "Footer",
        Record::Schema { .. } => "Schema",
        Record::Channel(_) => "Channel",
        Record::Message { .. } => "Message",
        Record::Chunk { .. } => "Chunk",
        Record::MessageIndex(_) => "MessageIndex",
        Record::ChunkIndex(_) => "ChunkIndex",
        Record::Attachment { .. } => "Attachment",
        Record::AttachmentIndex(_) => "AttachmentIndex",
        Record::Statistics(_) => "Statistics",
        Record::Metadata(_) => "Metadata",
        Record::MetadataIndex(_) => "MetadataIndex",
        Record::SummaryOffset(_) => "SummaryOffset",
        Record::DataEnd(_) => "DataEnd",
        Record::Unknown { opcode, .. } => {
            panic!("Unknown record in conformance test: (op {opcode})")
        }
    }
}

fn get_fields(rec: &Record<'_>) -> Value {
    fn b2s(bytes: &[u8]) -> Vec<String> {
        bytes.iter().map(|b| b.to_string()).collect()
    }
    fn m2s(map: &BTreeMap<u16, u64>) -> BTreeMap<String, String> {
        map.iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    match rec {
        Record::Header(h) => json!([["library", h.library], ["profile", h.profile]]),
        Record::Footer(f) => json!([
            ["summary_crc", f.summary_crc.to_string()],
            ["summary_offset_start", f.summary_offset_start.to_string()],
            ["summary_start", f.summary_start.to_string()]
        ]),
        Record::Schema { header, data } => json!([
            ["data", b2s(data)],
            ["encoding", header.encoding],
            ["id", header.id.to_string()],
            ["name", header.name]
        ]),
        Record::Channel(c) => json!([
            ["id", c.id.to_string()],
            ["message_encoding", c.message_encoding],
            ["metadata", c.metadata],
            ["schema_id", c.schema_id.to_string()],
            ["topic", c.topic]
        ]),
        Record::Message { header, data } => json!([
            ["channel_id", header.channel_id.to_string()],
            ["data", b2s(data)],
            ["log_time", header.log_time.to_string()],
            ["publish_time", header.publish_time.to_string()],
            ["sequence", header.sequence.to_string()]
        ]),
        Record::Chunk { .. } => unreachable!("Chunks are flattened"),
        Record::MessageIndex(_) => unreachable!("MessageIndexes are skipped"),
        Record::ChunkIndex(i) => json!([
            ["chunk_length", i.chunk_length.to_string()],
            ["chunk_start_offset", i.chunk_start_offset.to_string()],
            ["compressed_size", i.compressed_size.to_string()],
            ["compression", i.compression],
            ["message_end_time", i.message_end_time.to_string()],
            ["message_index_length", i.message_index_length.to_string()],
            ["message_index_offsets", m2s(&i.message_index_offsets)],
            ["message_start_time", i.message_start_time.to_string()],
            ["uncompressed_size", i.uncompressed_size.to_string()]
        ]),
        Record::Attachment { header, data } => json!([
            ["create_time", header.create_time.to_string()],
            ["data", b2s(data)],
            ["log_time", header.log_time.to_string()],
            ["media_type", header.media_type],
            ["name", header.name]
        ]),
        Record::AttachmentIndex(i) => json!([
            ["create_time", i.create_time.to_string()],
            ["data_size", i.data_size.to_string()],
            ["length", i.length.to_string()],
            ["log_time", i.log_time.to_string()],
            ["media_type", i.media_type],
            ["name", i.name],
            ["offset", i.offset.to_string()]
        ]),
        Record::Statistics(s) => json!([
            ["attachment_count", s.attachment_count.to_string()],
            ["channel_count", s.channel_count.to_string()],
            ["channel_message_counts", m2s(&s.channel_message_counts)],
            ["chunk_count", s.chunk_count.to_string()],
            ["message_count", s.message_count.to_string()],
            ["message_end_time", s.message_end_time.to_string()],
            ["message_start_time", s.message_start_time.to_string()],
            ["metadata_count", s.metadata_count.to_string()],
            ["schema_count", s.schema_count.to_string()]
        ]),
        Record::Metadata(m) => json!([["metadata", m.metadata], ["name", m.name]]),
        Record::MetadataIndex(i) => json!([
            ["length", i.length.to_string()],
            ["name", i.name],
            ["offset", i.offset.to_string()]
        ]),
        Record::SummaryOffset(s) => json!([
            ["group_length", s.group_length.to_string()],
            ["group_opcode", s.group_opcode.to_string()],
            ["group_start", s.group_start.to_string()]
        ]),
        Record::DataEnd(d) => json!([["data_section_crc", d.data_section_crc.to_string()]]),
        Record::Unknown { opcode, .. } => {
            panic!("Unknown record in conformance test: (op {opcode})")
        }
    }
}

pub fn as_json(view: &Record<'_>) -> Value {
    let typename = get_type(view);
    let fields = get_fields(view);
    json!({"type": typename, "fields": fields})
}
137 changes: 6 additions & 131 deletions rust/examples/conformance_reader.rs
@@ -1,136 +1,11 @@
use mcap::records::Record;

use std::{collections::BTreeMap, env, process};
#[path = "common/serialization.rs"]
mod serialization;

use serde_json::{json, Value};

// We don't want to force Serde on users just for the sake of the conformance tests.
// (In what context would you want to serialize individual records of a MCAP?)
// Stamp out and stringify them ourselves:

fn get_type(rec: &Record<'_>) -> &'static str {
    match rec {
        Record::Header(_) => "Header",
        Record::Footer(_) => "Footer",
        Record::Schema { .. } => "Schema",
        Record::Channel(_) => "Channel",
        Record::Message { .. } => "Message",
        Record::Chunk { .. } => "Chunk",
        Record::MessageIndex(_) => "MessageIndex",
        Record::ChunkIndex(_) => "ChunkIndex",
        Record::Attachment { .. } => "Attachment",
        Record::AttachmentIndex(_) => "AttachmentIndex",
        Record::Statistics(_) => "Statistics",
        Record::Metadata(_) => "Metadata",
        Record::MetadataIndex(_) => "MetadataIndex",
        Record::SummaryOffset(_) => "SummaryOffset",
        Record::DataEnd(_) => "DataEnd",
        Record::Unknown { opcode, .. } => {
            panic!("Unknown record in conformance test: (op {opcode})")
        }
    }
}

fn get_fields(rec: &Record<'_>) -> Value {
    fn b2s(bytes: &[u8]) -> Vec<String> {
        bytes.iter().map(|b| b.to_string()).collect()
    }
    fn m2s(map: &BTreeMap<u16, u64>) -> BTreeMap<String, String> {
        map.iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    match rec {
        Record::Header(h) => json!([["library", h.library], ["profile", h.profile]]),
        Record::Footer(f) => json!([
            ["summary_crc", f.summary_crc.to_string()],
            ["summary_offset_start", f.summary_offset_start.to_string()],
            ["summary_start", f.summary_start.to_string()]
        ]),
        Record::Schema { header, data } => json!([
            ["data", b2s(data)],
            ["encoding", header.encoding],
            ["id", header.id.to_string()],
            ["name", header.name]
        ]),
        Record::Channel(c) => json!([
            ["id", c.id.to_string()],
            ["message_encoding", c.message_encoding],
            ["metadata", c.metadata],
            ["schema_id", c.schema_id.to_string()],
            ["topic", c.topic]
        ]),
        Record::Message { header, data } => json!([
            ["channel_id", header.channel_id.to_string()],
            ["data", b2s(data)],
            ["log_time", header.log_time.to_string()],
            ["publish_time", header.publish_time.to_string()],
            ["sequence", header.sequence.to_string()]
        ]),
        Record::Chunk { .. } => unreachable!("Chunks are flattened"),
        Record::MessageIndex(_) => unreachable!("MessageIndexes are skipped"),
        Record::ChunkIndex(i) => json!([
            ["chunk_length", i.chunk_length.to_string()],
            ["chunk_start_offset", i.chunk_start_offset.to_string()],
            ["compressed_size", i.compressed_size.to_string()],
            ["compression", i.compression],
            ["message_end_time", i.message_end_time.to_string()],
            ["message_index_length", i.message_index_length.to_string()],
            ["message_index_offsets", m2s(&i.message_index_offsets)],
            ["message_start_time", i.message_start_time.to_string()],
            ["uncompressed_size", i.uncompressed_size.to_string()]
        ]),
        Record::Attachment { header, data } => json!([
            ["create_time", header.create_time.to_string()],
            ["data", b2s(data)],
            ["log_time", header.log_time.to_string()],
            ["media_type", header.media_type],
            ["name", header.name]
        ]),
        Record::AttachmentIndex(i) => json!([
            ["create_time", i.create_time.to_string()],
            ["data_size", i.data_size.to_string()],
            ["length", i.length.to_string()],
            ["log_time", i.log_time.to_string()],
            ["media_type", i.media_type],
            ["name", i.name],
            ["offset", i.offset.to_string()]
        ]),
        Record::Statistics(s) => json!([
            ["attachment_count", s.attachment_count.to_string()],
            ["channel_count", s.channel_count.to_string()],
            ["channel_message_counts", m2s(&s.channel_message_counts)],
            ["chunk_count", s.chunk_count.to_string()],
            ["message_count", s.message_count.to_string()],
            ["message_end_time", s.message_end_time.to_string()],
            ["message_start_time", s.message_start_time.to_string()],
            ["metadata_count", s.metadata_count.to_string()],
            ["schema_count", s.schema_count.to_string()]
        ]),
        Record::Metadata(m) => json!([["metadata", m.metadata], ["name", m.name]]),
        Record::MetadataIndex(i) => json!([
            ["length", i.length.to_string()],
            ["name", i.name],
            ["offset", i.offset.to_string()]
        ]),
        Record::SummaryOffset(s) => json!([
            ["group_length", s.group_length.to_string()],
            ["group_opcode", s.group_opcode.to_string()],
            ["group_start", s.group_start.to_string()]
        ]),
        Record::DataEnd(d) => json!([["data_section_crc", d.data_section_crc.to_string()]]),
        Record::Unknown { opcode, .. } => {
            panic!("Unknown record in conformance test: (op {opcode})")
        }
    }
}

fn as_json(view: &Record<'_>) -> Value {
    let typename = get_type(view);
    let fields = get_fields(view);
    json!({"type": typename, "fields": fields})
}
use mcap::records::Record;
use std::env;
use std::process;

pub fn main() {
    let args: Vec<String> = env::args().collect();
@@ -143,7 +18,7 @@ pub fn main() {
    for rec in mcap::read::ChunkFlattener::new(&file).expect("Couldn't read file") {
        let r = rec.expect("failed to read next record");
        if !matches!(r, Record::MessageIndex(_)) {
            json_records.push(as_json(&r));
            json_records.push(serialization::as_json(&r));
        }
    }
    let out = json!({ "records": json_records });