Skip to content

Commit

Permalink
Merge pull request #3050 from Bravo555/fix/3048/no-operation-status-f…
Browse files Browse the repository at this point in the history
…or-sub-workflow

fix(#3048): don't send status update for sub-workflow operations
  • Loading branch information
Bravo555 authored Aug 2, 2024
2 parents ee032a8 + dc7c336 commit 193ac91
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
54 changes: 54 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/operations/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ impl OperationHandler {
return;
};

// don't process sub-workflow calls
if cmd_id.starts_with("sub:") {
return;
}

let message = OperationMessage {
operation,
entity,
Expand Down Expand Up @@ -303,6 +308,55 @@ mod tests {
assert_eq!(sut.running_operations.len(), 0);
}

#[tokio::test]
async fn handle_ignores_subcommand_topics_3048() {
let test_handle = setup_operation_handler();
let mut sut = test_handle.operation_handler;
let mqtt = test_handle.mqtt;
let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);

let mqtt_schema = sut.context.mqtt_schema.clone();

let entity_topic_id = EntityTopicId::default_main_device();
let entity_target = EntityTarget {
topic_id: entity_topic_id.clone(),
external_id: EntityExternalId::from("anything"),
smartrest_publish_topic: Topic::new("anything").unwrap(),
};

// Using a firmware operation here, but should hold for any operation type
let sub_workflow_topic = mqtt_schema.topic_for(
&entity_topic_id,
&Channel::Command {
operation: OperationType::Restart,
cmd_id: "sub:firmware_update:c8y-mapper-192481".to_string(),
},
);
let sub_workflow_message =
MqttMessage::new(&sub_workflow_topic, r#"{"status":"executing"}"#);

sut.handle(entity_target.clone(), sub_workflow_message)
.await;

assert_eq!(sut.running_operations.len(), 0);

let topic = mqtt_schema.topic_for(
&entity_topic_id,
&Channel::CommandMetadata {
operation: OperationType::Restart,
},
);
let message_wrong_channel = MqttMessage::new(&topic, []);
sut.handle(entity_target, message_wrong_channel).await;

assert_eq!(
sut.running_operations.len(),
0,
"task shouldn't be spawned for sub-workflow"
);
assert_eq!(mqtt.recv().await, None);
}

#[tokio::test]
async fn handle_joins_terminated_operations() {
let TestHandle {
Expand Down
27 changes: 27 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,33 @@ async fn mapper_processes_other_operations_while_uploads_and_downloads_are_ongoi
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "503,c8y_Restart")]).await;
}

#[tokio::test]
async fn mapper_doesnt_update_status_of_subworkflow_commands_3048() {
let ttd = TempTedgeDir::new();
let test_handle = spawn_c8y_mapper_actor(&ttd, true).await;
let TestHandle {
mqtt, mut timer, ..
} = test_handle;

// Complete sync phase so that alarm mapping starts
trigger_timeout(&mut timer).await;

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
skip_init_messages(&mut mqtt).await;

// should hold for any operation type
mqtt.send(MqttMessage::new(
&Topic::new_unchecked(
"te/device/rpizero2-d83add42f121///cmd/restart/sub:firmware_update:c8y-mapper-192481",
),
r#"{"logPath":"/var/log/tedge/agent/workflow-firmware_update-c8y-mapper-192481.log","status":"executing"}"#,
)).await.unwrap();

while let Some(msg) = dbg!(mqtt.recv().await) {
assert_ne!(msg.payload_str().unwrap(), "501,c8y_Restart");
}
}

#[tokio::test]
async fn mapper_converts_config_metadata_to_supported_op_and_types_for_main_device() {
let ttd = TempTedgeDir::new();
Expand Down

0 comments on commit 193ac91

Please sign in to comment.