From 8284e1d9573f13b0c20c972e423713a2ee432311 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Mon, 4 Nov 2024 08:43:10 +0100 Subject: [PATCH 1/5] register single recording --- .../proto/rerun/v0/remote_store.proto | 14 ++---- .../src/v0/rerun.remote_store.v0.rs | 49 ++++++++----------- 2 files changed, 24 insertions(+), 39 deletions(-) diff --git a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto index 05e42cacd85d..7a14181f0302 100644 --- a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto @@ -9,18 +9,17 @@ service StorageNode { rpc Query(QueryRequest) returns (stream QueryResponse) {} rpc FetchRecording(FetchRecordingRequest) returns (stream FetchRecordingResponse) {} rpc GetRecordingMetadata(GetRecordingMetadataRequest) returns (GetRecordingMetadataResponse) {} - // TODO(zehiko) - should this be singular recording registration? Currently we can have 1 rrd => many recordings - rpc RegisterRecordings(RegisterRecordingsRequest) returns (RegisterRecordingsResponse) {} + rpc RegisterRecording(RegisterRecordingRequest) returns (RegisterRecordingResponse) {} } // ---------------- RegisterRecording ------------------ -message RegisterRecordingsRequest { +message RegisterRecordingRequest { // human readable description of the recording string description = 1; // information about recording's backing storage // TODO(zehiko) add separate info about the "source" recording - ObjectStorage obj_storage = 2; + string url = 2; // type of recording RecordingType typ = 3; // (optional) any additional metadata that should be associated with the recording @@ -34,12 +33,7 @@ message RecordingMetadata { bytes payload = 2; } -message ObjectStorage { - string bucket_name = 1; - string url = 2; -} - -message RegisterRecordingsResponse { +message RegisterRecordingResponse { // Note / TODO(zehiko): this implies we read the record (for example go through entire .rrd file // chunk by chunk) and extract the metadata. So we might want to 1/ not do this i.e. // only do it as part of explicit GetMetadata request or 2/ do it if Request has "include_metadata=true" diff --git a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs index b136d7f6fd47..cc19f73182af 100644 --- a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs @@ -243,14 +243,14 @@ impl ErrorCode { } } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct RegisterRecordingsRequest { +pub struct RegisterRecordingRequest { /// human readable description of the recording #[prost(string, tag = "1")] pub description: ::prost::alloc::string::String, /// information about recording's backing storage /// TODO(zehiko) add separate info about the "source" recording - #[prost(message, optional, tag = "2")] - pub obj_storage: ::core::option::Option, + #[prost(string, tag = "2")] + pub url: ::prost::alloc::string::String, /// type of recording #[prost(enumeration = "RecordingType", tag = "3")] pub typ: i32, @@ -268,14 +268,7 @@ pub struct RecordingMetadata { pub payload: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ObjectStorage { - #[prost(string, tag = "1")] - pub bucket_name: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub url: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct RegisterRecordingsResponse { +pub struct RegisterRecordingResponse { /// Note / TODO(zehiko): this implies we read the record (for example go through entire .rrd file /// chunk by chunk) and extract the metadata. So we might want to 1/ not do this i.e. /// only do it as part of explicit GetMetadata request or 2/ do it if Request has "include_metadata=true" @@ -577,23 +570,22 @@ pub mod storage_node_client { )); self.inner.unary(req, path, codec).await } - /// TODO(zehiko) - should this be singular recording registration? Currently we can have 1 rrd => many recordings - pub async fn register_recordings( + pub async fn register_recording( &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::unknown(format!("Service was not ready: {}", e.into())) })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/rerun.remote_store.v0.StorageNode/RegisterRecordings", + "/rerun.remote_store.v0.StorageNode/RegisterRecording", ); let mut req = request.into_request(); req.extensions_mut().insert(GrpcMethod::new( "rerun.remote_store.v0.StorageNode", - "RegisterRecordings", + "RegisterRecording", )); self.inner.unary(req, path, codec).await } @@ -638,11 +630,10 @@ pub mod storage_node_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; - /// TODO(zehiko) - should this be singular recording registration? Currently we can have 1 rrd => many recordings - async fn register_recordings( + async fn register_recording( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct StorageNodeServer { @@ -884,22 +875,22 @@ pub mod storage_node_server { }; Box::pin(fut) } - "/rerun.remote_store.v0.StorageNode/RegisterRecordings" => { + "/rerun.remote_store.v0.StorageNode/RegisterRecording" => { #[allow(non_camel_case_types)] - struct RegisterRecordingsSvc(pub Arc); + struct RegisterRecordingSvc(pub Arc); impl - tonic::server::UnaryService - for RegisterRecordingsSvc + tonic::server::UnaryService + for RegisterRecordingSvc { - type Response = super::RegisterRecordingsResponse; + type Response = super::RegisterRecordingResponse; type Future = BoxFuture, tonic::Status>; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::register_recordings(&inner, request).await + ::register_recording(&inner, request).await }; Box::pin(fut) } @@ -910,7 +901,7 @@ pub mod storage_node_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = RegisterRecordingsSvc(inner); + let method = RegisterRecordingSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( From 907e54a73f5c25e5dff5f53b9c9f802513fa9328 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Mon, 4 Nov 2024 08:47:49 +0100 Subject: [PATCH 2/5] register returns single recording metadata --- .../re_remote_store_types/proto/rerun/v0/remote_store.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto index 7a14181f0302..510056293ec7 100644 --- a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto @@ -38,7 +38,7 @@ message RegisterRecordingResponse { // chunk by chunk) and extract the metadata. So we might want to 1/ not do this i.e. // only do it as part of explicit GetMetadata request or 2/ do it if Request has "include_metadata=true" // or 3/ do it always - repeated RecordingMetadata metadata = 2; + RecordingMetadata metadata = 2; } // Server can include details about the error as part of gRPC error (Status) From 731f512326125274b48f732181554e089999527c Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Mon, 4 Nov 2024 08:53:52 +0100 Subject: [PATCH 3/5] codegen --- .../re_remote_store_types/src/v0/rerun.remote_store.v0.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs index cc19f73182af..2208a0b82ef1 100644 --- a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs @@ -273,8 +273,8 @@ pub struct RegisterRecordingResponse { /// chunk by chunk) and extract the metadata. So we might want to 1/ not do this i.e. /// only do it as part of explicit GetMetadata request or 2/ do it if Request has "include_metadata=true" /// or 3/ do it always - #[prost(message, repeated, tag = "2")] - pub metadata: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub metadata: ::core::option::Option, } /// Server can include details about the error as part of gRPC error (Status) #[derive(Clone, PartialEq, ::prost::Message)] From 0cf41e570f1070045e8724fb98bbb2a1cfa19037 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Mon, 4 Nov 2024 11:05:39 +0100 Subject: [PATCH 4/5] RecordingMetadata relies on TransportChunk for carrying data around --- .../store/re_remote_store_types/src/codec.rs | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/crates/store/re_remote_store_types/src/codec.rs b/crates/store/re_remote_store_types/src/codec.rs index 184126b10e17..a880234543db 100644 --- a/crates/store/re_remote_store_types/src/codec.rs +++ b/crates/store/re_remote_store_types/src/codec.rs @@ -139,23 +139,23 @@ pub fn decode(version: EncoderVersion, data: &[u8]) -> Result>, + metadata: &TransportChunk, ) -> Result { - if unit_batch.len() > 1 { + if metadata.data.len() != 1 { return Err(CodecError::InvalidArgument(format!( "metadata record batch can only have a single row, batch with {} rows given", - unit_batch.len() + metadata.data.len() ))); - } + }; match version { EncoderVersion::V0 => { let mut data: Vec = Vec::new(); - write_arrow_to_bytes(&mut data, schema, unit_batch)?; + write_arrow_to_bytes(&mut data, &metadata.schema, &metadata.data)?; Ok(Self { encoder_version: version as i32, @@ -166,14 +166,17 @@ impl RecordingMetadata { } /// Get metadata as arrow data - pub fn data(&self) -> Result<(ArrowSchema, ArrowChunk>), CodecError> { + pub fn data(&self) -> Result { let mut reader = std::io::Cursor::new(self.payload.clone()); let encoder_version = EncoderVersion::try_from(self.encoder_version) .map_err(|err| CodecError::InvalidArgument(err.to_string()))?; match encoder_version { - EncoderVersion::V0 => read_arrow_from_bytes(&mut reader), + EncoderVersion::V0 => { + let (schema, data) = read_arrow_from_bytes(&mut reader)?; + Ok(TransportChunk { schema, data }) + } } } } @@ -224,6 +227,7 @@ mod tests { use arrow2::chunk::Chunk as ArrowChunk; use arrow2::{array::Int32Array, datatypes::Field, datatypes::Schema as ArrowSchema}; use re_dataframe::external::re_chunk::{Chunk, RowId}; + use re_dataframe::TransportChunk; use re_log_types::{example_components::MyPoint, Timeline}; use crate::v0::RecordingMetadata; @@ -328,15 +332,17 @@ mod tests { )]); let my_ints = Int32Array::from_slice([42]); let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]); + let metadata_tc = TransportChunk { + schema: expected_schema.clone(), + data: expected_chunk.clone(), + }; - let metadata = - RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk) - .unwrap(); + let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &metadata_tc).unwrap(); - let (schema, chunk) = metadata.data().unwrap(); + let tc = metadata.data().unwrap(); - assert_eq!(expected_schema, schema); - assert_eq!(expected_chunk, chunk); + assert_eq!(expected_schema, tc.schema); + assert_eq!(expected_chunk, tc.data); } #[test] @@ -350,9 +356,12 @@ mod tests { let my_ints = Int32Array::from_slice([41, 42]); let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]); + let metadata_tc = TransportChunk { + schema: expected_schema.clone(), + data: expected_chunk, + }; - let metadata = - RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk); + let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &metadata_tc); assert!(matches!( metadata.err().unwrap(), From 1bcd40b21470129c7d9ed127cf26337940862cdb Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Mon, 4 Nov 2024 11:28:05 +0100 Subject: [PATCH 5/5] include separate rec id in the register metadata response --- .../re_remote_store_types/proto/rerun/v0/remote_store.proto | 1 + .../store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto index 510056293ec7..4455ec4c0903 100644 --- a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto @@ -34,6 +34,7 @@ message RecordingMetadata { } message RegisterRecordingResponse { + RecordingId id = 1; // Note / TODO(zehiko): this implies we read the record (for example go through entire .rrd file // chunk by chunk) and extract the metadata. So we might want to 1/ not do this i.e. // only do it as part of explicit GetMetadata request or 2/ do it if Request has "include_metadata=true" diff --git a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs index 2208a0b82ef1..615cb9a32681 100644 --- a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs @@ -269,6 +269,8 @@ pub struct RecordingMetadata { } #[derive(Clone, PartialEq, ::prost::Message)] pub struct RegisterRecordingResponse { + #[prost(message, optional, tag = "1")] + pub id: ::core::option::Option, /// Note / TODO(zehiko): this implies we read the record (for example go through entire .rrd file /// chunk by chunk) and extract the metadata. So we might want to 1/ not do this i.e. /// only do it as part of explicit GetMetadata request or 2/ do it if Request has "include_metadata=true"