diff --git a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs index 1850f2c51d..a8dd1fad55 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/handler.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/handler.rs @@ -126,6 +126,11 @@ impl OperationHandler { return; }; + // don't process sub-workflow calls + if cmd_id.starts_with("sub:") { + return; + } + let message = OperationMessage { operation, entity, @@ -303,6 +308,55 @@ mod tests { assert_eq!(sut.running_operations.len(), 0); } + #[tokio::test] + async fn handle_ignores_subcommand_topics_3048() { + let test_handle = setup_operation_handler(); + let mut sut = test_handle.operation_handler; + let mqtt = test_handle.mqtt; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + let mqtt_schema = sut.context.mqtt_schema.clone(); + + let entity_topic_id = EntityTopicId::default_main_device(); + let entity_target = EntityTarget { + topic_id: entity_topic_id.clone(), + external_id: EntityExternalId::from("anything"), + smartrest_publish_topic: Topic::new("anything").unwrap(), + }; + + // Using a firmware operation here, but should hold for any operation type + let sub_workflow_topic = mqtt_schema.topic_for( + &entity_topic_id, + &Channel::Command { + operation: OperationType::Restart, + cmd_id: "sub:firmware_update:c8y-mapper-192481".to_string(), + }, + ); + let sub_workflow_message = + MqttMessage::new(&sub_workflow_topic, r#"{"status":"executing"}"#); + + sut.handle(entity_target.clone(), sub_workflow_message) + .await; + + assert_eq!(sut.running_operations.len(), 0); + + let topic = mqtt_schema.topic_for( + &entity_topic_id, + &Channel::CommandMetadata { + operation: OperationType::Restart, + }, + ); + let message_wrong_channel = MqttMessage::new(&topic, []); + sut.handle(entity_target, message_wrong_channel).await; + + assert_eq!( + sut.running_operations.len(), + 0, + "task shouldn't be spawned for sub-workflow" + ); + assert_eq!(mqtt.recv().await, None); + } + #[tokio::test] async fn handle_joins_terminated_operations() { let TestHandle { diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index daef388105..f954d4409b 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2452,6 +2452,33 @@ async fn mapper_processes_other_operations_while_uploads_and_downloads_are_ongoi assert_received_contains_str(&mut mqtt, [("c8y/s/us", "503,c8y_Restart")]).await; } +#[tokio::test] +async fn mapper_doesnt_update_status_of_subworkflow_commands_3048() { + let ttd = TempTedgeDir::new(); + let test_handle = spawn_c8y_mapper_actor(&ttd, true).await; + let TestHandle { + mqtt, mut timer, .. + } = test_handle; + + // Complete sync phase so that alarm mapping starts + trigger_timeout(&mut timer).await; + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + skip_init_messages(&mut mqtt).await; + + // should hold for any operation type + mqtt.send(MqttMessage::new( + &Topic::new_unchecked( + "te/device/rpizero2-d83add42f121///cmd/restart/sub:firmware_update:c8y-mapper-192481", + ), + r#"{"logPath":"/var/log/tedge/agent/workflow-firmware_update-c8y-mapper-192481.log","status":"executing"}"#, + )).await.unwrap(); + + while let Some(msg) = dbg!(mqtt.recv().await) { + assert_ne!(msg.payload_str().unwrap(), "501,c8y_Restart"); + } +} + #[tokio::test] async fn mapper_converts_config_metadata_to_supported_op_and_types_for_main_device() { let ttd = TempTedgeDir::new();