rust: add async reading functionality #1211

Merged: 25 commits, Sep 15, 2024

Commits
75ba3c0 rust: add async under `tokio` feature flag (james-rms, Aug 9, 2024)
71ba806 discard bytes after reading chunk (james-rms, Aug 14, 2024)
07e6571 nicer namespace usage (james-rms, Aug 14, 2024)
4dbb4cc tweak attachment test (james-rms, Aug 14, 2024)
39b00f9 remove stream implementation (james-rms, Aug 15, 2024)
db3131b no longer depend on crate version string (james-rms, Aug 15, 2024)
829c5db add conformance test, rename to parse_record (james-rms, Aug 15, 2024)
69bbdf6 add default exports from tokio module (james-rms, Aug 15, 2024)
d9b3e69 test all features (james-rms, Aug 15, 2024)
6b2d0af zstd is fully optional (james-rms, Aug 15, 2024)
5e726d5 transpose result of read_record (james-rms, Aug 15, 2024)
fcc6a47 do not fail on subslices if skipping end magic (james-rms, Aug 16, 2024)
93d1260 Add benchmark(s) (james-rms, Aug 16, 2024)
86f362d fix lz4 (james-rms, Aug 16, 2024)
8d25bc9 add read_exact_or_zero (james-rms, Aug 16, 2024; sketched below)
2e792a7 remove constraint from lz4 decoder constructor fns (james-rms, Aug 16, 2024)
38434b0 reword docstring (james-rms, Aug 17, 2024)
1056c7c remove benches (until we have a messagestream impl) (james-rms, Aug 20, 2024)
31877c6 test read_exact_or_zero (james-rms, Aug 20, 2024)
cc53dbd lz4: clarify and comment on async reader (james-rms, Sep 11, 2024)
c9a4179 tokio/read.rs: do not save opcodes in test (james-rms, Sep 11, 2024)
986022f update to lz4 v1.27 (james-rms, Sep 13, 2024)
6854c86 check builds for wasm32 (james-rms, Sep 13, 2024)
c5b48e2 add a non-full read test (james-rms, Sep 15, 2024)
77d542b remove byteorder dep (james-rms, Sep 15, 2024)
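
Commits 8d25bc9 and 31877c6 above add and test a read_exact_or_zero helper. The helper's source is not part of this excerpt, so the following is a minimal sketch of the usual shape of such a function over tokio's AsyncRead, assuming the semantics the name implies (fill the buffer completely, or report a clean EOF as zero); the crate's actual implementation may differ:

use std::io;

use tokio::io::{AsyncRead, AsyncReadExt};

/// Fill `buf` completely, or return Ok(0) if the reader was already at EOF.
/// EOF partway through `buf` is an error: a record must not be truncated.
pub async fn read_exact_or_zero<R: AsyncRead + Unpin>(
    reader: &mut R,
    buf: &mut [u8],
) -> io::Result<usize> {
    let mut filled = 0;
    while filled < buf.len() {
        let n = reader.read(&mut buf[filled..]).await?;
        if n == 0 {
            return if filled == 0 {
                // Clean EOF on a record boundary.
                Ok(0)
            } else {
                Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "EOF in the middle of a record",
                ))
            };
        }
        filled += n;
    }
    Ok(filled)
}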
13 changes: 10 additions & 3 deletions .github/workflows/ci.yml
@@ -160,7 +160,8 @@ jobs:
         with:
           toolchain: stable
           default: true
-      - run: cd rust && cargo build --example=conformance_reader
+      - run: cargo build --example=conformance_reader --example=conformance_reader_async --features=tokio
+        working-directory: rust
       - run: yarn install --immutable
       - run: yarn test:conformance:generate-inputs --verify
       - run: yarn test:conformance --runner rust-
@@ -490,13 +491,19 @@ jobs:
           toolchain: stable
           default: true
           components: "rustfmt, clippy"
+      - run: rustup target add wasm32-unknown-unknown
       - run: cargo fmt --all -- --check
       - run: cargo clippy -- --no-deps
       - run: cargo clippy --no-default-features -- --no-deps
       - run: cargo clippy --no-default-features --features lz4 -- --no-deps
       - run: cargo clippy --no-default-features --features zstd -- --no-deps
-      - run: cargo build
-      - run: cargo test
+      - run: cargo clippy --no-default-features --features tokio -- --no-deps
+      - run: cargo clippy --no-default-features --features tokio,lz4 -- --no-deps
+      - run: cargo clippy --no-default-features --features tokio,zstd -- --no-deps
+      - run: cargo build --all-features
+      - run: cargo test --all-features
+      - run: cargo build --all-features --target wasm32-unknown-unknown
+      - run: cargo check --all-features --target wasm32-unknown-unknown
       - name: "publish to crates.io"
         if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/releases/rust/v')
         run: cargo publish --token ${{ secrets.RUST_CRATES_IO_TOKEN }}
18 changes: 13 additions & 5 deletions rust/Cargo.toml
@@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ]
 repository = "https://github.com/foxglove/mcap"
 documentation = "https://docs.rs/mcap"
 readme = "README.md"
-version = "0.9.2"
+version = "0.10.0"
 edition = "2021"
 license = "MIT"

@@ -22,7 +22,9 @@ log = "0.4"
 num_cpus = "1.13"
 paste = "1.0"
 thiserror = "1.0"
-lz4_flex = { version = "0.11.1", optional = true }
+lz4 = { version = "1.27", optional = true }
+async-compression = { version = "*", features = ["tokio"], optional = true }
+tokio = { version = "1", features = ["io-util"], optional = true }

 [target.'cfg(target_arch = "wasm32")'.dependencies]
 zstd = { version = "0.11", features = ["wasm"], optional = true }
@@ -32,22 +34,24 @@ zstd = { version = "0.11", features = ["zstdmt"], optional = true }

 [features]
 default = ["zstd", "lz4"]
-zstd = ["dep:zstd"]
-lz4 = ["dep:lz4_flex"]
+zstd = ["dep:zstd", "async-compression/zstd"]
+lz4 = ["dep:lz4"]
+tokio = ["dep:async-compression", "dep:tokio"]

 [dev-dependencies]
 anyhow = "1"
 atty = "0.2"
 camino = "1.0"
 clap = { version = "3.2", features = ["derive"]}
-criterion = "0.5.1"
+criterion = { version = "0.5.1", features = ["async_tokio"] }
 itertools = "0.10"
 memmap = "0.7"
 rayon = "1.5"
 serde = { version = "1.0.145", features = ["derive"] }
 serde_json = "1"
 simplelog = "0.12"
 tempfile = "3.3"
+tokio = { version = "1", features = ["io-util", "macros", "rt", "fs"] }

 [[bench]]
 name = "reader"
@@ -57,3 +61,7 @@ harness = false
 opt-level = 3
 debug = true
 lto = true
+
+[[example]]
+name = "conformance_reader_async"
+required-features = ["tokio"]
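
For downstream users all of this is opt-in: depending on the crate as mcap = { version = "0.10", features = ["tokio"] } pulls in tokio and async-compression, while the default build stays synchronous. Inside the crate, the async module is presumably kept out of non-tokio builds with a cfg gate along these lines (a sketch, not the verbatim source):

// src/lib.rs (sketch): the async reader only compiles when the `tokio`
// feature is enabled, so default builds pull in neither tokio nor
// async-compression.
#[cfg(feature = "tokio")]
pub mod tokio;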
133 changes: 133 additions & 0 deletions rust/examples/common/serialization.rs
@@ -0,0 +1,133 @@
use mcap::records::Record;

use std::collections::BTreeMap;

use serde_json::{json, Value};

// We don't want to force Serde on users just for the sake of the conformance tests.
// (In what context would you want to serialize individual records of a MCAP?)
// Stamp out and stringify them ourselves:

fn get_type(rec: &Record<'_>) -> &'static str {
    match rec {
        Record::Header(_) => "Header",
        Record::Footer(_) => "Footer",
        Record::Schema { .. } => "Schema",
        Record::Channel(_) => "Channel",
        Record::Message { .. } => "Message",
        Record::Chunk { .. } => "Chunk",
        Record::MessageIndex(_) => "MessageIndex",
        Record::ChunkIndex(_) => "ChunkIndex",
        Record::Attachment { .. } => "Attachment",
        Record::AttachmentIndex(_) => "AttachmentIndex",
        Record::Statistics(_) => "Statistics",
        Record::Metadata(_) => "Metadata",
        Record::MetadataIndex(_) => "MetadataIndex",
        Record::SummaryOffset(_) => "SummaryOffset",
        Record::DataEnd(_) => "DataEnd",
        Record::Unknown { opcode, .. } => {
            panic!("Unknown record in conformance test: (op {opcode})")
        }
    }
}

fn get_fields(rec: &Record<'_>) -> Value {
    fn b2s(bytes: &[u8]) -> Vec<String> {
        bytes.iter().map(|b| b.to_string()).collect()
    }
    fn m2s(map: &BTreeMap<u16, u64>) -> BTreeMap<String, String> {
        map.iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    match rec {
        Record::Header(h) => json!([["library", h.library], ["profile", h.profile]]),
        Record::Footer(f) => json!([
            ["summary_crc", f.summary_crc.to_string()],
            ["summary_offset_start", f.summary_offset_start.to_string()],
            ["summary_start", f.summary_start.to_string()]
        ]),
        Record::Schema { header, data } => json!([
            ["data", b2s(data)],
            ["encoding", header.encoding],
            ["id", header.id.to_string()],
            ["name", header.name]
        ]),
        Record::Channel(c) => json!([
            ["id", c.id.to_string()],
            ["message_encoding", c.message_encoding],
            ["metadata", c.metadata],
            ["schema_id", c.schema_id.to_string()],
            ["topic", c.topic]
        ]),
        Record::Message { header, data } => json!([
            ["channel_id", header.channel_id.to_string()],
            ["data", b2s(data)],
            ["log_time", header.log_time.to_string()],
            ["publish_time", header.publish_time.to_string()],
            ["sequence", header.sequence.to_string()]
        ]),
        Record::Chunk { .. } => unreachable!("Chunks are flattened"),
        Record::MessageIndex(_) => unreachable!("MessageIndexes are skipped"),
        Record::ChunkIndex(i) => json!([
            ["chunk_length", i.chunk_length.to_string()],
            ["chunk_start_offset", i.chunk_start_offset.to_string()],
            ["compressed_size", i.compressed_size.to_string()],
            ["compression", i.compression],
            ["message_end_time", i.message_end_time.to_string()],
            ["message_index_length", i.message_index_length.to_string()],
            ["message_index_offsets", m2s(&i.message_index_offsets)],
            ["message_start_time", i.message_start_time.to_string()],
            ["uncompressed_size", i.uncompressed_size.to_string()]
        ]),
        Record::Attachment { header, data } => json!([
            ["create_time", header.create_time.to_string()],
            ["data", b2s(data)],
            ["log_time", header.log_time.to_string()],
            ["media_type", header.media_type],
            ["name", header.name]
        ]),
        Record::AttachmentIndex(i) => json!([
            ["create_time", i.create_time.to_string()],
            ["data_size", i.data_size.to_string()],
            ["length", i.length.to_string()],
            ["log_time", i.log_time.to_string()],
            ["media_type", i.media_type],
            ["name", i.name],
            ["offset", i.offset.to_string()]
        ]),
        Record::Statistics(s) => json!([
            ["attachment_count", s.attachment_count.to_string()],
            ["channel_count", s.channel_count.to_string()],
            ["channel_message_counts", m2s(&s.channel_message_counts)],
            ["chunk_count", s.chunk_count.to_string()],
            ["message_count", s.message_count.to_string()],
            ["message_end_time", s.message_end_time.to_string()],
            ["message_start_time", s.message_start_time.to_string()],
            ["metadata_count", s.metadata_count.to_string()],
            ["schema_count", s.schema_count.to_string()]
        ]),
        Record::Metadata(m) => json!([["metadata", m.metadata], ["name", m.name]]),
        Record::MetadataIndex(i) => json!([
            ["length", i.length.to_string()],
            ["name", i.name],
            ["offset", i.offset.to_string()]
        ]),
        Record::SummaryOffset(s) => json!([
            ["group_length", s.group_length.to_string()],
            ["group_opcode", s.group_opcode.to_string()],
            ["group_start", s.group_start.to_string()]
        ]),
        Record::DataEnd(d) => json!([["data_section_crc", d.data_section_crc.to_string()]]),
        Record::Unknown { opcode, .. } => {
            panic!("Unknown record in conformance test: (op {opcode})")
        }
    }
}

pub fn as_json(view: &Record<'_>) -> Value {
    let typename = get_type(view);
    let fields = get_fields(view);
    json!({"type": typename, "fields": fields})
}
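
To illustrate the JSON shape as_json produces, here is a hypothetical snippet (it assumes mcap::records::Footer has publicly constructible summary_start, summary_offset_start, and summary_crc fields, matching their use above):

#[path = "common/serialization.rs"]
mod serialization;

use mcap::records::{Footer, Record};

fn main() {
    // A hypothetical all-zero Footer record.
    let rec = Record::Footer(Footer {
        summary_start: 0,
        summary_offset_start: 0,
        summary_crc: 0,
    });
    // Prints (serde_json orders object keys alphabetically by default):
    // {"fields":[["summary_crc","0"],["summary_offset_start","0"],["summary_start","0"]],"type":"Footer"}
    println!("{}", serialization::as_json(&rec));
}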
137 changes: 6 additions & 131 deletions rust/examples/conformance_reader.rs
@@ -1,136 +1,11 @@
-use mcap::records::Record;
-
-use std::{collections::BTreeMap, env, process};
-
-use serde_json::{json, Value};
-
-// (roughly 125 more deleted lines: the get_type, get_fields, and as_json
-// implementations, moved verbatim to rust/examples/common/serialization.rs above)
+#[path = "common/serialization.rs"]
+mod serialization;
+
+use serde_json::{json, Value};
+
+use mcap::records::Record;
+use std::env;
+use std::process;
+
 pub fn main() {
     let args: Vec<String> = env::args().collect();
@@ -143,7 +18,7 @@ pub fn main() {
     for rec in mcap::read::ChunkFlattener::new(&file).expect("Couldn't read file") {
         let r = rec.expect("failed to read next record");
         if !matches!(r, Record::MessageIndex(_)) {
-            json_records.push(as_json(&r));
+            json_records.push(serialization::as_json(&r));
         }
     }
     let out = json!({ "records": json_records });
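
CI above also builds a new conformance_reader_async example, whose source is not shown in this excerpt. The following is a sketch of what driving the async API might look like, assuming a mcap::tokio::RecordReader with an async next_record(&mut Vec<u8>) method yielding opcodes, plus a parse_record helper (both names inferred from the commit messages; exact signatures may differ, and the real example also skips MessageIndex records):

#[path = "common/serialization.rs"]
mod serialization;

use serde_json::{json, Value};
use std::{env, process};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let args: Vec<String> = env::args().collect();
    if args.len() != 2 {
        eprintln!("Please supply an MCAP file as argument");
        process::exit(1);
    }
    let file = tokio::fs::File::open(&args[1]).await.expect("couldn't open file");

    // Assumed API: RecordReader wraps any AsyncRead, fills `buf` with each
    // record body, and returns the record's opcode; None means clean EOF.
    let mut reader = mcap::tokio::RecordReader::new(file);
    let mut buf: Vec<u8> = Vec::new();
    let mut json_records: Vec<Value> = Vec::new();

    while let Some(opcode) = reader.next_record(&mut buf).await {
        let opcode = opcode.expect("failed to read next record");
        // Assumed helper: parse the raw body into a Record (one commit above
        // renames a function to parse_record).
        let record = mcap::parse_record(opcode, &buf).expect("failed to parse record");
        json_records.push(serialization::as_json(&record));
    }
    let out = json!({ "records": json_records });
    println!("{}", serde_json::to_string_pretty(&out).expect("failed to serialize"));
}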