Skip to content

Commit

Permalink
Test for get_signal_id and HashMap:with_capacity adaption
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Aug 13, 2024
1 parent 26bbe93 commit 8968f48
Showing 1 changed file with 211 additions and 49 deletions.
260 changes: 211 additions & 49 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::{
permissions::Permissions,
};

use core::result::Result::Ok;
use databroker_proto::kuksa::val::v2::{
self as proto,
open_provider_stream_request::Action::{
Expand Down Expand Up @@ -95,19 +94,23 @@ impl proto::val_server::Val for broker::DataBroker {
let request = request.into_inner();

let signal_ids = request.signal_ids;
let size = signal_ids.len();

let mut valid_requests: HashMap<i32, HashSet<broker::Field>> = HashMap::new();
let mut valid_requests: HashMap<i32, HashSet<broker::Field>> = HashMap::with_capacity(size);

for signal_id in signal_ids {
valid_requests.insert(
get_signal_id(Some(signal_id), &broker).await.unwrap(),
match get_signal_id(Some(signal_id), &broker).await {
Ok(signal_id) => signal_id,
Err(err) => return Err(err),
},
vec![broker::Field::Datapoint].into_iter().collect(),
);
}

match broker.subscribe(valid_requests).await {
Ok(stream) => {
let stream = convert_to_proto_stream(stream);
let stream = convert_to_proto_stream(stream, size);
Ok(tonic::Response::new(Box::pin(stream)))
}
Err(SubscriptionError::NotFound) => {
Expand Down Expand Up @@ -227,6 +230,34 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

// Publish a signal value. Used for low frequency signals (e.g. attributes).
/// # Arguments
///
/// ```
/// `request`:
/// PublishValueRequest {
/// signal_id: <String or i32>
/// datapoint: Datapoint
/// }
///
/// # Response
/// `response`:
/// PublishValueResponse {
/// error: Error
/// }
/// ```
///
/// # Errors
/// Returns (GRPC error code):
/// NOT_FOUND if any of the signals are non-existant.
/// PERMISSION_DENIED
/// - if access is denied for any of the signals.
/// - if the signal is already provided by another provider.
/// INVALID_ARGUMENT
/// - if the data type used in the request does not match
/// the data type of the addressed signal
/// - if the published value is not accepted,
/// e.g. if sending an unsupported enum value
async fn publish_value(
&self,
request: tonic::Request<proto::PublishValueRequest>,
Expand All @@ -244,30 +275,44 @@ impl proto::val_server::Val for broker::DataBroker {

let request = request.into_inner();

let mut updates: HashMap<i32, proto::Datapoint> = HashMap::new();
let mut updates: HashMap<i32, broker::EntryUpdate> = HashMap::with_capacity(1);

updates.insert(
get_signal_id(request.signal_id, &broker).await.unwrap(),
request.data_point.unwrap(),
match get_signal_id(request.signal_id, &broker).await {
Ok(signal_id) => signal_id,
Err(err) => return Err(err),
},
broker::EntryUpdate {
path: None,
datapoint: Some(broker::Datapoint::from(&request.data_point.unwrap())),
actuator_target: None,
entry_type: None,
data_type: None,
description: None,
allowed: None,
unit: None,
},
);

let values_request: proto::PublishValuesRequest = proto::PublishValuesRequest {
request_id: 1,
datapoints: updates,
};
let publish_values_response = publish_values(&broker, &values_request).await;
if publish_values_response.status.is_empty() {
Ok(tonic::Response::new(proto::PublishValueResponse {
match broker.update_entries(updates).await {
Ok(()) => Ok(tonic::Response::new(proto::PublishValueResponse {
error: None,
}))
} else if let Some((_, err)) = publish_values_response.status.iter().next() {
Ok(tonic::Response::new(proto::PublishValueResponse {
error: Some(err.clone()),
}))
} else {
Err(tonic::Status::internal(
"There is no error provided for the entry",
))
})),
Err(errors) => {
if errors.is_empty() {
Ok(tonic::Response::new(proto::PublishValueResponse {
error: None,
}))
} else if let Some((_, err)) = errors.first() {
Ok(tonic::Response::new(proto::PublishValueResponse {
error: Some(err.into()),
}))
} else {
Err(tonic::Status::internal(
"There is no error provided for the entry",
))
}
}
}
}

Expand Down Expand Up @@ -424,7 +469,7 @@ impl proto::val_server::Val for broker::DataBroker {
break;
},
Some(PublishValuesRequest(publish_values_request)) => {
let response = provider_stream_publish_values(&broker, &publish_values_request).await;
let response = publish_values(&broker, &publish_values_request).await;
if let Err(err) = response_stream_sender.send(Ok(response)).await
{
debug!("Failed to send response: {}", err);
Expand Down Expand Up @@ -480,9 +525,7 @@ impl proto::val_server::Val for broker::DataBroker {
async fn publish_values(
broker: &AuthorizedAccess<'_, '_>,
request: &databroker_proto::kuksa::val::v2::PublishValuesRequest,
) -> PublishValuesResponse {
debug!(?request);

) -> OpenProviderStreamResponse {
let ids: Vec<(i32, broker::EntryUpdate)> = request
.datapoints
.iter()
Expand All @@ -504,32 +547,32 @@ async fn publish_values(
.collect();

match broker.update_entries(ids).await {
Ok(_) => proto::PublishValuesResponse {
request_id: request.request_id,
status: HashMap::new(),
Ok(_) => OpenProviderStreamResponse {
action: Some(
open_provider_stream_response::Action::PublishValuesResponse(
PublishValuesResponse {
request_id: request.request_id,
status: HashMap::new(),
},
),
),
},
Err(err) => PublishValuesResponse {
request_id: request.request_id,
status: err
.iter()
.map(|(id, error)| (*id, proto::Error::from(error)))
.collect(),
Err(err) => OpenProviderStreamResponse {
action: Some(
open_provider_stream_response::Action::PublishValuesResponse(
PublishValuesResponse {
request_id: request.request_id,
status: err
.iter()
.map(|(id, error)| (*id, proto::Error::from(error)))
.collect(),
},
),
),
},
}
}

async fn provider_stream_publish_values(
broker: &AuthorizedAccess<'_, '_>,
request: &databroker_proto::kuksa::val::v2::PublishValuesRequest,
) -> OpenProviderStreamResponse {
let publish_values_response = publish_values(broker, request).await;
OpenProviderStreamResponse {
action: Some(
open_provider_stream_response::Action::PublishValuesResponse(publish_values_response),
),
}
}

async fn get_signal_id(
signal_id: Option<proto::SignalId>,
broker: &AuthorizedAccess<'_, '_>,
Expand Down Expand Up @@ -563,9 +606,10 @@ async fn get_signal_id(

fn convert_to_proto_stream(
input: impl Stream<Item = broker::EntryUpdates>,
size: usize,
) -> impl Stream<Item = Result<proto::SubscribeResponse, tonic::Status>> {
input.map(move |item| {
let mut entries: HashMap<String, proto::Datapoint> = HashMap::new();
let mut entries: HashMap<String, proto::Datapoint> = HashMap::with_capacity(size);
for update in item.updates {
let update_datapoint: Option<proto::Datapoint> = match update.update.datapoint {
Some(datapoint) => datapoint.into(),
Expand Down Expand Up @@ -633,6 +677,124 @@ mod tests {
}
}

#[tokio::test]
async fn test_publish_value() {
let broker = DataBroker::default();
let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL);
let f = false;

let entry_id = authorized_access
.add_entry(
"test.datapoint1".to_owned(),
broker::DataType::Bool,
broker::ChangeType::OnChange,
broker::EntryType::Sensor,
"Test datapoint 1".to_owned(),
None,
None,
)
.await
.unwrap();

let request = proto::PublishValueRequest {
signal_id: Some(proto::SignalId {
signal: Some(proto::signal_id::Signal::Id(entry_id)),
}),
data_point: {
let timestamp = Some(std::time::SystemTime::now().into());

let value = proto::Value {
typed_value: Some(proto::value::TypedValue::Bool(true)),
};

Some(proto::Datapoint {
timestamp,
value_state: Some(proto::datapoint::ValueState::Value(value)),
})
},
};

// Manually insert permissions
let mut publish_value_request = tonic::Request::new(request);
publish_value_request
.extensions_mut()
.insert(permissions::ALLOW_ALL.clone());

match broker.publish_value(publish_value_request).await {
Ok(response) => {
// Handle the successful response
let publish_response = response.into_inner();

// Check if there is an error in the response
if let Some(error) = publish_response.error {
assert!(f, "Publish failed with error: {:?}", error);
} else {
println!("Publish succeeded.");
}
}
Err(status) => {
// Handle the error from the publish_value function
assert!(f, "Publish failed with status: {:?}", status);
}
}
}

#[tokio::test]
async fn test_publish_value_signal_id_not_found() {
let broker = DataBroker::default();
let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL);
let f = false;

let _entry_id = authorized_access
.add_entry(
"test.datapoint1".to_owned(),
broker::DataType::Bool,
broker::ChangeType::OnChange,
broker::EntryType::Sensor,
"Test datapoint 1".to_owned(),
None,
None,
)
.await
.unwrap();

let request = proto::PublishValueRequest {
signal_id: Some(proto::SignalId {
signal: Some(proto::signal_id::Signal::Id(1234)),
}),
data_point: {
let timestamp = Some(std::time::SystemTime::now().into());

let value = proto::Value {
typed_value: Some(proto::value::TypedValue::Bool(true)),
};

Some(proto::Datapoint {
timestamp,
value_state: Some(proto::datapoint::ValueState::Value(value)),
})
},
};

// Manually insert permissions
let mut publish_value_request = tonic::Request::new(request);
publish_value_request
.extensions_mut()
.insert(permissions::ALLOW_ALL.clone());

match broker.publish_value(publish_value_request).await {
Ok(_) => {
// Handle the successful response
assert!(f, "Should not happen!");
}
Err(status) => {
// Handle the error from the publish_value function
assert_eq!(status.code(), tonic::Code::NotFound);
assert_eq!(status.message(), "Path not found");
}
}
}

/*
Test subscribe service method
*/
Expand Down

0 comments on commit 8968f48

Please sign in to comment.