Skip to content

Commit

Permalink
Merge pull request #2919 from albinsuresh/imp/cleanup-health-check
Browse files Browse the repository at this point in the history
refactor: Refactor health status code
  • Loading branch information
albinsuresh authored Jun 7, 2024
2 parents 0d49756 + 4914cff commit 4861be8
Show file tree
Hide file tree
Showing 16 changed files with 290 additions and 185 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 0 additions & 55 deletions crates/core/c8y_api/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,3 @@
pub mod bridge {
use mqtt_channel::MqttMessage;
use tedge_api::main_device_health_topic;
use tedge_api::MQTT_BRIDGE_DOWN_PAYLOAD;
use tedge_api::MQTT_BRIDGE_UP_PAYLOAD;

pub fn is_c8y_bridge_established(message: &MqttMessage, service: &str) -> bool {
let c8y_bridge_health_topic = main_device_health_topic(service);
match message.payload_str() {
Ok(payload) => {
message.topic.name == c8y_bridge_health_topic
&& (payload == MQTT_BRIDGE_UP_PAYLOAD
|| payload == MQTT_BRIDGE_DOWN_PAYLOAD
|| is_valid_status_payload(payload))
}
Err(_err) => false,
}
}

#[derive(serde::Deserialize)]
struct HealthStatus<'a> {
status: &'a str,
}

fn is_valid_status_payload(payload: &str) -> bool {
serde_json::from_str::<HealthStatus>(payload)
.map_or(false, |h| h.status == "up" || h.status == "down")
}
}

pub mod child_device {
use crate::smartrest::topic::C8yTopic;
use mqtt_channel::MqttMessage;
Expand All @@ -40,28 +10,3 @@ pub mod child_device {
)
}
}

#[cfg(test)]
mod tests {
use mqtt_channel::MqttMessage;
use mqtt_channel::Topic;
use test_case::test_case;

use crate::utils::bridge::is_c8y_bridge_established;

const C8Y_BRIDGE_HEALTH_TOPIC: &str =
"te/device/main/service/tedge-mapper-bridge-c8y/status/health";

#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "1", true)]
#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "0", true)]
#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "bad payload", false)]
#[test_case("tedge/not/health/topic", "1", false)]
#[test_case("tedge/not/health/topic", "0", false)]
fn test_bridge_is_established(topic: &str, payload: &str, expected: bool) {
let topic = Topic::new(topic).unwrap();
let message = MqttMessage::new(&topic, payload);

let actual = is_c8y_bridge_established(&message, "tedge-mapper-bridge-c8y");
assert_eq!(actual, expected);
}
}
172 changes: 163 additions & 9 deletions crates/core/tedge_api/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
use crate::mqtt_topics::Channel;
use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::MqttSchema;
use crate::mqtt_topics::ServiceTopicId;
use clock::Clock;
use clock::WallClock;
use log::error;
use mqtt_channel::MqttMessage;
use mqtt_channel::Topic;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use serde_json::Value as JsonValue;
use std::fmt::Display;
use std::process;
use std::sync::Arc;
use tedge_utils::timestamp::TimeFormat;

pub const MQTT_BRIDGE_UP_PAYLOAD: &str = "1";
pub const MQTT_BRIDGE_DOWN_PAYLOAD: &str = "0";
pub const UP_STATUS: &str = "up";
pub const DOWN_STATUS: &str = "down";
pub const UNKNOWN_STATUS: &str = "unknown";

// FIXME: doesn't account for custom topic root, use MQTT scheme API here
pub fn main_device_health_topic(service: &str) -> String {
format!("te/device/main/service/{service}/status/health")
pub fn service_health_topic(
mqtt_schema: &MqttSchema,
device_topic_id: &EntityTopicId,
service: &str,
) -> Topic {
mqtt_schema.topic_for(
&device_topic_id.default_service_for_device(service).unwrap(),
&Channel::Health,
)
}

/// Encodes a valid health topic.
Expand Down Expand Up @@ -92,14 +97,163 @@ impl ServiceHealthTopic {
}
}

#[derive(Deserialize, Serialize, Debug, Default)]
pub struct HealthStatus {
pub status: Status,
pub pid: Option<u32>,
pub time: Option<JsonValue>,
}

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Status {
Up,
Down,
#[serde(untagged)]
Other(String),
}

impl Default for Status {
fn default() -> Self {
Status::Other("unknown".to_string())
}
}

impl Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let status = match self {
Status::Up => "up",
Status::Down => "down",
Status::Other(val) if val.is_empty() => "unknown",
Status::Other(val) => val,
};
write!(f, "{}", status)
}
}

#[derive(Debug)]
pub struct HealthTopicError;

impl HealthStatus {
pub fn try_from_health_status_message(
message: &MqttMessage,
mqtt_schema: &MqttSchema,
) -> Result<Self, HealthTopicError> {
if let Ok((topic_id, Channel::Health)) = mqtt_schema.entity_channel_of(&message.topic) {
let health_status = if entity_is_mosquitto_bridge_service(&topic_id) {
let status = match message.payload_str() {
Ok("1") => Status::Up,
Ok("0") => Status::Down,
_ => Status::default(),
};
HealthStatus {
status,
pid: None,
time: None,
}
} else {
serde_json::from_slice(message.payload()).unwrap_or_default()
};
Ok(health_status)
} else {
Err(HealthTopicError)
}
}

pub fn is_valid(&self) -> bool {
self.status == Status::Up || self.status == Status::Down
}
}

