diff --git a/databroker/src/grpc/kuksa_val_v2/conversions.rs b/databroker/src/grpc/kuksa_val_v2/conversions.rs index 498bb830..62f6cb64 100644 --- a/databroker/src/grpc/kuksa_val_v2/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v2/conversions.rs @@ -43,6 +43,156 @@ impl From<&proto::Datapoint> for broker::Datapoint { } } +impl From for Option { + fn from(from: broker::Datapoint) -> Self { + match from.value { + broker::DataValue::NotAvailable => None, + broker::DataValue::Bool(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::String(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::String(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Int32(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Int64(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Int64(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Uint32(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Uint64(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Float(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Float(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Double(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Double(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::BoolArray(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::BoolArray(proto::BoolArray { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::StringArray(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::StringArray(proto::StringArray { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Int32Array(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Int32Array(proto::Int32Array { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Int64Array(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Int64Array(proto::Int64Array { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Uint32Array(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32Array(proto::Uint32Array { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Uint64Array(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64Array(proto::Uint64Array { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::FloatArray(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::FloatArray(proto::FloatArray { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::DoubleArray(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::DoubleArray(proto::DoubleArray { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::ValueFailure(failure) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Failure(i32::from(&failure))), + timestamp: Some(from.ts.into()), + }), + } + } +} + +impl From<&broker::ValueFailure> for i32 { + fn from(from: &broker::ValueFailure) -> Self { + match from { + broker::ValueFailure::Unspecified => 0, + broker::ValueFailure::InvalidValue => 1, + broker::ValueFailure::NotProvided => 2, + broker::ValueFailure::UnknownSignal => 3, + broker::ValueFailure::AccessDenied => 4, + broker::ValueFailure::InternalError => 5, + } + } +} + +impl From<&i32> for broker::ValueFailure { + fn from(from: &i32) -> Self { + match from { + 1 => broker::ValueFailure::InvalidValue, + 2 => broker::ValueFailure::NotProvided, + 3 => broker::ValueFailure::UnknownSignal, + 4 => broker::ValueFailure::AccessDenied, + 5 => broker::ValueFailure::InternalError, + _ => broker::ValueFailure::Unspecified, + } + } +} + fn from_i32(value: i32) -> proto::ValueFailure { // Use a match statement to convert the i32 to the corresponding enum variant match value { diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index 28640afb..179a1e16 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -14,10 +14,11 @@ use std::{collections::HashMap, pin::Pin}; use crate::{ - broker::{self, AuthorizedAccess}, + broker::{self, AuthorizedAccess, SubscriptionError}, permissions::Permissions, }; +use core::result::Result::Ok; use databroker_proto::kuksa::val::v2::{ self as proto, open_provider_stream_request::Action::{ @@ -26,32 +27,43 @@ use databroker_proto::kuksa::val::v2::{ open_provider_stream_response, OpenProviderStreamResponse, PublishValuesResponse, }; +use std::collections::HashSet; use tokio::{select, sync::mpsc}; -use tokio_stream::{wrappers::ReceiverStream, Stream}; -use tonic::{Code, Response}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tracing::debug; +const MAX_REQUEST_PATH_LENGTH: usize = 1000; + #[tonic::async_trait] impl proto::val_server::Val for broker::DataBroker { async fn get_value( &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) } async fn get_values( &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) } async fn list_values( &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) } type SubscribeStream = Pin< @@ -65,37 +77,124 @@ impl proto::val_server::Val for broker::DataBroker { async fn subscribe( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + let broker = self.authorized_access(&permissions); + + let request = request.into_inner(); + + let signal_ids = request.signal_ids; + + let mut valid_requests: HashMap> = HashMap::new(); + + for signal_id in signal_ids { + valid_requests.insert( + get_signal_id(Some(signal_id), &broker).await.unwrap(), + vec![broker::Field::Datapoint].into_iter().collect(), + ); + } + + match broker.subscribe(valid_requests).await { + Ok(stream) => { + let stream = convert_to_proto_stream(stream); + Ok(tonic::Response::new(Box::pin(stream))) + } + Err(SubscriptionError::NotFound) => { + Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")) + } + Err(SubscriptionError::InvalidInput) => Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Invalid Argument", + )), + Err(SubscriptionError::InternalError) => { + Err(tonic::Status::new(tonic::Code::Internal, "Internal Error")) + } + } } async fn actuate( &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) } async fn batch_actuate( &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) } async fn list_metadata( &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) } async fn publish_value( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + let broker = self.authorized_access(&permissions); + + let request = request.into_inner(); + + let mut updates: HashMap = HashMap::new(); + + updates.insert( + get_signal_id(request.signal_id, &broker).await.unwrap(), + request.data_point.unwrap(), + ); + + let values_request: proto::PublishValuesRequest = proto::PublishValuesRequest { + request_id: 1, + datapoints: updates, + }; + let publish_values_response = publish_values(&broker, &values_request).await; + if publish_values_response.status.is_empty() { + Ok(tonic::Response::new(proto::PublishValueResponse { + error: None, + })) + } else { + if let Some((_, err)) = publish_values_response.status.iter().next() { + Ok(tonic::Response::new(proto::PublishValueResponse { + error: Some(err.clone()), + })) + } else { + Err(tonic::Status::internal( + "There is no error provided for the entry", + )) + } + } } // type OpenProviderStreamStream = Pin< @@ -251,7 +350,7 @@ impl proto::val_server::Val for broker::DataBroker { break; }, Some(PublishValuesRequest(publish_values_request)) => { - let response = publish_values(&broker, &publish_values_request).await; + let response = provider_stream_publish_values(&broker, &publish_values_request).await; if let Err(err) = response_stream_sender.send(Ok(response)).await { debug!("Failed to send response: {}", err); @@ -288,22 +387,28 @@ impl proto::val_server::Val for broker::DataBroker { } }); - // Return the error stream - Ok(Response::new(ReceiverStream::new(response_stream_receiver))) + Ok(tonic::Response::new(ReceiverStream::new( + response_stream_receiver, + ))) } async fn get_server_info( &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) } } async fn publish_values( broker: &AuthorizedAccess<'_, '_>, request: &databroker_proto::kuksa::val::v2::PublishValuesRequest, -) -> OpenProviderStreamResponse { +) -> PublishValuesResponse { + debug!(?request); + let ids: Vec<(i32, broker::EntryUpdate)> = request .datapoints .iter() @@ -325,32 +430,86 @@ async fn publish_values( .collect(); match broker.update_entries(ids).await { - Ok(_) => OpenProviderStreamResponse { - action: Some( - open_provider_stream_response::Action::PublishValuesResponse( - PublishValuesResponse { - request_id: request.request_id, - status: HashMap::new(), - }, - ), - ), + Ok(_) => proto::PublishValuesResponse { + request_id: request.request_id, + status: HashMap::new(), }, - Err(err) => OpenProviderStreamResponse { - action: Some( - open_provider_stream_response::Action::PublishValuesResponse( - PublishValuesResponse { - request_id: request.request_id, - status: err - .iter() - .map(|(id, error)| (*id, proto::Error::from(error))) - .collect(), - }, - ), - ), + Err(err) => PublishValuesResponse { + request_id: request.request_id, + status: err + .iter() + .map(|(id, error)| (*id, proto::Error::from(error))) + .collect(), }, } } +async fn provider_stream_publish_values( + broker: &AuthorizedAccess<'_, '_>, + request: &databroker_proto::kuksa::val::v2::PublishValuesRequest, +) -> OpenProviderStreamResponse { + let publish_values_response = publish_values(broker, request).await; + OpenProviderStreamResponse { + action: Some( + open_provider_stream_response::Action::PublishValuesResponse(publish_values_response), + ), + } +} + +async fn get_signal_id( + signal_id: Option, + broker: &AuthorizedAccess<'_, '_>, +) -> Result { + if let Some(signal) = signal_id.unwrap().signal { + match signal { + proto::signal_id::Signal::Path(path) => { + if path.len() > MAX_REQUEST_PATH_LENGTH { + return Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "The provided path is too long", + )); + } + match broker.get_id_by_path(&path).await { + Some(id) => Ok(id), + None => Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")), + } + } + proto::signal_id::Signal::Id(id) => match broker.get_metadata(id).await { + Some(_metadata) => Ok(id), + None => Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")), + }, + } + } else { + Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "No SignalId provided", + )) + } +} + +fn convert_to_proto_stream( + input: impl Stream, +) -> impl Stream> { + input.map(move |item| { + let mut entries: HashMap = HashMap::new(); + for update in item.updates { + let update_datapoint: Option = match update.update.datapoint { + Some(datapoint) => datapoint.into(), + None => None, + }; + entries.insert( + update + .update + .path + .expect("Something wrong with subscriptions!"), + update_datapoint.expect("Something wrong with subscriptions!"), + ); + } + let response = proto::SubscribeResponse { entries }; + Ok(response) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -360,6 +519,127 @@ mod tests { BatchActuateStreamRequest, ProvideActuatorResponse, PublishValuesResponse, }; use proto::{open_provider_stream_request, OpenProviderStreamRequest, PublishValuesRequest}; + use tokio::stream; + use std::time::SystemTime; + + /* + Test subscribe service method + */ + #[tokio::test] + async fn test_subscribe() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + let entry_id_1 = authorized_access + .add_entry( + "test.datapoint1".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test datapoint 1".to_owned(), + None, + None, + ) + .await + .unwrap(); + + let entry_id_2 = authorized_access + .add_entry( + "test.datapoint2".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test datapoint 2".to_owned(), + None, + None, + ) + .await + .unwrap(); + + let request = tonic::Request::new(proto::SubscribeRequest { + signal_ids: vec![ + proto::SignalId { + signal: Some(proto::signal_id::Signal::Path("sample_path".to_string())), + }, + proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id_2)), + }, + ], + }); + + let result = broker.subscribe(request).await; + + tokio::spawn(async move { + if let Ok(stream) = result { + // Process the stream by iterating over the items + let mut stream = stream.into_inner(); + let mut item_count = 0; + while let Some(item) = stream.next().await { + match item { + Ok(subscribe_response) => { + // Process the SubscribeResponse + let response = subscribe_response.entries; + assert_eq!(response.len(), 1); + if let Some((path, datapoint)) = response.iter().next() { + if item_count == 1 { + assert_eq!(path, "test.datapoint1") + } + if item_count == 2 { + assert_eq!(path, "test.datapoint2") + } + if let Some(value) = &datapoint.value_state { + assert_eq!( + *value, + proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)) + }) + ); + } else { + assert!(false); + } + } + item_count += 1; + } + Err(_) => { + assert!(false); + } + } + } + + // Assert the total number of items processed + assert_eq!(item_count, 2); + } else if let Err(_) = result { + assert!(false) + } + }); + + tokio::spawn(async move { + let request_1 = proto::PublishValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id_1)), + }), + data_point: Some(proto::Datapoint { + timestamp: None, + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + })), + }), + }; + let _ = broker.publish_value(tonic::Request::new(request_1)); + let request_2 = proto::PublishValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id_2)), + }), + data_point: Some(proto::Datapoint { + timestamp: None, + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + })), + }), + }; + let _ = broker.publish_value(tonic::Request::new(request_2)); + }); + } /* Test open_provider_stream service method