Skip to content

Commit

Permalink
leverage TransportChunk for registration metadata instead of carrying…
Browse files Browse the repository at this point in the history
… arrow data + schema tuple around (#7985)

Using TransportChunk makes it simpler to work with recording metadata. Also adding id to the recording metadata to avoid the need to decode arrow data just for the recording id.
  • Loading branch information
zehiko authored Nov 4, 2024
1 parent 045048f commit 85d152a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message RecordingMetadata {
}

message RegisterRecordingResponse {
RecordingId id = 1;
// Note / TODO(zehiko): this implies we read the record (for example go through entire .rrd file
// chunk by chunk) and extract the metadata. So we might want to 1/ not do this i.e.
// only do it as part of explicit GetMetadata request or 2/ do it if Request has "include_metadata=true"
Expand Down
43 changes: 26 additions & 17 deletions crates/store/re_remote_store_types/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,23 @@ pub fn decode(version: EncoderVersion, data: &[u8]) -> Result<Option<TransportCh
}

impl RecordingMetadata {
/// Create `RecordingMetadata` from arrow schema and arrow record batch
/// Create `RecordingMetadata` from `TransportChunk`. We rely on `TransportChunk` until
/// we migrate from arrow2 to arrow.
pub fn try_from(
version: EncoderVersion,
schema: &ArrowSchema,
unit_batch: &ArrowChunk<Box<dyn ArrowArray>>,
metadata: &TransportChunk,
) -> Result<Self, CodecError> {
if unit_batch.len() > 1 {
if metadata.data.len() != 1 {
return Err(CodecError::InvalidArgument(format!(
"metadata record batch can only have a single row, batch with {} rows given",
unit_batch.len()
metadata.data.len()
)));
}
};

match version {
EncoderVersion::V0 => {
let mut data: Vec<u8> = Vec::new();
write_arrow_to_bytes(&mut data, schema, unit_batch)?;
write_arrow_to_bytes(&mut data, &metadata.schema, &metadata.data)?;

Ok(Self {
encoder_version: version as i32,
Expand All @@ -166,14 +166,17 @@ impl RecordingMetadata {
}

/// Get metadata as arrow data
pub fn data(&self) -> Result<(ArrowSchema, ArrowChunk<Box<dyn ArrowArray>>), CodecError> {
pub fn data(&self) -> Result<TransportChunk, CodecError> {
let mut reader = std::io::Cursor::new(self.payload.clone());

let encoder_version = EncoderVersion::try_from(self.encoder_version)
.map_err(|err| CodecError::InvalidArgument(err.to_string()))?;

match encoder_version {
EncoderVersion::V0 => read_arrow_from_bytes(&mut reader),
EncoderVersion::V0 => {
let (schema, data) = read_arrow_from_bytes(&mut reader)?;
Ok(TransportChunk { schema, data })
}
}
}
}
Expand Down Expand Up @@ -224,6 +227,7 @@ mod tests {
use arrow2::chunk::Chunk as ArrowChunk;
use arrow2::{array::Int32Array, datatypes::Field, datatypes::Schema as ArrowSchema};
use re_dataframe::external::re_chunk::{Chunk, RowId};
use re_dataframe::TransportChunk;
use re_log_types::{example_components::MyPoint, Timeline};

use crate::v0::RecordingMetadata;
Expand Down Expand Up @@ -328,15 +332,17 @@ mod tests {
)]);
let my_ints = Int32Array::from_slice([42]);
let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]);
let metadata_tc = TransportChunk {
schema: expected_schema.clone(),
data: expected_chunk.clone(),
};

let metadata =
RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk)
.unwrap();
let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &metadata_tc).unwrap();

let (schema, chunk) = metadata.data().unwrap();
let tc = metadata.data().unwrap();

assert_eq!(expected_schema, schema);
assert_eq!(expected_chunk, chunk);
assert_eq!(expected_schema, tc.schema);
assert_eq!(expected_chunk, tc.data);
}

#[test]
Expand All @@ -350,9 +356,12 @@ mod tests {
let my_ints = Int32Array::from_slice([41, 42]);

let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]);
let metadata_tc = TransportChunk {
schema: expected_schema.clone(),
data: expected_chunk,
};

let metadata =
RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk);
let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &metadata_tc);

assert!(matches!(
metadata.err().unwrap(),
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 85d152a

Please sign in to comment.