pub fn entity_is_mosquitto_bridge_service(entity_topic_id: &EntityTopicId) -> bool {
entity_topic_id
.default_service_name()
.filter(|name| name.starts_with("mosquitto-") && name.ends_with("-bridge"))
.is_some()
}

#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use serde_json::Value;
use test_case::test_case;

#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":"up"}"#,
Status::Up;
"service-health-status-up"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":"down"}"#,
Status::Down;
"service-health-status-down"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":"foo"}"#,
Status::Other("foo".into());
"service-health-status-other-value"
)]
#[test_case(
"te/device/child/service/tedge-mapper-c8y/status/health",
r#"{"pid":1234,"status":"up"}"#,
Status::Up;
"service-health-status-with-extra-fields"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"pid":"123456"}"#,
Status::Other("unknown".into());
"service-health-status-no-value"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":""}"#,
Status::Other("".into());
"service-health-status-empty-value"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
"{}",
Status::default();
"service-health-status-empty-message"
)]
#[test_case(
"te/device/main/service/mosquitto-xyz-bridge/status/health",
"1",
Status::Up;
"mosquitto-bridge-service-health-status-up"
)]
#[test_case(
"te/device/main/service/mosquitto-xyz-bridge/status/health",
"0",
Status::Down;
"mosquitto-bridge-service-health-status-down"
)]
#[test_case(
"te/device/main/service/mosquitto-xyz-bridge/status/health",
"invalid payload",
Status::default();
"mosquitto-bridge-service-health-status-invalid-payload"
)]
#[test_case(
"te/device/main/service/tedge-mapper-bridge-c8y/status/health",
r#"{"status":"up"}"#,
Status::Up;
"builtin-bridge-service-health-status-up"
)]
fn parse_heath_status(health_topic: &str, health_payload: &str, expected_status: Status) {
let mqtt_schema = MqttSchema::new();
let topic = Topic::new_unchecked(health_topic);
let health_message = MqttMessage::new(&topic, health_payload.as_bytes().to_owned());

let health_status =
HealthStatus::try_from_health_status_message(&health_message, &mqtt_schema);
assert_eq!(health_status.unwrap().status, expected_status);
}

#[test]
fn is_rfc3339_timestamp() {
Expand Down
6 changes: 0 additions & 6 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,6 @@ impl EntityTopicId {
pub fn as_str(&self) -> &str {
self.0.as_str()
}

// FIXME: can also match "device/bridge//" or "/device/main/service/my_custom_bridge"
// should match ONLY the single mapper bridge
pub fn is_bridge_health_topic(&self) -> bool {
self.as_str().contains("bridge")
}
}

/// Contains a topic id of the service itself and the associated device.
Expand Down
16 changes: 14 additions & 2 deletions crates/core/tedge_mapper/src/aws/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ use async_trait::async_trait;
use aws_mapper_ext::converter::AwsConverter;
use clock::WallClock;
use mqtt_channel::TopicFilter;
use std::str::FromStr;
use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::service_health_topic;
use tedge_config::TEdgeConfig;
use tedge_mqtt_bridge::use_key_and_cert;
use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tracing::warn;

const AWS_MAPPER_NAME: &str = "tedge-mapper-aws";
const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-aws";

pub struct AwsMapper;

Expand All @@ -32,8 +36,12 @@ impl TEdgeComponent for AwsMapper {
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;

let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
if tedge_config.mqtt.bridge.built_in {
let device_id = tedge_config.device.id.try_read(&tedge_config)?;
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;

let rules = built_in_bridge_rules(device_id)?;

let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new(
Expand All @@ -47,17 +55,21 @@ impl TEdgeComponent for AwsMapper {
&tedge_config.aws.root_cert_path,
&tedge_config,
)?;

let health_topic =
service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME);

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
"tedge-mapper-bridge-aws".to_owned(),
BUILT_IN_BRIDGE_NAME,
&health_topic,
rules,
cloud_config,
)
.await;
runtime.spawn(bridge_actor).await?;
}
let clock = Box::new(WallClock);
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
let aws_converter = AwsConverter::new(
tedge_config.aws.mapper.timestamp,
clock,
Expand Down
14 changes: 13 additions & 1 deletion crates/core/tedge_mapper/src/az/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ use async_trait::async_trait;
use az_mapper_ext::converter::AzureConverter;
use clock::WallClock;
use mqtt_channel::TopicFilter;
use std::str::FromStr;
use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::service_health_topic;
use tedge_config::TEdgeConfig;
use tedge_mqtt_bridge::use_key_and_cert;
use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tracing::warn;

const AZURE_MAPPER_NAME: &str = "tedge-mapper-az";
const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-az";

pub struct AzureMapper;

Expand All @@ -32,7 +36,11 @@ impl TEdgeComponent for AzureMapper {
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());

if tedge_config.mqtt.bridge.built_in {
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;

let remote_clientid = tedge_config.device.id.try_read(&tedge_config)?;
let rules = built_in_bridge_rules(remote_clientid)?;

Expand All @@ -55,9 +63,13 @@ impl TEdgeComponent for AzureMapper {
&tedge_config,
)?;

let health_topic =
service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME);

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
"tedge-mapper-bridge-az".to_owned(),
BUILT_IN_BRIDGE_NAME,
&health_topic,
rules,
cloud_config,
)
Expand Down
Loading

0 comments on commit 4861be8

Please sign in to comment.