Skip to content

Commit

Permalink
fix: Ignore extra HealthStatus fields in service_monitor
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Sep 25, 2024
1 parent bd3734e commit de6e408
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 18 deletions.
16 changes: 8 additions & 8 deletions crates/core/tedge_api/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use mqtt_channel::Topic;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use serde_json::Value as JsonValue;
use std::fmt::Display;
use std::process;
use std::sync::Arc;
Expand Down Expand Up @@ -97,11 +96,16 @@ impl ServiceHealthTopic {
}
}

/// Payload of the health status message.
///
/// Contains only fields required for the payload to be considered a valid health status message.
/// Other components are free to require additional fields for their purposes.
///
/// https://thin-edge.github.io/thin-edge.io/operate/troubleshooting/monitoring-service-health/
#[derive(Deserialize, Serialize, Debug, Default)]
pub struct HealthStatus {
/// Current status of the service, synced by the mapper to the cloud
pub status: Status,
pub pid: Option<u32>,
pub time: Option<JsonValue>,
}

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
Expand Down Expand Up @@ -146,11 +150,7 @@ impl HealthStatus {
Ok("0") => Status::Down,
_ => Status::default(),
};
HealthStatus {
status,
pid: None,
time: None,
}
HealthStatus { status }
} else {
serde_json::from_slice(message.payload()).unwrap_or_default()
};
Expand Down
22 changes: 17 additions & 5 deletions crates/core/tedge_watchdog/src/systemd_watchdog.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::error::WatchdogError;
use anyhow::Context;
use freedesktop_entry_parser::parse_entry;
use futures::channel::mpsc;
Expand All @@ -8,6 +7,9 @@ use futures::StreamExt;
use mqtt_channel::MqttMessage;
use mqtt_channel::PubChannel;
use mqtt_channel::Topic;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::path::PathBuf;
use std::process;
use std::process::Command;
Expand All @@ -20,7 +22,6 @@ use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::HealthStatus;
use tedge_config::TEdgeConfigLocation;
use tedge_utils::timestamp::IsoOrUnix;
use time::OffsetDateTime;
Expand All @@ -29,6 +30,8 @@ use tracing::error;
use tracing::info;
use tracing::warn;

use crate::error::WatchdogError;

const SERVICE_NAME: &str = "tedge-watchdog";

/// How many times more often do we send notify to systemd watchdog, than is necessary from the
Expand All @@ -38,6 +41,16 @@ const SERVICE_NAME: &str = "tedge-watchdog";
/// a timing misalignment.
const NOTIFY_SEND_FREQ_RATIO: u64 = 4;

/// A subset of fields of health status payload required by the watchdog.
///
/// https://thin-edge.github.io/thin-edge.io/operate/troubleshooting/monitoring-service-health/
#[derive(Debug, Serialize, Deserialize)]
struct HealthStatusExt {
/// Used for tracking service restarts
pub pid: Option<u32>,
pub time: Option<JsonValue>,
}

pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> {
// Send ready notification to systemd.
notify_systemd(process::id(), "--ready")?;
Expand Down Expand Up @@ -128,7 +141,6 @@ async fn start_watchdog_for_tedge_services(tedge_config_dir: PathBuf) {

let tedge_config_location = tedge_config_location.clone();
watchdog_tasks.push(tokio::spawn(async move {
//
let interval = Duration::from_secs((interval / NOTIFY_SEND_FREQ_RATIO).max(1));
monitor_tedge_service(
tedge_config_location,
Expand Down Expand Up @@ -243,11 +255,11 @@ async fn monitor_tedge_service(
async fn get_latest_health_status_message(
request_timestamp: OffsetDateTime,
messages: &mut mpsc::UnboundedReceiver<MqttMessage>,
) -> Result<HealthStatus, WatchdogError> {
) -> Result<HealthStatusExt, WatchdogError> {
while let Some(message) = messages.next().await {
if let Ok(message) = message.payload_str() {
debug!("Health response received: {message}");
if let Ok(health_status) = serde_json::from_str::<HealthStatus>(message) {
if let Ok(health_status) = serde_json::from_str::<HealthStatusExt>(message) {
if health_status.time.is_none() {
error!("Ignoring invalid health response: {health_status:?} without a `time` field in it");
continue;
Expand Down
16 changes: 11 additions & 5 deletions crates/extensions/c8y_mapper_ext/src/service_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ pub fn convert_health_status_message(
return vec![];
}

let HealthStatus {
status,
pid: _,
time: _,
} = HealthStatus::try_from_health_status_message(message, mqtt_schema).unwrap();
let HealthStatus { status } =
HealthStatus::try_from_health_status_message(message, mqtt_schema).unwrap();

let display_name = entity
.other
Expand Down Expand Up @@ -83,6 +80,15 @@ mod tests {
r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up"#;
"service-monitoring-thin-edge-device"
)]
// If there are any problems with fields other than `status`, we want to ignore them and still send status update
#[test_case(
"test_device",
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"unrecognised_field": [42], "time": "invalid timestamp", "pid": "invalid pid", "status": "up"}"#,
"c8y/s/us",
r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up"#;
"service-monitoring-thin-edge-device-optional-fields-invalid"
)]
#[test_case(
"test_device",
"te/device/child/service/tedge-mapper-c8y/status/health",
Expand Down

0 comments on commit de6e408

Please sign in to comment.