Skip to content

Commit

Permalink
Convert clear alarm message (#2162)
Browse files Browse the repository at this point in the history
Signed-off-by: Pradeep Kumar K J <[email protected]>
Co-authored-by: Pradeep Kumar K J <[email protected]>
  • Loading branch information
PradeepKiruvale and PradeepKiruvale authored Aug 18, 2023
1 parent ab57a58 commit 2643f6a
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 59 deletions.
62 changes: 34 additions & 28 deletions crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use log::error;
use serde_json::Value;
use std::collections::HashMap;
use std::convert::Infallible;
Expand All @@ -15,7 +14,7 @@ impl Converter for TedgetoTeConverter {

fn convert(&mut self, input: &Self::Input) -> Result<Vec<Self::Output>, Self::Error> {
let messages_or_err = self.try_convert(input.clone());
Ok(self.wrap_errors(messages_or_err))
Ok(messages_or_err)
}
}

Expand All @@ -24,20 +23,17 @@ impl TedgetoTeConverter {
TedgetoTeConverter {}
}

fn try_convert(
&mut self,
message: MqttMessage,
) -> Result<Vec<tedge_mqtt_ext::Message>, serde_json::Error> {
fn try_convert(&mut self, message: MqttMessage) -> Vec<tedge_mqtt_ext::Message> {
match message.topic.clone() {
topic if topic.name.starts_with("tedge/measurements") => {
Ok(self.convert_measurement(message))
self.convert_measurement(message)
}
topic if topic.name.starts_with("tedge/events") => Ok(self.convert_event(message)),
topic if topic.name.starts_with("tedge/events") => self.convert_event(message),
topic if topic.name.starts_with("tedge/alarms") => self.convert_alarm(message),
topic if topic.name.starts_with("tedge/health") => {
Ok(self.convert_health_status_message(message))
self.convert_health_status_message(message)
}
_ => Ok(vec![]),
_ => vec![],
}
}

Expand All @@ -58,10 +54,7 @@ impl TedgetoTeConverter {

// tedge/alarms/severity/alarm_type -> te/device/main///a/alarm_type, put severity in payload
// tedge/alarms/severity/alarm_type/child -> te/device/child///a/alarm_type, put severity in payload
fn convert_alarm(
&mut self,
mut message: MqttMessage,
) -> Result<Vec<MqttMessage>, serde_json::Error> {
fn convert_alarm(&mut self, mut message: MqttMessage) -> Vec<MqttMessage> {
let (te_topic, severity) = match message.topic.name.split('/').collect::<Vec<_>>()[..] {
["tedge", "alarms", severity, alarm_type] => (
Topic::new_unchecked(format!("te/device/main///a/{alarm_type}").as_str()),
Expand All @@ -71,15 +64,25 @@ impl TedgetoTeConverter {
Topic::new_unchecked(format!("te/device/{cid}///a/{alarm_type}").as_str()),
severity,
),
_ => return Ok(vec![]),
_ => return vec![],
};

let mut alarm: HashMap<String, Value> = serde_json::from_slice(message.payload.as_bytes())?;
alarm.insert("severity".to_string(), severity.into());
// if alarm payload is empty, then it's a clear alarm message. So, forward empty payload
// if the alarm payload is not empty then update the severity.
if !message.payload().is_empty() {
let res: Result<HashMap<String, Value>, serde_json::Error> =
serde_json::from_slice(message.payload.as_bytes());
if let Ok(mut alarm) = res {
alarm.insert("severity".to_string(), severity.into());
// serialize the payload after updating the severity
if let Ok(payload) = serde_json::to_string(&alarm) {
message.payload = payload.into()
}
}
}
message.topic = te_topic;
message.payload = serde_json::to_string(&alarm)?.into();
message.retain = true;
Ok(vec![message])
vec![message]
}

// tedge/events/event_type -> te/device/main///e/event_type
Expand Down Expand Up @@ -115,16 +118,19 @@ impl TedgetoTeConverter {
message.retain = true;
vec![message]
}
}
#[cfg(test)]
mod tests {
use crate::tedge_to_te_converter::converter::TedgetoTeConverter;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;

fn wrap_errors(
&self,
messages_or_err: Result<Vec<MqttMessage>, serde_json::Error>,
) -> Vec<MqttMessage> {
messages_or_err.unwrap_or_else(|error| vec![self.new_error_message(error)])
}
#[test]

fn new_error_message(&self, error: serde_json::Error) -> MqttMessage {
error!("Mapping error: {}", error);
MqttMessage::new(&Topic::new_unchecked("tedge/errors"), error.to_string())
fn convert_incoming_wrong_topic() {
let mqtt_message = MqttMessage::new(&Topic::new_unchecked("tedge///MyCustomAlarm"), "");
let mut converter = TedgetoTeConverter::new();
let res = converter.try_convert(mqtt_message);
assert!(res.is_empty())
}
}
135 changes: 115 additions & 20 deletions crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn convert_incoming_main_device_measurement_topic() -> Result<(), DynError
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate measurement MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/measurements"),
r#"{"temperature": 2500 }"#,
Expand All @@ -40,7 +40,7 @@ async fn convert_incoming_child_device_measurement_topic() -> Result<(), DynErro
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate child measurement MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/measurements/child1"),
r#"{"temperature": 2500 }"#,
Expand All @@ -51,7 +51,7 @@ async fn convert_incoming_child_device_measurement_topic() -> Result<(), DynErro
);
mqtt_box.send(mqtt_message).await?;

// Assert SoftwareListRequest
// Assert measurement message
mqtt_box.assert_received([expected_mqtt_message]).await;
Ok(())
}
Expand All @@ -61,7 +61,7 @@ async fn convert_incoming_main_device_alarm_topic() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate alarm MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/alarms/critical/MyCustomAlarm"),
r#"{
Expand All @@ -85,8 +85,9 @@ async fn convert_incoming_main_device_alarm_topic() -> Result<(), DynError> {
}

fn same_json_over_mqtt_msg(left: &MqttMessage, right: &MqttMessage) -> bool {
let left_msg: serde_json::Value = serde_json::from_slice(left.payload.as_bytes()).unwrap();
let right_msg: serde_json::Value = serde_json::from_slice(right.payload.as_bytes()).unwrap();
let left_msg: Option<serde_json::Value> = serde_json::from_slice(left.payload.as_bytes()).ok();
let right_msg: Option<serde_json::Value> =
serde_json::from_slice(right.payload.as_bytes()).ok();

(left.topic == right.topic) && (left_msg == right_msg)
}
Expand All @@ -96,7 +97,7 @@ async fn convert_incoming_custom_main_device_alarm_topic() -> Result<(), DynErro
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate custom alarm MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/alarms/critical/MyCustomAlarm"),
r#"{
Expand All @@ -112,7 +113,101 @@ async fn convert_incoming_custom_main_device_alarm_topic() -> Result<(), DynErro

mqtt_box.send(mqtt_message).await?;

// Assert SoftwareListRequest
// Assert alarm message
mqtt_box
.assert_received_matching(same_json_over_mqtt_msg, [expected_mqtt_message])
.await;
Ok(())
}

#[tokio::test]
async fn convert_incoming_clear_alarm_message() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate clear alarm MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/alarms/critical/MyCustomAlarm"),
"",
);

let expected_mqtt_message = MqttMessage::new(
&Topic::new_unchecked("te/device/main///a/MyCustomAlarm"),
"",
);

mqtt_box.send(mqtt_message).await?;

// Assert mqtt message
mqtt_box
.assert_received_matching(same_json_over_mqtt_msg, [expected_mqtt_message])
.await;
Ok(())
}

