From 79fb63c1cf4f5ca8fac151bac40e3e4854909a05 Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Mon, 5 Aug 2024 21:09:59 +0200 Subject: [PATCH] don't process operation status duplicates If the c8y mapper receives an operation message with the same status again, don't send the smartrest operation status message again. It's still not clear what should be done if an operation is cleared while it is being processed, or if some component publishes a previous status (e.g. the c8y mapper received an operation message with status SUCCESSFUL, but later received an operation message with the same ID and status EXECUTING). Signed-off-by: Marcel Guzik --- .../c8y_mapper_ext/src/operations/handler.rs | 354 +++++++++++++++--- .../src/operations/handlers/mod.rs | 21 +- crates/extensions/c8y_mapper_ext/src/tests.rs | 37 ++ 3 files changed, 347 insertions(+), 65 deletions(-) diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs index 41ce2f0121..d8fe6c9e3d 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs @@ -1,7 +1,6 @@ use super::handlers::EntityTarget; use super::handlers::OperationContext; use super::handlers::OperationMessage; -use super::handlers::UpdateStatus; use crate::actor::IdDownloadRequest; use crate::actor::IdDownloadResult; use crate::actor::IdUploadRequest; @@ -11,6 +10,7 @@ use crate::Capabilities; use c8y_api::http_proxy::C8yEndPoint; use c8y_auth_proxy::url::ProxyUrlGenerator; use c8y_http_proxy::handle::C8YHttpProxy; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; use tedge_actors::ClientMessageBox; @@ -19,8 +19,11 @@ use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::ChannelFilter; use tedge_api::mqtt_topics::EntityFilter; use tedge_api::mqtt_topics::IdGenerator; +use tedge_api::workflow::GenericCommandState; use tedge_mqtt_ext::MqttMessage; -use tokio::task::JoinError; +use 
tracing::debug; +use tracing::error; +use tracing::warn; /// Handles operations. /// @@ -138,23 +141,74 @@ impl OperationHandler { message, }; - let topic: Arc = message.message.topic.name.clone().into(); + let topic = Arc::from(message.message.topic.name.as_str()); - let running_operation = self.running_operations.remove(&topic); + let status = match GenericCommandState::from_command_message(&message.message) { + Ok(command) if command.is_cleared() => None, + Ok(command) => Some(command.status), + Err(err) => { + error!(%err, ?message, "could not parse command payload"); + return; + } + }; - let running_operation = - running_operation.unwrap_or_else(|| RunningOperation::spawn(Arc::clone(&self.context))); + let current_operation = self.running_operations.entry(topic); - let operation_status = running_operation - .update(message) - .await - .expect("operation task should not panic"); + match current_operation { + Entry::Vacant(entry) => { + let Some(status) = status else { + debug!(topic = %entry.key(), "unexpected clearing message"); + return; + }; + + let context = Arc::clone(&self.context); + let handle = tokio::spawn(async move { context.update(message).await }); + + let running_operation = RunningOperation { handle, status }; + + entry.insert(running_operation); + } + + Entry::Occupied(entry) => { + let previous_status = entry.get().status.as_str(); + if status.as_ref().is_some_and(|s| *s == previous_status) { + debug!( + "already handling operation message with this topic and status, ignoring" + ); + return; + } - match operation_status { - OperationStatus::Ongoing(operation) => { - self.running_operations.insert(topic, operation); + // if handling a clearing message, wait for a task to finish + let Some(status) = status else { + let operation = entry.remove(); + operation + .handle + .await + .expect("operation task should not panic"); + return; + }; + + // we got a new status, check if it's not invalid and then await previous one and + // handle the new one 
+ if !is_operation_status_transition_valid(previous_status, &status) { + warn!( + topic = %entry.key(), + previous = previous_status, + next = status, + "attempted invalid status transition, ignoring" + ); + return; + } + + let (key, operation) = entry.remove_entry(); + let context = Arc::clone(&self.context); + let handle = tokio::spawn(async move { + operation.handle.await.unwrap(); + context.update(message).await; + }); + let running_operation = RunningOperation { handle, status }; + self.running_operations.insert(key, running_operation); } - OperationStatus::Terminated => {} } } @@ -206,51 +260,27 @@ impl OperationHandler { topics } } - struct RunningOperation { handle: tokio::task::JoinHandle<()>, - tx: tokio::sync::mpsc::UnboundedSender, + status: String, } -impl RunningOperation { - /// Spawns a task that handles the operation. - /// - /// The task handles a single operation with a given command id, and via a channel it receives - /// operation state changes (if any) to drive an operation to completion. - fn spawn(operation: Arc) -> Self { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - - let handle = tokio::spawn(async move { - while let Some(message) = rx.recv().await { - if let UpdateStatus::Terminated = operation.update(message).await { - break; - } - } - }); +// TODO: logic of which status transitions are valid should be defined in tedge_api and be +// considered together with custom statuses of custom workflows +fn is_operation_status_transition_valid(previous: &str, next: &str) -> bool { + #[allow(clippy::match_like_matches_macro)] + match (previous, next) { + // not really a transition but false to make sure we're not sending multiple smartrest msgs + (prev, next) if prev == next => false, - Self { handle, tx } - } + // successful and failed are terminal, can't change them + ("successful", _) => false, + ("failed", _) => false, - /// Updates the operation with new state. - /// - /// Can keep an operation running, or terminate it. 
Returns an error if operation panicked. - async fn update(self, message: OperationMessage) -> Result { - let send_result = self.tx.send(message); - - if send_result.is_err() { - self.handle.await?; - Ok(OperationStatus::Terminated) - } else { - Ok(OperationStatus::Ongoing(self)) - } + _ => true, } } -enum OperationStatus { - Ongoing(RunningOperation), - Terminated, -} - #[cfg(test)] mod tests { use super::*; @@ -472,6 +502,230 @@ mod tests { assert_eq!(sut.running_operations.len(), 0); } + #[tokio::test] + async fn ignores_malformed_command_payloads() { + let TestHandle { + operation_handler: mut sut, + ttd: _ttd, + .. + } = setup_operation_handler(); + + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + let command_topic = mqtt_schema.topic_for( + &entity_topic_id, + &Channel::Command { + operation: OperationType::ConfigSnapshot, + cmd_id: "config-snapshot-1".to_string(), + }, + ); + + let invalid_command_message = MqttMessage::new(&command_topic, "invalid command payload"); + + sut.handle(entity_target, invalid_command_message).await; + + assert!(!sut + .running_operations + .contains_key(command_topic.name.as_str())); + } + + #[tokio::test] + async fn ignores_unexpected_clearing_messages() { + let TestHandle { + operation_handler: mut sut, + ttd: _ttd, + .. 
+ } = setup_operation_handler(); + + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + let config_snapshot_operation = ConfigSnapshotCmd { + target: entity_topic_id, + cmd_id: "config-snapshot-1".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Executing, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + let clearing_message = config_snapshot_operation.clearing_message(&mqtt_schema); + let clearing_message_topic = clearing_message.topic.name.clone(); + + sut.handle(entity_target, clearing_message).await; + + assert!(!sut + .running_operations + .contains_key(clearing_message_topic.as_str())); + } + + #[tokio::test] + async fn shouldnt_process_duplicate_messages() { + let TestHandle { + operation_handler: mut sut, + downloader: dl, + uploader: ul, + mqtt, + c8y_proxy, + ttd: _ttd, + .. 
+ } = setup_operation_handler(); + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + let _dl = dl.with_timeout(TEST_TIMEOUT_MS); + let _ul = ul.with_timeout(TEST_TIMEOUT_MS); + let _c8y_proxy = c8y_proxy.with_timeout(TEST_TIMEOUT_MS); + + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + let config_snapshot_operation = ConfigSnapshotCmd { + target: entity_topic_id, + cmd_id: "config-snapshot-1".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Executing, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + + // check that if the same message is handled 3 times by mistake, we don't process it multiple times + for _ in 0..3 { + sut.handle( + entity_target.clone(), + config_snapshot_operation.command_message(&mqtt_schema), + ) + .await; + } + + let smartrest_executing_message = mqtt.recv().await.unwrap(); + assert_eq!( + smartrest_executing_message.payload_str().unwrap(), + "501,c8y_UploadConfigFile" + ); + + assert_eq!( + mqtt.recv().await, + None, + "shouldn't receive duplicates of EXECUTING message" + ) + } + + #[tokio::test] + async fn shouldnt_process_invalid_status_transitions() { + tedge_config::system_services::set_log_level(tracing::Level::DEBUG); + let TestHandle { + operation_handler: mut sut, + ttd: _ttd, + .. 
+ } = setup_operation_handler(); + + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + let failed_message = ConfigSnapshotCmd { + target: entity_topic_id.clone(), + cmd_id: "config-snapshot-1".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Failed { + reason: "test".to_string(), + }, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + + let successful_message = ConfigSnapshotCmd { + target: entity_topic_id, + cmd_id: "config-snapshot-2".to_string(), + payload: ConfigSnapshotCmdPayload { + status: CommandStatus::Successful, + tedge_url: Some("asdf".to_string()), + config_type: "typeA".to_string(), + path: None, + log_path: None, + }, + }; + + let failed_message_mqtt = failed_message.command_message(&mqtt_schema); + let failed_topic = failed_message_mqtt.topic.name.as_str(); + sut.handle(entity_target.clone(), failed_message_mqtt.clone()) + .await; + assert_eq!( + &sut.running_operations.get(failed_topic).unwrap().status, + "failed" + ); + + let successful_message_mqtt = successful_message.command_message(&mqtt_schema); + let successful_topic = successful_message_mqtt.topic.name.as_str(); + sut.handle(entity_target.clone(), successful_message_mqtt.clone()) + .await; + assert_eq!( + &sut.running_operations + .get(successful_message_mqtt.topic.name.as_str()) + .unwrap() + .status, + "successful" + ); + + // status shouldn't change from successful/failed to executing + let executing_message = failed_message.with_status(CommandStatus::Executing); + sut.handle( + entity_target.clone(), + executing_message.command_message(&mqtt_schema), + ) + .await; + assert_eq!( + &sut.running_operations + .get(dbg!(failed_topic)) + 
.unwrap() + .status, + "failed" + ); + + let executing_message = successful_message.with_status(CommandStatus::Executing); + sut.handle( + entity_target.clone(), + executing_message.command_message(&mqtt_schema), + ) + .await; + assert_eq!( + &sut.running_operations.get(successful_topic).unwrap().status, + "successful" + ); + } + #[tokio::test] #[should_panic] async fn handle_should_panic_when_background_task_panics() { diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs index c16795f215..5bf7121dd2 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handlers/mod.rs @@ -61,7 +61,7 @@ pub(super) struct OperationContext { } impl OperationContext { - pub async fn update(&self, message: OperationMessage) -> UpdateStatus { + pub async fn update(&self, message: OperationMessage) { let OperationMessage { entity, cmd_id, @@ -74,7 +74,7 @@ impl OperationContext { Ok(command) => command, Err(err) => { error!(%err, ?message, "could not parse command payload"); - return UpdateStatus::Terminated; + return; } }; @@ -109,11 +109,11 @@ impl OperationContext { } } // command is not yet finished, avoid clearing the command topic - Ok(_) => return UpdateStatus::Ongoing, + Ok(_) => return, } clear_command_topic(command, &mut mqtt_publisher).await; - return UpdateStatus::Terminated; + return; } OperationType::SoftwareUpdate => { self.publish_software_update_status(&entity, &cmd_id, &message) @@ -152,7 +152,7 @@ impl OperationContext { c8y_operation, &entity.smartrest_publish_topic, ) { - OperationOutcome::Ignored => UpdateStatus::Ongoing, + OperationOutcome::Ignored => {} OperationOutcome::Executing { mut extra_messages } => { let c8y_state_executing_payload = set_operation_executing(c8y_operation); let c8y_state_executing_message = @@ -164,8 +164,6 @@ impl OperationContext { for message in messages { 
mqtt_publisher.send(message).await.unwrap(); } - - UpdateStatus::Ongoing } OperationOutcome::Finished { messages } => { if let Err(e) = self @@ -180,19 +178,11 @@ impl OperationContext { } clear_command_topic(command, &mut mqtt_publisher).await; - - UpdateStatus::Terminated } } } } -/// Whether or not this operation requires more messages to be handled or is it terminated. -pub enum UpdateStatus { - Ongoing, - Terminated, -} - async fn clear_command_topic( command: GenericCommandState, mqtt_publisher: &mut LoggingSender, @@ -276,6 +266,7 @@ fn to_c8y_operation(operation_type: &OperationType) -> Option