#[tokio::test]
async fn convert_incoming_empty_alarm_message() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate empty alarm MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/alarms/critical/MyCustomAlarm"),
r#"{}"#,
);

let expected_mqtt_message = MqttMessage::new(
&Topic::new_unchecked("te/device/main///a/MyCustomAlarm"),
r#"{"severity":"critical"}"#,
);

mqtt_box.send(mqtt_message).await?;

// Assert empty alarm mqtt message
mqtt_box
.assert_received_matching(same_json_over_mqtt_msg, [expected_mqtt_message])
.await;
Ok(())
}

#[tokio::test]
async fn convert_incoming_empty_alarm_type() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate empty alarm type MQTT message received.
let mqtt_message = MqttMessage::new(&Topic::new_unchecked("tedge/alarms/critical/"), r#"{}"#);

let expected_mqtt_message = MqttMessage::new(
&Topic::new_unchecked("te/device/main///a/"),
r#"{"severity":"critical"}"#,
);

mqtt_box.send(mqtt_message).await?;

// Assert empty alarm type message
mqtt_box
.assert_received_matching(same_json_over_mqtt_msg, [expected_mqtt_message])
.await;
Ok(())
}

#[tokio::test]
async fn convert_incoming_empty_severity() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate empty severity MQTT message received.
let mqtt_message = MqttMessage::new(&Topic::new_unchecked("tedge/alarms//test_type"), r#"{}"#);

let expected_mqtt_message = MqttMessage::new(
&Topic::new_unchecked("te/device/main///a/test_type"),
r#"{"severity":""}"#,
);

mqtt_box.send(mqtt_message).await?;

// Assert empty severity mqtt message
mqtt_box
.assert_received_matching(same_json_over_mqtt_msg, [expected_mqtt_message])
.await;
Expand All @@ -124,7 +219,7 @@ async fn convert_incoming_child_device_alarm_topic() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate child device alarm MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/alarms/critical/MyCustomAlarm/child"),
r#"{
Expand All @@ -140,7 +235,7 @@ async fn convert_incoming_child_device_alarm_topic() -> Result<(), DynError> {

mqtt_box.send(mqtt_message).await?;

// Assert SoftwareListRequest
// Assert child device alarm message
mqtt_box
.assert_received_matching(same_json_over_mqtt_msg, [expected_mqtt_message])
.await;
Expand All @@ -152,7 +247,7 @@ async fn convert_incoming_main_device_event_topic() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate main device event MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/events/MyEvent"),
r#"{"text":"Some test event","time":"2021-04-23T19:00:00+05:00"}"#,
Expand All @@ -165,7 +260,7 @@ async fn convert_incoming_main_device_event_topic() -> Result<(), DynError> {

mqtt_box.send(mqtt_message).await?;

// Assert SoftwareListRequest
// Assert event message
mqtt_box.assert_received([expected_mqtt_message]).await;
Ok(())
}
Expand All @@ -175,7 +270,7 @@ async fn convert_custom_incoming_main_device_event_topic() -> Result<(), DynErro
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate main device custom MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/events/MyEvent"),
r#"{"text":"Some test event","time":"2021-04-23T19:00:00+05:00","someOtherCustomFragment":{"nested":{"value":"extra info"}}}"#,
Expand All @@ -188,7 +283,7 @@ async fn convert_custom_incoming_main_device_event_topic() -> Result<(), DynErro

mqtt_box.send(mqtt_message).await?;

// Assert SoftwareListRequest
// Assert event mqtt message
mqtt_box.assert_received([expected_mqtt_message]).await;
Ok(())
}
Expand All @@ -198,7 +293,7 @@ async fn convert_incoming_child_device_event_topic() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate child event MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/events/MyEvent/child"),
r#"{"text":"Some test event","time":"2021-04-23T19:00:00+05:00"}"#,
Expand All @@ -211,7 +306,7 @@ async fn convert_incoming_child_device_event_topic() -> Result<(), DynError> {

mqtt_box.send(mqtt_message).await?;

// Assert SoftwareListRequest
// Assert event mqtt message
mqtt_box.assert_received([expected_mqtt_message]).await;
Ok(())
}
Expand All @@ -223,7 +318,7 @@ async fn convert_incoming_main_device_service_health_status() -> Result<(), DynE
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate health status of main device service MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/health/myservice"),
r#"{""pid":1234,"status":"up","time":1674739912}"#,
Expand All @@ -238,7 +333,7 @@ async fn convert_incoming_main_device_service_health_status() -> Result<(), DynE

mqtt_box.send(mqtt_message).await?;

// Assert SoftwareListRequest
// Assert health status message
mqtt_box.assert_received([expected_mqtt_message]).await;
Ok(())
}
Expand All @@ -248,7 +343,7 @@ async fn convert_incoming_child_device_service_health_status() -> Result<(), Dyn
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate SoftwareList MQTT message received.
// Simulate child device service health status MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/health/child/myservice"),
r#"{""pid":1234,"status":"up","time":1674739912}"#,
Expand All @@ -263,7 +358,7 @@ async fn convert_incoming_child_device_service_health_status() -> Result<(), Dyn

mqtt_box.send(mqtt_message).await?;

// Assert SoftwareListRequest
// Assert health status mqtt message
mqtt_box.assert_received([expected_mqtt_message]).await;
Ok(())
}
Expand Down
Loading

1 comment on commit 2643f6a

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass %
255 0 5 255 100

Please sign in to comment.