From 3b789599bf1c19c465de863e80ec7a8d33ba6801 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 8 Oct 2024 21:26:31 +0200 Subject: [PATCH 01/16] Attach versions to operation workflows and commands Signed-off-by: Didier Wenzek --- .../src/operation_workflows/mod.rs | 15 +++- crates/core/tedge_api/src/workflow/error.rs | 3 + crates/core/tedge_api/src/workflow/mod.rs | 1 + crates/core/tedge_api/src/workflow/state.rs | 14 +++- .../core/tedge_api/src/workflow/supervisor.rs | 77 ++++++++++++++----- .../workflows/download-command-expected.log | 2 +- .../workflows/gp-command-expected.log | 4 +- .../workflows/super-command-expected.log | 2 +- 8 files changed, 91 insertions(+), 27 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/mod.rs b/crates/core/tedge_agent/src/operation_workflows/mod.rs index 2c460b6f6c3..92f86e2ada1 100644 --- a/crates/core/tedge_agent/src/operation_workflows/mod.rs +++ b/crates/core/tedge_agent/src/operation_workflows/mod.rs @@ -6,6 +6,7 @@ use std::path::Path; use tedge_api::workflow::IllFormedOperationWorkflow; use tedge_api::workflow::OperationWorkflow; use tedge_api::workflow::WorkflowSupervisor; +use tedge_api::workflow::WorkflowVersion; use tracing::info; mod actor; @@ -28,8 +29,9 @@ pub async fn load_operation_workflows( if file.extension() == Some(OsStr::new("toml")) { match read_operation_workflow(&file) .await - .and_then(|workflow| load_operation_workflow(&mut workflows, workflow)) - { + .and_then(|(workflow, version)| { + load_operation_workflow(&mut workflows, workflow, version) + }) { Ok(cmd) => { info!( "Using operation workflow definition from {file:?} for '{cmd}' operation" @@ -44,9 +46,12 @@ pub async fn load_operation_workflows( Ok(workflows) } -async fn read_operation_workflow(path: &Path) -> Result { +async fn read_operation_workflow( + path: &Path, +) -> Result<(OperationWorkflow, WorkflowVersion), anyhow::Error> { let bytes = tokio::fs::read(path).await.context("Fail to read file")?; let input = 
std::str::from_utf8(&bytes).context("Fail to extract UTF8 content")?; + let version = sha256::digest(input); toml::from_str::(input) .context("Fail to parse TOML") @@ -58,13 +63,15 @@ async fn read_operation_workflow(path: &Path) -> Result Result { let name = workflow.operation.to_string(); - workflows.register_custom_workflow(workflow)?; + workflows.register_custom_workflow(workflow, version)?; Ok(name) } diff --git a/crates/core/tedge_api/src/workflow/error.rs b/crates/core/tedge_api/src/workflow/error.rs index c8545fe54ee..d85bbe4a1c8 100644 --- a/crates/core/tedge_api/src/workflow/error.rs +++ b/crates/core/tedge_api/src/workflow/error.rs @@ -82,6 +82,9 @@ pub enum WorkflowExecutionError { #[error("No workflow is defined for the operation: {operation}")] UnknownOperation { operation: String }, + #[error("Unknown version for the {operation} operation: {version}")] + UnknownVersion { operation: String, version: String }, + #[error("No command has been initiated on the command topic: {topic}")] UnknownRequest { topic: String }, diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index fdfa01658cc..9a2d921d232 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -26,6 +26,7 @@ pub type OperationName = String; pub type StateName = String; pub type CommandId = String; pub type JsonPath = String; +pub type WorkflowVersion = String; /// An OperationWorkflow defines the state machine that rules an operation #[derive(Clone, Debug, Deserialize)] diff --git a/crates/core/tedge_api/src/workflow/state.rs b/crates/core/tedge_api/src/workflow/state.rs index ccfd7fdd4c0..ed12c54862e 100644 --- a/crates/core/tedge_api/src/workflow/state.rs +++ b/crates/core/tedge_api/src/workflow/state.rs @@ -19,7 +19,8 @@ use serde_json::Value; use std::collections::HashMap; use std::fmt::Display; -pub const OP_LOG_PATH_KEY: &str = "logPath"; +const OP_LOG_PATH_KEY: &str = "logPath"; +const 
OP_WORKFLOW_VERSION_KEY: &str = "@version"; #[derive(Clone, Debug, Eq, PartialEq)] pub enum GenericCommandData { @@ -197,6 +198,17 @@ impl GenericCommandState { self.update_with_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str()) } + pub fn workflow_version(&self) -> Option { + self.payload + .get(OP_WORKFLOW_VERSION_KEY) + .and_then(|val| val.as_str()) + .map(|str| str.to_string()) + } + + pub fn set_workflow_version(self, version: &str) -> Self { + self.update_with_key_value(OP_WORKFLOW_VERSION_KEY, version) + } + /// Update the command state with the outcome of a script pub fn update_with_script_output( self, diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 2d183171701..3154fe2d11d 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -7,42 +7,56 @@ use serde::Serialize; #[derive(Default)] pub struct WorkflowSupervisor { /// The user-defined operation workflow definitions - workflows: HashMap, + workflows: HashMap, /// Operation instances under execution commands: CommandBoard, } +struct WorkflowVersions { + current: WorkflowVersion, + workflows: HashMap, +} + impl WorkflowSupervisor { /// Register a builtin workflow provided by thin-edge pub fn register_builtin_workflow( &mut self, operation: OperationType, ) -> Result<(), WorkflowRegistrationError> { - self.register_custom_workflow(OperationWorkflow::built_in(operation)) + self.register_custom_workflow( + OperationWorkflow::built_in(operation), + "builtin".to_string(), + ) } /// Register a user-defined workflow pub fn register_custom_workflow( &mut self, workflow: OperationWorkflow, + version: WorkflowVersion, ) -> Result<(), WorkflowRegistrationError> { - if let Some(previous) = self.workflows.get(&workflow.operation) { - if previous.built_in == workflow.built_in { - return Err(WorkflowRegistrationError::DuplicatedWorkflow { - operation: workflow.operation.to_string(), - }); + 
let operation = workflow.operation.clone(); + if let Some(versions) = self.workflows.get_mut(&operation) { + if version == versions.current || versions.workflows.contains_key(&version) { + // Already registered + return Ok(()); } - info!( - "The built-in {} operation has been customized", - workflow.operation - ); if workflow.built_in { + info!("The built-in {operation} operation has been customized",); return Ok(()); } + + versions.workflows.insert(version.clone(), workflow); + versions.current = version + } else { + let versions = WorkflowVersions { + current: version.clone(), + workflows: HashMap::from([(version, workflow)]), + }; + self.workflows.insert(operation, versions); } - self.workflows.insert(workflow.operation.clone(), workflow); Ok(()) } @@ -67,7 +81,11 @@ impl WorkflowSupervisor { target: &EntityTopicId, ) -> Vec { // To ease testing the capability messages are emitted in a deterministic order - let mut operations = self.workflows.values().collect::>(); + let mut operations = self + .workflows + .values() + .filter_map(|versions| versions.current_workflow()) + .collect::>(); operations.sort_by(|&a, &b| a.operation.to_string().cmp(&b.operation.to_string())); operations .iter() @@ -83,7 +101,7 @@ impl WorkflowSupervisor { operation: &OperationType, command_state: GenericCommandState, ) -> Result, WorkflowExecutionError> { - if !self.workflows.contains_key(operation) { + let Some(workflow_versions) = self.workflows.get(operation) else { return Err(WorkflowExecutionError::UnknownOperation { operation: operation.to_string(), }); @@ -94,6 +112,7 @@ impl WorkflowSupervisor { Ok(Some(command_state)) } else if command_state.is_init() { // This is a new command request + let command_state = command_state.set_workflow_version(&workflow_versions.current); self.commands.insert(command_state.clone())?; Ok(Some(command_state)) } else { @@ -118,11 +137,13 @@ impl WorkflowSupervisor { }); }; + let version = command_state.workflow_version(); self.workflows 
.get(&operation_name.as_str().into()) .ok_or(WorkflowExecutionError::UnknownOperation { - operation: operation_name, + operation: operation_name.clone(), }) + .and_then(|versions| versions.get(&operation_name, version.as_ref())) .and_then(|workflow| workflow.get_action(command_state)) } @@ -229,6 +250,26 @@ impl WorkflowSupervisor { } } +impl WorkflowVersions { + fn get( + &self, + operation: &OperationName, + version: Option<&WorkflowVersion>, + ) -> Result<&OperationWorkflow, WorkflowExecutionError> { + let version = version.unwrap_or(&self.current); + self.workflows + .get(version) + .ok_or(WorkflowExecutionError::UnknownVersion { + operation: operation.clone(), + version: version.to_string(), + }) + } + + fn current_workflow(&self) -> Option<&OperationWorkflow> { + self.workflows.get(&self.current) + } +} + /// A view of all the operation instances under execution. #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] #[serde(try_from = "OnDiskCommandBoard", into = "OnDiskCommandBoard")] @@ -340,7 +381,7 @@ mod tests { // Start a level_1 operation let level_1_cmd = GenericCommandState::from_command_message(&MqttMessage::new( &Topic::new_unchecked("te/device/foo///cmd/level_1/id_1"), - r#"{ "status":"init" }"#, + r#"{ "@version": "builtin", "status":"init" }"#, )) .unwrap(); workflows @@ -356,7 +397,7 @@ mod tests { // Start a level_2 operation, sub-command of the previous level_1 command let level_2_cmd = GenericCommandState::from_command_message(&MqttMessage::new( &Topic::new_unchecked("te/device/foo///cmd/level_2/sub:level_1:id_1"), - r#"{ "status":"init" }"#, + r#"{ "@version": "builtin", "status":"init" }"#, )) .unwrap(); workflows @@ -377,7 +418,7 @@ mod tests { // Start a level_3 operation, sub-command of the previous level_2 command let level_3_cmd = GenericCommandState::from_command_message(&MqttMessage::new( &Topic::new_unchecked("te/device/foo///cmd/level_3/sub:level_2:sub:level_1:id_1"), - r#"{ "status":"init" }"#, + r#"{ 
"@version": "builtin", "status":"init" }"#, )) .unwrap(); workflows diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/download-command-expected.log b/tests/RobotFramework/tests/tedge_agent/workflows/download-command-expected.log index 95f3f45e402..84f965aa7be 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/download-command-expected.log +++ b/tests/RobotFramework/tests/tedge_agent/workflows/download-command-expected.log @@ -1,3 +1,3 @@ schedule download command target=device/main// launch download url=https://from/there file=/put/it/here -check download command outcome={"payload":{"file":"/put/it/here","logPath":"/var/log/tedge/agent/workflow-download-robot-123.log","status":"downloaded","tmp":"/tmp/download/robot-123","url":"https://from/there"},"topic":"te/device/main///cmd/download/robot-123"} +check download command outcome={"payload":{"@version":"cacd2014145f08691896472617fbe6d800a5c970b2d6133ab691f06c423c61d7","file":"/put/it/here","logPath":"/var/log/tedge/agent/workflow-download-robot-123.log","status":"downloaded","tmp":"/tmp/download/robot-123","url":"https://from/there"},"topic":"te/device/main///cmd/download/robot-123"} diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/gp-command-expected.log b/tests/RobotFramework/tests/tedge_agent/workflows/gp-command-expected.log index abaf052ff2b..2d766b24166 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/gp-command-expected.log +++ b/tests/RobotFramework/tests/tedge_agent/workflows/gp-command-expected.log @@ -1,2 +1,2 @@ -{"payload":{"logPath":"/var/log/tedge/agent/workflow-gp_command-test-sub-sub.log","output_file":"/tmp/test-sub-sub.json","status":"dump_payload","x_ter":"some x value","y":"some y value","y_ter":"some y value"},"topic":"te/device/main///cmd/super_command/sub:gp_command:test-sub-sub"} 
-{"payload":{"logPath":"/var/log/tedge/agent/workflow-gp_command-test-sub-sub.log","output_file":"/tmp/test-sub-sub.json","status":"dump_payload","sub_operation":"super_command","x":"some x value","y":"some y value"},"topic":"te/device/main///cmd/gp_command/test-sub-sub"} +{"payload":{"@version":"2753dc8ea41e86c556c3cf841fdcaa8a71a2c843a013ce293310ed54eab6d40c","logPath":"/var/log/tedge/agent/workflow-gp_command-test-sub-sub.log","output_file":"/tmp/test-sub-sub.json","status":"dump_payload","x_ter":"some x value","y":"some y value","y_ter":"some y value"},"topic":"te/device/main///cmd/super_command/sub:gp_command:test-sub-sub"} +{"payload":{"@version":"c87c09ac241f9f146c90a9d1c5c9ea037004f4af35d9c05d59dd193acaf0ed92","logPath":"/var/log/tedge/agent/workflow-gp_command-test-sub-sub.log","output_file":"/tmp/test-sub-sub.json","status":"dump_payload","sub_operation":"super_command","x":"some x value","y":"some y value"},"topic":"te/device/main///cmd/gp_command/test-sub-sub"} diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/super-command-expected.log b/tests/RobotFramework/tests/tedge_agent/workflows/super-command-expected.log index 948a3d3f30a..3cca89cc83f 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/super-command-expected.log +++ b/tests/RobotFramework/tests/tedge_agent/workflows/super-command-expected.log @@ -1 +1 @@ -{"payload":{"logPath":"/var/log/tedge/agent/workflow-super_command-test-42.log","output_file":"/tmp/test-42.json","status":"dump_payload","x_ter":"some x value","y":"some y value","y_ter":"some y value"},"topic":"te/device/main///cmd/super_command/test-42"} +{"payload":{"@version":"2753dc8ea41e86c556c3cf841fdcaa8a71a2c843a013ce293310ed54eab6d40c","logPath":"/var/log/tedge/agent/workflow-super_command-test-42.log","output_file":"/tmp/test-42.json","status":"dump_payload","x_ter":"some x value","y":"some y value","y_ter":"some y value"},"topic":"te/device/main///cmd/super_command/test-42"} From 
a83642f043e0cb6cfc269147151a161b8bf068b8 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Wed, 9 Oct 2024 16:30:53 +0200 Subject: [PATCH 02/16] Move on-disk workflow representation in a sub-module Signed-off-by: Didier Wenzek --- .../src/operation_workflows/mod.rs | 69 +------------------ .../src/operation_workflows/persist.rs | 66 ++++++++++++++++++ 2 files changed, 68 insertions(+), 67 deletions(-) create mode 100644 crates/core/tedge_agent/src/operation_workflows/persist.rs diff --git a/crates/core/tedge_agent/src/operation_workflows/mod.rs b/crates/core/tedge_agent/src/operation_workflows/mod.rs index 92f86e2ada1..a3d1b25ce45 100644 --- a/crates/core/tedge_agent/src/operation_workflows/mod.rs +++ b/crates/core/tedge_agent/src/operation_workflows/mod.rs @@ -1,77 +1,12 @@ -use anyhow::Context; -use camino::Utf8PathBuf; -use log::error; -use std::ffi::OsStr; -use std::path::Path; -use tedge_api::workflow::IllFormedOperationWorkflow; -use tedge_api::workflow::OperationWorkflow; -use tedge_api::workflow::WorkflowSupervisor; -use tedge_api::workflow::WorkflowVersion; -use tracing::info; - mod actor; mod builder; mod config; mod message_box; +mod persist; #[cfg(test)] mod tests; pub use builder::WorkflowActorBuilder; pub use config::OperationConfig; - -pub async fn load_operation_workflows( - dir_path: &Utf8PathBuf, -) -> Result { - let mut workflows = WorkflowSupervisor::default(); - for entry in std::fs::read_dir(dir_path)?.flatten() { - let file = entry.path(); - if file.extension() == Some(OsStr::new("toml")) { - match read_operation_workflow(&file) - .await - .and_then(|(workflow, version)| { - load_operation_workflow(&mut workflows, workflow, version) - }) { - Ok(cmd) => { - info!( - "Using operation workflow definition from {file:?} for '{cmd}' operation" - ); - } - Err(err) => { - error!("Ignoring {file:?}: {err:?}") - } - }; - } - } - Ok(workflows) -} - -async fn read_operation_workflow( - path: &Path, -) -> Result<(OperationWorkflow, WorkflowVersion), 
anyhow::Error> { - let bytes = tokio::fs::read(path).await.context("Fail to read file")?; - let input = std::str::from_utf8(&bytes).context("Fail to extract UTF8 content")?; - let version = sha256::digest(input); - - toml::from_str::(input) - .context("Fail to parse TOML") - .or_else(|err| { - error!("Ill-formed operation workflow definition from {path:?}: {err:?}"); - let workflow = toml::from_str::(input) - .context("Extracting operation name")?; - - let reason = format!("Invalid operation workflow definition {path:?}: {err:?}"); - Ok(OperationWorkflow::ill_formed(workflow.operation, reason)) - }) - .map(|workflow| (workflow, version)) -} - -fn load_operation_workflow( - workflows: &mut WorkflowSupervisor, - workflow: OperationWorkflow, - version: WorkflowVersion, -) -> Result { - let name = workflow.operation.to_string(); - workflows.register_custom_workflow(workflow, version)?; - Ok(name) -} +pub use persist::load_operation_workflows; diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs new file mode 100644 index 00000000000..13eab94a025 --- /dev/null +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -0,0 +1,66 @@ +use anyhow::Context; +use camino::Utf8PathBuf; +use std::ffi::OsStr; +use std::path::Path; +use tedge_api::workflow::IllFormedOperationWorkflow; +use tedge_api::workflow::OperationWorkflow; +use tedge_api::workflow::WorkflowSupervisor; +use tedge_api::workflow::WorkflowVersion; +use tracing::error; +use tracing::info; + +pub async fn load_operation_workflows( + dir_path: &Utf8PathBuf, +) -> Result { + let mut workflows = WorkflowSupervisor::default(); + for entry in std::fs::read_dir(dir_path)?.flatten() { + let file = entry.path(); + if file.extension() == Some(OsStr::new("toml")) { + match read_operation_workflow(&file) + .await + .and_then(|(workflow, version)| { + load_operation_workflow(&mut workflows, workflow, version) + }) { + Ok(cmd) => { + 
info!( + "Using operation workflow definition from {file:?} for '{cmd}' operation" + ); + } + Err(err) => { + error!("Ignoring {file:?}: {err:?}") + } + }; + } + } + Ok(workflows) +} + +async fn read_operation_workflow( + path: &Path, +) -> Result<(OperationWorkflow, WorkflowVersion), anyhow::Error> { + let bytes = tokio::fs::read(path).await.context("Fail to read file")?; + let input = std::str::from_utf8(&bytes).context("Fail to extract UTF8 content")?; + let version = sha256::digest(input); + + toml::from_str::(input) + .context("Fail to parse TOML") + .or_else(|err| { + error!("Ill-formed operation workflow definition from {path:?}: {err:?}"); + let workflow = toml::from_str::(input) + .context("Extracting operation name")?; + + let reason = format!("Invalid operation workflow definition {path:?}: {err:?}"); + Ok(OperationWorkflow::ill_formed(workflow.operation, reason)) + }) + .map(|workflow| (workflow, version)) +} + +fn load_operation_workflow( + workflows: &mut WorkflowSupervisor, + workflow: OperationWorkflow, + version: WorkflowVersion, +) -> Result { + let name = workflow.operation.to_string(); + workflows.register_custom_workflow(workflow, version)?; + Ok(name) +} From bc928e8328bf9ffd59ae9c87c3d8dc6cef8c8041 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Wed, 9 Oct 2024 16:31:59 +0200 Subject: [PATCH 03/16] Make pub the logic used to check the agent state dir This is an intermediate step, the aim being to use the same directory to persist a copy of the workflows currently used (i.e. for which there is a running operation instance). 
Signed-off-by: Didier Wenzek --- crates/core/tedge_agent/src/agent.rs | 4 +- .../src/operation_workflows/builder.rs | 5 +- .../tedge_agent/src/state_repository/state.rs | 47 ++++++++++++------- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index a71475dd29b..3f703bfa669 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -9,7 +9,7 @@ use crate::restart_manager::builder::RestartManagerBuilder; use crate::restart_manager::config::RestartManagerConfig; use crate::software_manager::builder::SoftwareManagerBuilder; use crate::software_manager::config::SoftwareManagerConfig; -use crate::state_repository::state::agent_state_dir; +use crate::state_repository::state::agent_default_state_dir; use crate::tedge_to_te_converter::converter::TedgetoTeConverter; use crate::AgentOpt; use crate::Capabilities; @@ -219,7 +219,7 @@ impl Agent { #[instrument(skip(self), name = "sm-agent")] pub fn init(&self) -> Result<(), anyhow::Error> { // `config_dir` by default is `/etc/tedge` (or whatever the user sets with --config-dir) - create_directory_with_defaults(agent_state_dir(self.config.config_dir.clone()))?; + create_directory_with_defaults(agent_default_state_dir(self.config.config_dir.clone()))?; create_directory_with_defaults(&self.config.agent_log_dir)?; create_directory_with_defaults(&self.config.data_dir)?; create_directory_with_defaults(&self.config.http_config.file_transfer_dir)?; diff --git a/crates/core/tedge_agent/src/operation_workflows/builder.rs b/crates/core/tedge_agent/src/operation_workflows/builder.rs index 1d4d846acee..e07ca73142b 100644 --- a/crates/core/tedge_agent/src/operation_workflows/builder.rs +++ b/crates/core/tedge_agent/src/operation_workflows/builder.rs @@ -3,6 +3,7 @@ use crate::operation_workflows::actor::InternalCommandState; use crate::operation_workflows::actor::WorkflowActor; use 
crate::operation_workflows::config::OperationConfig; use crate::operation_workflows::message_box::CommandDispatcher; +use crate::state_repository::state::agent_state_dir; use crate::state_repository::state::AgentStateRepository; use log::error; use std::process::Output; @@ -128,8 +129,8 @@ impl Builder for WorkflowActorBuilder { } } - let repository = - AgentStateRepository::new(self.config.state_dir, self.config.config_dir, "workflows"); + let state_dir = agent_state_dir(self.config.state_dir, self.config.config_dir); + let repository = AgentStateRepository::with_state_dir(state_dir, "workflows"); WorkflowActor { mqtt_schema: self.config.mqtt_schema, device_topic_id: self.config.device_topic_id, diff --git a/crates/core/tedge_agent/src/state_repository/state.rs b/crates/core/tedge_agent/src/state_repository/state.rs index 6b128569f63..26e6153c192 100644 --- a/crates/core/tedge_agent/src/state_repository/state.rs +++ b/crates/core/tedge_agent/src/state_repository/state.rs @@ -17,30 +17,41 @@ pub struct AgentStateRepository { phantom: PhantomData, } -pub fn agent_state_dir(tedge_root: Utf8PathBuf) -> Utf8PathBuf { +/// The directory used by the agent to persist its state when tedge config agent.state.path is not set +pub fn agent_default_state_dir(tedge_root: Utf8PathBuf) -> Utf8PathBuf { tedge_root.join(".agent") } +/// Return the given `state_dir` once checked that it can be used to persist the agent state. +/// +/// If for some reason the configured state directory cannot be used, +/// then fallback to the default directory under tedge root (`/etc/tedge/.agent`). 
+pub fn agent_state_dir(state_dir: Utf8PathBuf, tedge_root: Utf8PathBuf) -> Utf8PathBuf { + // Check that the given directory is actually writable, by creating an empty test file + let test_file = state_dir.join(state_dir.join(".--test--")); + match File::create(test_file.clone()).and_then(|mut file| file.write_all(b"")) { + Ok(_) => { + let _ = std::fs::remove_file(test_file); + state_dir + } + Err(err) => { + warn!("Cannot use {state_dir:?} to store tedge-agent state: {err}"); + agent_default_state_dir(tedge_root) + } + } +} + impl AgentStateRepository { - /// Create a new agent state file in the the given state directory. - /// - /// If for some reason the state file cannot be created (e.g. the directory doesn't exist or is not writable) - /// then the agent state is created under the tedge root directory (`/etc/tedge/.agent`). + /// Create a new agent state file in the given state directory + /// or in the tedge root directory if the given directory is not suitable + /// (e.g. the directory doesn't exist or is not writable). pub fn new(state_dir: Utf8PathBuf, tedge_root: Utf8PathBuf, file_name: &str) -> Self { - // Check that the given directory is actually writable - let test_file = state_dir.join(state_dir.join(".--test--")); - let state_dir = - match File::create(test_file.clone()).and_then(|mut file| file.write_all(b"")) { - Ok(_) => { - let _ = std::fs::remove_file(test_file); - state_dir - } - Err(err) => { - warn!("Cannot use {state_dir:?} to store tedge-agent state: {err}"); - agent_state_dir(tedge_root) - } - }; + let state_dir = agent_state_dir(state_dir, tedge_root); + Self::with_state_dir(state_dir, file_name) + } + /// Create a new agent state file in the given state directory. 
+ pub fn with_state_dir(state_dir: Utf8PathBuf, file_name: &str) -> Self { let state_repo_path = state_dir.join(file_name); info!("Use {state_repo_path:?} to store tedge-agent {file_name} state"); Self { From 446950d9da14a62cb42e1bedb61f9e674ec644de Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Wed, 9 Oct 2024 18:22:15 +0200 Subject: [PATCH 04/16] Group workflow loading logic in struct WorkflowRepository For this first step the behavior is unchanged: the workflows are only loaded on start Signed-off-by: Didier Wenzek --- crates/core/tedge_agent/src/agent.rs | 5 +- .../src/operation_workflows/actor.rs | 47 ++++++-- .../src/operation_workflows/builder.rs | 28 ++--- .../src/operation_workflows/config.rs | 5 +- .../src/operation_workflows/message_box.rs | 4 +- .../src/operation_workflows/mod.rs | 1 - .../src/operation_workflows/persist.rs | 108 +++++++++++++----- .../src/operation_workflows/tests.rs | 8 +- 8 files changed, 135 insertions(+), 71 deletions(-) diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 3f703bfa669..6e5a865d417 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -2,7 +2,6 @@ use crate::device_profile_manager::DeviceProfileManagerBuilder; use crate::file_transfer_server::actor::FileTransferServerBuilder; use crate::file_transfer_server::actor::FileTransferServerConfig; use crate::operation_file_cache::FileCacheActorBuilder; -use crate::operation_workflows::load_operation_workflows; use crate::operation_workflows::OperationConfig; use crate::operation_workflows::WorkflowActorBuilder; use crate::restart_manager::builder::RestartManagerBuilder; @@ -242,8 +241,7 @@ impl Agent { // as it will create the device_profile workflow if it does not already exist DeviceProfileManagerBuilder::try_new(&self.config.operations_dir)?; - // Operation workflows - let workflows = load_operation_workflows(&self.config.operations_dir).await?; + // Script actor let mut 
script_runner: ServerActorBuilder = ScriptActor::builder(); // Restart actor @@ -258,7 +256,6 @@ impl Agent { // Converter actor let mut converter_actor_builder = WorkflowActorBuilder::new( self.config.operation_config, - workflows, &mut mqtt_actor_builder, &mut script_runner, ); diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index b2cb34c1117..f1fece08e2a 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -1,4 +1,5 @@ use crate::operation_workflows::message_box::CommandDispatcher; +use crate::operation_workflows::persist::WorkflowRepository; use crate::state_repository::state::AgentStateRepository; use async_trait::async_trait; use camino::Utf8PathBuf; @@ -30,7 +31,6 @@ use tedge_api::workflow::GenericStateUpdate; use tedge_api::workflow::OperationAction; use tedge_api::workflow::OperationName; use tedge_api::workflow::WorkflowExecutionError; -use tedge_api::workflow::WorkflowSupervisor; use tedge_api::CommandLog; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; @@ -47,7 +47,7 @@ fan_in_message_type!(AgentInput[MqttMessage, InternalCommandState, GenericComman pub struct WorkflowActor { pub(crate) mqtt_schema: MqttSchema, pub(crate) device_topic_id: EntityTopicId, - pub(crate) workflows: WorkflowSupervisor, + pub(crate) workflow_repository: WorkflowRepository, pub(crate) state_repository: AgentStateRepository, pub(crate) log_dir: Utf8PathBuf, pub(crate) input_receiver: UnboundedLoggingReceiver, @@ -64,6 +64,7 @@ impl Actor for WorkflowActor { } async fn run(mut self) -> Result<(), RuntimeError> { + self.workflow_repository.load().await; self.publish_operation_capabilities().await?; self.load_command_board().await?; @@ -92,6 +93,7 @@ impl Actor for WorkflowActor { impl WorkflowActor { async fn publish_operation_capabilities(&mut self) -> Result<(), RuntimeError> { for capability in self + 
.workflow_repository .workflows .capability_messages(&self.mqtt_schema, &self.device_topic_id) { @@ -135,7 +137,11 @@ impl WorkflowActor { let mut log_file = self.open_command_log(&state, &operation, &cmd_id); - match self.workflows.apply_external_update(&operation, state) { + match self + .workflow_repository + .workflows + .apply_external_update(&operation, state) + { Ok(None) => (), Ok(Some(new_state)) => { self.persist_command_board().await?; @@ -172,7 +178,7 @@ impl WorkflowActor { }; let mut log_file = self.open_command_log(&state, &operation, &cmd_id); - let action = match self.workflows.get_action(&state) { + let action = match self.workflow_repository.workflows.get_action(&state) { Ok(action) => action, Err(WorkflowExecutionError::UnknownStep { operation, step }) => { info!("No action defined for {operation} operation {step} step"); @@ -192,7 +198,11 @@ impl WorkflowActor { match action { OperationAction::Clear => { - if let Some(invoking_command) = self.workflows.invoking_command_state(&state) { + if let Some(invoking_command) = self + .workflow_repository + .workflows + .invoking_command_state(&state) + { log_file .log_info(&format!( "Resuming invoking command {}", @@ -336,6 +346,7 @@ impl WorkflowActor { // Get the sub-operation state and resume this command when the sub-operation is in a terminal state if let Some(sub_state) = self + .workflow_repository .workflows .sub_command_state(&state) .map(|s| s.to_owned()) @@ -423,8 +434,15 @@ impl WorkflowActor { &mut self, new_state: GenericCommandState, ) -> Result<(), RuntimeError> { - let adapted_state = self.workflows.adapt_builtin_response(new_state); - if let Err(err) = self.workflows.apply_internal_update(adapted_state.clone()) { + let adapted_state = self + .workflow_repository + .workflows + .adapt_builtin_response(new_state); + if let Err(err) = self + .workflow_repository + .workflows + .apply_internal_update(adapted_state.clone()) + { error!("Fail to persist workflow operation state: {err}"); } 
self.persist_command_board().await?; @@ -441,6 +459,7 @@ impl WorkflowActor { cmd_id: &str, ) -> CommandLog { let (root_operation, root_cmd_id) = match self + .workflow_repository .workflows .root_invoking_command_state(state) .map(|s| s.topic.as_ref()) @@ -465,7 +484,11 @@ impl WorkflowActor { new_state: GenericCommandState, log_file: &mut CommandLog, ) -> Result<(), RuntimeError> { - if let Err(err) = self.workflows.apply_internal_update(new_state.clone()) { + if let Err(err) = self + .workflow_repository + .workflows + .apply_internal_update(new_state.clone()) + { error!("Fail to persist workflow operation state: {err}"); } self.persist_command_board().await?; @@ -483,7 +506,11 @@ impl WorkflowActor { async fn load_command_board(&mut self) -> Result<(), RuntimeError> { match self.state_repository.load().await { Ok(Some(pending_commands)) => { - for command in self.workflows.load_pending_commands(pending_commands) { + for command in self + .workflow_repository + .workflows + .load_pending_commands(pending_commands) + { self.process_command_update(command.clone()).await?; } } @@ -500,7 +527,7 @@ impl WorkflowActor { /// Persist on-disk the current state of the pending command requests async fn persist_command_board(&mut self) -> Result<(), RuntimeError> { - let pending_commands = self.workflows.pending_commands(); + let pending_commands = self.workflow_repository.workflows.pending_commands(); if let Err(err) = self.state_repository.store(pending_commands).await { error!( "Fail to persist pending command requests in {} due to: {}", diff --git a/crates/core/tedge_agent/src/operation_workflows/builder.rs b/crates/core/tedge_agent/src/operation_workflows/builder.rs index e07ca73142b..9b7a7df8f35 100644 --- a/crates/core/tedge_agent/src/operation_workflows/builder.rs +++ b/crates/core/tedge_agent/src/operation_workflows/builder.rs @@ -3,9 +3,9 @@ use crate::operation_workflows::actor::InternalCommandState; use crate::operation_workflows::actor::WorkflowActor; use 
crate::operation_workflows::config::OperationConfig; use crate::operation_workflows::message_box::CommandDispatcher; +use crate::operation_workflows::persist::WorkflowRepository; use crate::state_repository::state::agent_state_dir; use crate::state_repository::state::AgentStateRepository; -use log::error; use std::process::Output; use tedge_actors::futures::channel::mpsc; use tedge_actors::Builder; @@ -28,14 +28,12 @@ use tedge_api::mqtt_topics::MqttSchema; use tedge_api::workflow::GenericCommandData; use tedge_api::workflow::GenericCommandState; use tedge_api::workflow::OperationName; -use tedge_api::workflow::WorkflowSupervisor; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; use tedge_script_ext::Execute; pub struct WorkflowActorBuilder { config: OperationConfig, - workflows: WorkflowSupervisor, input_sender: DynSender, input_receiver: UnboundedLoggingReceiver, command_dispatcher: CommandDispatcher, @@ -48,7 +46,6 @@ pub struct WorkflowActorBuilder { impl WorkflowActorBuilder { pub fn new( config: OperationConfig, - workflows: WorkflowSupervisor, mqtt_actor: &mut (impl MessageSource + MessageSink), script_runner: &mut impl Service>, ) -> Self { @@ -76,7 +73,6 @@ impl WorkflowActorBuilder { Self { config, - workflows, input_sender, input_receiver, command_dispatcher, @@ -119,23 +115,19 @@ impl Builder for WorkflowActorBuilder { Ok(self.build()) } - fn build(mut self) -> WorkflowActor { - for capability in self.command_dispatcher.capabilities() { - if let Err(err) = self - .workflows - .register_builtin_workflow(capability.as_str().into()) - { - error!("Fail to register built-in workflow for {capability} operation: {err}"); - } - } - + fn build(self) -> WorkflowActor { + let builtin_workflows = self.command_dispatcher.capabilities(); + let custom_workflows_dir = self.config.operations_dir; let state_dir = agent_state_dir(self.config.state_dir, self.config.config_dir); - let repository = AgentStateRepository::with_state_dir(state_dir, 
"workflows"); + let workflow_repository = + WorkflowRepository::new(builtin_workflows, custom_workflows_dir, state_dir.clone()); + let state_repository = AgentStateRepository::with_state_dir(state_dir, "workflows"); + WorkflowActor { mqtt_schema: self.config.mqtt_schema, device_topic_id: self.config.device_topic_id, - workflows: self.workflows, - state_repository: repository, + workflow_repository, + state_repository, log_dir: self.config.log_dir, input_receiver: self.input_receiver, builtin_command_dispatcher: self.command_dispatcher, diff --git a/crates/core/tedge_agent/src/operation_workflows/config.rs b/crates/core/tedge_agent/src/operation_workflows/config.rs index 3b500647859..97aa789d465 100644 --- a/crates/core/tedge_agent/src/operation_workflows/config.rs +++ b/crates/core/tedge_agent/src/operation_workflows/config.rs @@ -9,6 +9,7 @@ pub struct OperationConfig { pub log_dir: Utf8PathBuf, pub config_dir: Utf8PathBuf, pub state_dir: Utf8PathBuf, + pub operations_dir: Utf8PathBuf, } impl OperationConfig { @@ -17,14 +18,16 @@ impl OperationConfig { device_topic_id: &EntityTopicId, tedge_config_location: &tedge_config::TEdgeConfigLocation, ) -> Result { + let config_dir = &tedge_config_location.tedge_config_root_path; let tedge_config = tedge_config::TEdgeConfig::try_new(tedge_config_location.clone())?; Ok(OperationConfig { mqtt_schema: MqttSchema::with_root(topic_root), device_topic_id: device_topic_id.clone(), log_dir: tedge_config.logs.path.join("agent"), - config_dir: tedge_config_location.tedge_config_root_path.clone(), + config_dir: config_dir.clone(), state_dir: tedge_config.agent.state.path.clone(), + operations_dir: config_dir.join("operations"), }) } } diff --git a/crates/core/tedge_agent/src/operation_workflows/message_box.rs b/crates/core/tedge_agent/src/operation_workflows/message_box.rs index a40b0c1c309..d5c80aaeb38 100644 --- a/crates/core/tedge_agent/src/operation_workflows/message_box.rs +++ 
b/crates/core/tedge_agent/src/operation_workflows/message_box.rs @@ -40,7 +40,7 @@ impl CommandDispatcher { } /// List the operations for which a builtin handler has been registered - pub fn capabilities(&self) -> Vec<&OperationName> { - self.senders.keys().collect() + pub fn capabilities(&self) -> Vec { + self.senders.keys().cloned().collect() } } diff --git a/crates/core/tedge_agent/src/operation_workflows/mod.rs b/crates/core/tedge_agent/src/operation_workflows/mod.rs index a3d1b25ce45..86f12d6f4a4 100644 --- a/crates/core/tedge_agent/src/operation_workflows/mod.rs +++ b/crates/core/tedge_agent/src/operation_workflows/mod.rs @@ -9,4 +9,3 @@ mod tests; pub use builder::WorkflowActorBuilder; pub use config::OperationConfig; -pub use persist::load_operation_workflows; diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 13eab94a025..0e93d1bcd78 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -3,36 +3,94 @@ use camino::Utf8PathBuf; use std::ffi::OsStr; use std::path::Path; use tedge_api::workflow::IllFormedOperationWorkflow; +use tedge_api::workflow::OperationName; use tedge_api::workflow::OperationWorkflow; use tedge_api::workflow::WorkflowSupervisor; use tedge_api::workflow::WorkflowVersion; use tracing::error; use tracing::info; -pub async fn load_operation_workflows( - dir_path: &Utf8PathBuf, -) -> Result { - let mut workflows = WorkflowSupervisor::default(); - for entry in std::fs::read_dir(dir_path)?.flatten() { - let file = entry.path(); - if file.extension() == Some(OsStr::new("toml")) { - match read_operation_workflow(&file) - .await - .and_then(|(workflow, version)| { - load_operation_workflow(&mut workflows, workflow, version) - }) { - Ok(cmd) => { - info!( +/// Persist the workflow definitions. 
+pub struct WorkflowRepository { + builtin_workflows: Vec, + custom_workflows_dir: Utf8PathBuf, + state_dir: Utf8PathBuf, + pub(crate) workflows: WorkflowSupervisor, +} + +impl WorkflowRepository { + pub fn new( + builtin_workflows: Vec, + custom_workflows_dir: Utf8PathBuf, + state_dir: Utf8PathBuf, + ) -> Self { + let workflows = WorkflowSupervisor::default(); + Self { + builtin_workflows, + custom_workflows_dir, + state_dir, + workflows, + } + } + + pub async fn load(&mut self) { + let dir_path = &self.custom_workflows_dir.clone(); + if let Err(err) = self.load_operation_workflows(dir_path).await { + error!("Fail to read the operation workflows from {dir_path}: {err:?}"); + } + + let dir_path = &self.state_dir.clone(); + if let Err(err) = self.load_operation_workflows(dir_path).await { + error!("Fail to reload the running operation workflows from {dir_path}: {err:?}"); + } + self.load_builtin_workflows(); + } + + async fn load_operation_workflows( + &mut self, + dir_path: &Utf8PathBuf, + ) -> Result<(), anyhow::Error> { + for entry in std::fs::read_dir(dir_path)?.flatten() { + let file = entry.path(); + if file.extension() == Some(OsStr::new("toml")) { + match read_operation_workflow(&file) + .await + .and_then(|(workflow, version)| self.load_operation_workflow(workflow, version)) + { + Ok(cmd) => { + info!( "Using operation workflow definition from {file:?} for '{cmd}' operation" ); - } - Err(err) => { - error!("Ignoring {file:?}: {err:?}") - } - }; + } + Err(err) => { + error!("Ignoring {file:?}: {err:?}") + } + }; + } + } + Ok(()) + } + + fn load_operation_workflow( + &mut self, + workflow: OperationWorkflow, + version: WorkflowVersion, + ) -> Result { + let name = workflow.operation.to_string(); + self.workflows.register_custom_workflow(workflow, version)?; + Ok(name) + } + + fn load_builtin_workflows(&mut self) { + for capability in self.builtin_workflows.iter() { + if let Err(err) = self + .workflows + .register_builtin_workflow(capability.as_str().into()) 
+ { + error!("Fail to register built-in workflow for {capability} operation: {err}"); + } } } - Ok(workflows) } async fn read_operation_workflow( @@ -54,13 +112,3 @@ async fn read_operation_workflow( }) .map(|workflow| (workflow, version)) } - -fn load_operation_workflow( - workflows: &mut WorkflowSupervisor, - workflow: OperationWorkflow, - version: WorkflowVersion, -) -> Result { - let name = workflow.operation.to_string(); - workflows.register_custom_workflow(workflow, version)?; - Ok(name) -} diff --git a/crates/core/tedge_agent/src/operation_workflows/tests.rs b/crates/core/tedge_agent/src/operation_workflows/tests.rs index ab8cd39dc4c..64dfadb2040 100644 --- a/crates/core/tedge_agent/src/operation_workflows/tests.rs +++ b/crates/core/tedge_agent/src/operation_workflows/tests.rs @@ -36,7 +36,6 @@ use tedge_api::mqtt_topics::OperationType; use tedge_api::workflow::GenericCommandData; use tedge_api::workflow::GenericCommandState; use tedge_api::workflow::OperationName; -use tedge_api::workflow::WorkflowSupervisor; use tedge_api::RestartCommand; use tedge_api::SoftwareUpdateCommand; use tedge_mqtt_ext::test_helpers::assert_received_contains_str; @@ -357,8 +356,6 @@ async fn spawn_mqtt_operation_converter(device_topic_id: &str) -> Result = SimpleMessageBoxBuilder::new("Script", 5); - let workflows = WorkflowSupervisor::default(); - let tmp_dir = tempfile::TempDir::new().unwrap(); let tmp_path = Utf8Path::from_path(tmp_dir.path()).unwrap(); let config = OperationConfig { @@ -366,10 +363,11 @@ async fn spawn_mqtt_operation_converter(device_topic_id: &str) -> Result Date: Thu, 10 Oct 2024 10:23:05 +0200 Subject: [PATCH 05/16] The agent WorkflowRepository adds durability to the WorkflowSupervisor engine The WorkflowRepository acts as a facade to WorkflowSupervisor adding all disk related features: loading definitions from disk, caching definitions in-use, reloading definitions on changes. 
Signed-off-by: Didier Wenzek --- .../src/operation_workflows/actor.rs | 22 ++--- .../src/operation_workflows/persist.rs | 83 ++++++++++++++++++- 2 files changed, 87 insertions(+), 18 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index f1fece08e2a..fbeb290c7a5 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -94,7 +94,6 @@ impl WorkflowActor { async fn publish_operation_capabilities(&mut self) -> Result<(), RuntimeError> { for capability in self .workflow_repository - .workflows .capability_messages(&self.mqtt_schema, &self.device_topic_id) { self.mqtt_publisher.send(capability).await? @@ -139,7 +138,6 @@ impl WorkflowActor { match self .workflow_repository - .workflows .apply_external_update(&operation, state) { Ok(None) => (), @@ -178,7 +176,7 @@ impl WorkflowActor { }; let mut log_file = self.open_command_log(&state, &operation, &cmd_id); - let action = match self.workflow_repository.workflows.get_action(&state) { + let action = match self.workflow_repository.get_action(&state) { Ok(action) => action, Err(WorkflowExecutionError::UnknownStep { operation, step }) => { info!("No action defined for {operation} operation {step} step"); @@ -198,10 +196,8 @@ impl WorkflowActor { match action { OperationAction::Clear => { - if let Some(invoking_command) = self - .workflow_repository - .workflows - .invoking_command_state(&state) + if let Some(invoking_command) = + self.workflow_repository.invoking_command_state(&state) { log_file .log_info(&format!( @@ -347,7 +343,6 @@ impl WorkflowActor { // Get the sub-operation state and resume this command when the sub-operation is in a terminal state if let Some(sub_state) = self .workflow_repository - .workflows .sub_command_state(&state) .map(|s| s.to_owned()) { @@ -434,13 +429,9 @@ impl WorkflowActor { &mut self, new_state: GenericCommandState, ) -> 
Result<(), RuntimeError> { - let adapted_state = self - .workflow_repository - .workflows - .adapt_builtin_response(new_state); + let adapted_state = self.workflow_repository.adapt_builtin_response(new_state); if let Err(err) = self .workflow_repository - .workflows .apply_internal_update(adapted_state.clone()) { error!("Fail to persist workflow operation state: {err}"); @@ -460,7 +451,6 @@ impl WorkflowActor { ) -> CommandLog { let (root_operation, root_cmd_id) = match self .workflow_repository - .workflows .root_invoking_command_state(state) .map(|s| s.topic.as_ref()) .and_then(|root_topic| self.extract_command_identifiers(root_topic).ok()) @@ -486,7 +476,6 @@ impl WorkflowActor { ) -> Result<(), RuntimeError> { if let Err(err) = self .workflow_repository - .workflows .apply_internal_update(new_state.clone()) { error!("Fail to persist workflow operation state: {err}"); @@ -508,7 +497,6 @@ impl WorkflowActor { Ok(Some(pending_commands)) => { for command in self .workflow_repository - .workflows .load_pending_commands(pending_commands) { self.process_command_update(command.clone()).await?; @@ -527,7 +515,7 @@ impl WorkflowActor { /// Persist on-disk the current state of the pending command requests async fn persist_command_board(&mut self) -> Result<(), RuntimeError> { - let pending_commands = self.workflow_repository.workflows.pending_commands(); + let pending_commands = self.workflow_repository.pending_commands(); if let Err(err) = self.state_repository.store(pending_commands).await { error!( "Fail to persist pending command requests in {} due to: {}", diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 0e93d1bcd78..86987fbd180 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -2,20 +2,34 @@ use anyhow::Context; use camino::Utf8PathBuf; use std::ffi::OsStr; use std::path::Path; +use 
tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; +use tedge_api::workflow::CommandBoard; +use tedge_api::workflow::GenericCommandState; use tedge_api::workflow::IllFormedOperationWorkflow; +use tedge_api::workflow::OperationAction; use tedge_api::workflow::OperationName; use tedge_api::workflow::OperationWorkflow; +use tedge_api::workflow::WorkflowExecutionError; use tedge_api::workflow::WorkflowSupervisor; use tedge_api::workflow::WorkflowVersion; +use tedge_mqtt_ext::MqttMessage; use tracing::error; use tracing::info; /// Persist the workflow definitions. +/// +/// The WorkflowRepository acts as a facade to WorkflowSupervisor +/// adding all disk related features: +/// - loading definitions from disk, +/// - caching definitions in-use, +/// - reloading definitions on changes. pub struct WorkflowRepository { builtin_workflows: Vec, custom_workflows_dir: Utf8PathBuf, state_dir: Utf8PathBuf, - pub(crate) workflows: WorkflowSupervisor, + workflows: WorkflowSupervisor, } impl WorkflowRepository { @@ -91,6 +105,73 @@ impl WorkflowRepository { } } } + + pub fn load_pending_commands(&mut self, commands: CommandBoard) -> Vec { + self.workflows.load_pending_commands(commands) + } + + pub fn pending_commands(&self) -> &CommandBoard { + self.workflows.pending_commands() + } + + pub fn capability_messages( + &self, + schema: &MqttSchema, + target: &EntityTopicId, + ) -> Vec { + self.workflows.capability_messages(schema, target) + } + + pub fn apply_external_update( + &mut self, + operation: &OperationType, + command_state: GenericCommandState, + ) -> Result, WorkflowExecutionError> { + self.workflows + .apply_external_update(operation, command_state) + } + + pub fn apply_internal_update( + &mut self, + new_command_state: GenericCommandState, + ) -> Result<(), WorkflowExecutionError> { + self.workflows.apply_internal_update(new_command_state) + } + + pub fn get_action( + &self, + command_state: 
&GenericCommandState, + ) -> Result { + self.workflows.get_action(command_state) + } + + pub fn root_invoking_command_state( + &self, + leaf_command: &GenericCommandState, + ) -> Option<&GenericCommandState> { + self.workflows.root_invoking_command_state(leaf_command) + } + + pub fn invoking_command_state( + &self, + sub_command: &GenericCommandState, + ) -> Option<&GenericCommandState> { + self.workflows.invoking_command_state(sub_command) + } + + pub fn sub_command_state( + &self, + command_state: &GenericCommandState, + ) -> Option<&GenericCommandState> { + self.workflows.sub_command_state(command_state) + } + + pub fn adapt_builtin_response( + &self, + command_state: GenericCommandState, + ) -> GenericCommandState { + self.workflows.adapt_builtin_response(command_state) + } } async fn read_operation_workflow( From 6cc005c1d22cfcb1fdfdee616706ac4415b170c9 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 10 Oct 2024 15:08:30 +0200 Subject: [PATCH 06/16] Persist operation definition when a new instance is created Signed-off-by: Didier Wenzek --- .../src/operation_workflows/actor.rs | 1 + .../src/operation_workflows/persist.rs | 93 ++++++++++++++++--- 2 files changed, 81 insertions(+), 13 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index fbeb290c7a5..946d7c78020 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -139,6 +139,7 @@ impl WorkflowActor { match self .workflow_repository .apply_external_update(&operation, state) + .await { Ok(None) => (), Ok(Some(new_state)) => { diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 86987fbd180..0b7810bdf57 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -1,7 
+1,7 @@ use anyhow::Context; +use camino::Utf8Path; use camino::Utf8PathBuf; -use std::ffi::OsStr; -use std::path::Path; +use std::collections::HashMap; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; @@ -26,9 +26,22 @@ use tracing::info; /// - caching definitions in-use, /// - reloading definitions on changes. pub struct WorkflowRepository { + // Names of the builtin workflows, i.e. without any file representation builtin_workflows: Vec, + + // Directory for the user-defined workflows custom_workflows_dir: Utf8PathBuf, + + // Directory of user-defined workflow copies of the workflow in-use state_dir: Utf8PathBuf, + + // Map each workflow version to its workflow file + // + // For a fresh new workflow definition, this points to the user-defined file + // When the workflow definition is in use, this points to a copy in the state directory. + definitions: HashMap, + + // The in-memory representation of all the workflows (builtin, user-defined, i,n-use). workflows: WorkflowSupervisor, } @@ -39,24 +52,36 @@ impl WorkflowRepository { state_dir: Utf8PathBuf, ) -> Self { let workflows = WorkflowSupervisor::default(); + let state_dir = state_dir.join("workflows-in-use"); + let definitions = HashMap::new(); Self { builtin_workflows, custom_workflows_dir, state_dir, + definitions, workflows, } } pub async fn load(&mut self) { + // Note that the loading order matters. 
+ + // First, all the user-defined workflows are loaded let dir_path = &self.custom_workflows_dir.clone(); if let Err(err) = self.load_operation_workflows(dir_path).await { error!("Fail to read the operation workflows from {dir_path}: {err:?}"); } + // Then, the definitions of the workflow still in-use are loaded + // If a definition has not changed, then self.definitions is updated accordingly + // so the known location of this definition is the copy not the original let dir_path = &self.state_dir.clone(); + let _ = tokio::fs::create_dir(dir_path).await; // if the creation fails, this will be reported next line on read if let Err(err) = self.load_operation_workflows(dir_path).await { error!("Fail to reload the running operation workflows from {dir_path}: {err:?}"); } + + // Finally, builtin workflows are installed if not better definition has been provided by the user self.load_builtin_workflows(); } @@ -64,17 +89,18 @@ impl WorkflowRepository { &mut self, dir_path: &Utf8PathBuf, ) -> Result<(), anyhow::Error> { - for entry in std::fs::read_dir(dir_path)?.flatten() { + for entry in dir_path.read_dir_utf8()?.flatten() { let file = entry.path(); - if file.extension() == Some(OsStr::new("toml")) { - match read_operation_workflow(&file) + if file.extension() == Some("toml") { + match read_operation_workflow(file) .await - .and_then(|(workflow, version)| self.load_operation_workflow(workflow, version)) - { + .and_then(|(workflow, version)| { + self.load_operation_workflow(file.into(), workflow, version) + }) { Ok(cmd) => { info!( - "Using operation workflow definition from {file:?} for '{cmd}' operation" - ); + "Using operation workflow definition from {file:?} for '{cmd}' operation" + ); } Err(err) => { error!("Ignoring {file:?}: {err:?}") @@ -87,10 +113,12 @@ impl WorkflowRepository { fn load_operation_workflow( &mut self, + definition: Utf8PathBuf, workflow: OperationWorkflow, version: WorkflowVersion, ) -> Result { let name = workflow.operation.to_string(); + 
self.definitions.insert(version.clone(), definition); self.workflows.register_custom_workflow(workflow, version)?; Ok(name) } @@ -106,6 +134,25 @@ impl WorkflowRepository { } } + /// Copy the workflow definition file to the persisted state directory, + /// unless this has already been done. + async fn persist_workflow_definition( + &mut self, + operation: &OperationName, + version: &WorkflowVersion, + ) { + if let Some(source) = self.definitions.get(version) { + if !source.starts_with(&self.state_dir) { + let target = self.state_dir.join(operation).with_extension("toml"); + if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { + error!("Fail to persist a copy of {source} as {target}: {err}"); + } else { + self.definitions.insert(version.clone(), target); + } + } + } + } + pub fn load_pending_commands(&mut self, commands: CommandBoard) -> Vec { self.workflows.load_pending_commands(commands) } @@ -122,13 +169,33 @@ impl WorkflowRepository { self.workflows.capability_messages(schema, target) } - pub fn apply_external_update( + /// Update the state of the command board on reception of a message sent by a peer over MQTT + /// + /// If this is the first time a command of that type is created, + /// then a copy of its definition is persisted in the state directory. + /// The point is to be sure the command execution is ruled by its initial workflow unchanged + /// even if the user pushes a new version meantime. + pub async fn apply_external_update( &mut self, operation: &OperationType, command_state: GenericCommandState, ) -> Result, WorkflowExecutionError> { - self.workflows - .apply_external_update(operation, command_state) + match self + .workflows + .apply_external_update(operation, command_state)? 
+ { + None => Ok(None), + + Some(new_state) if new_state.is_init() => { + if let Some(version) = new_state.workflow_version() { + self.persist_workflow_definition(&operation.to_string(), &version) + .await; + } + Ok(Some(new_state)) + } + + Some(updated_state) => Ok(Some(updated_state)), + } } pub fn apply_internal_update( @@ -175,7 +242,7 @@ impl WorkflowRepository { } async fn read_operation_workflow( - path: &Path, + path: &Utf8Path, ) -> Result<(OperationWorkflow, WorkflowVersion), anyhow::Error> { let bytes = tokio::fs::read(path).await.context("Fail to read file")?; let input = std::str::from_utf8(&bytes).context("Fail to extract UTF8 content")?; From dca2f6fe8a05bdede78b4eaa3ea40614e049b157 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 11 Oct 2024 10:27:47 +0200 Subject: [PATCH 07/16] Reload workflow definitions on file change using inotify Signed-off-by: Didier Wenzek --- crates/core/tedge_agent/src/agent.rs | 5 ++- .../src/operation_workflows/actor.rs | 8 ++++- .../src/operation_workflows/builder.rs | 5 +++ .../src/operation_workflows/persist.rs | 34 +++++++++++++++++++ .../src/operation_workflows/tests.rs | 11 ++++-- 5 files changed, 59 insertions(+), 4 deletions(-) diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 6e5a865d417..24c8c042584 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -241,6 +241,9 @@ impl Agent { // as it will create the device_profile workflow if it does not already exist DeviceProfileManagerBuilder::try_new(&self.config.operations_dir)?; + // Inotify actor + let mut fs_watch_actor_builder = FsWatchActorBuilder::new(); + // Script actor let mut script_runner: ServerActorBuilder = ScriptActor::builder(); @@ -258,6 +261,7 @@ impl Agent { self.config.operation_config, &mut mqtt_actor_builder, &mut script_runner, + &mut fs_watch_actor_builder, ); converter_actor_builder.register_builtin_operation(&mut restart_actor_builder); 
converter_actor_builder.register_builtin_operation(&mut software_update_builder); @@ -281,7 +285,6 @@ impl Agent { &self.config.service, ); - let mut fs_watch_actor_builder = FsWatchActorBuilder::new(); let mut downloader_actor_builder = DownloaderActor::new( self.config.identity.clone(), self.config.cloud_root_certs.clone(), diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index 946d7c78020..eeb888c211b 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -32,6 +32,7 @@ use tedge_api::workflow::OperationAction; use tedge_api::workflow::OperationName; use tedge_api::workflow::WorkflowExecutionError; use tedge_api::CommandLog; +use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_script_ext::Execute; @@ -42,7 +43,7 @@ use tokio::time::sleep; #[derive(Debug)] pub struct InternalCommandState(GenericCommandState); -fan_in_message_type!(AgentInput[MqttMessage, InternalCommandState, GenericCommandData] : Debug); +fan_in_message_type!(AgentInput[MqttMessage, InternalCommandState, GenericCommandData, FsWatchEvent] : Debug); pub struct WorkflowActor { pub(crate) mqtt_schema: MqttSchema, @@ -84,6 +85,11 @@ impl Actor for WorkflowActor { )) => { self.publish_builtin_capability(operation, payload).await?; } + AgentInput::FsWatchEvent(file_update) => { + self.workflow_repository + .update_operation_workflows(file_update) + .await; + } } } Ok(()) diff --git a/crates/core/tedge_agent/src/operation_workflows/builder.rs b/crates/core/tedge_agent/src/operation_workflows/builder.rs index 9b7a7df8f35..ff80847ce5a 100644 --- a/crates/core/tedge_agent/src/operation_workflows/builder.rs +++ b/crates/core/tedge_agent/src/operation_workflows/builder.rs @@ -6,6 +6,7 @@ use crate::operation_workflows::message_box::CommandDispatcher; use 
crate::operation_workflows::persist::WorkflowRepository; use crate::state_repository::state::agent_state_dir; use crate::state_repository::state::AgentStateRepository; +use std::path::PathBuf; use std::process::Output; use tedge_actors::futures::channel::mpsc; use tedge_actors::Builder; @@ -28,6 +29,7 @@ use tedge_api::mqtt_topics::MqttSchema; use tedge_api::workflow::GenericCommandData; use tedge_api::workflow::GenericCommandState; use tedge_api::workflow::OperationName; +use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; use tedge_script_ext::Execute; @@ -48,6 +50,7 @@ impl WorkflowActorBuilder { config: OperationConfig, mqtt_actor: &mut (impl MessageSource + MessageSink), script_runner: &mut impl Service>, + fs_notify: &mut impl MessageSource, ) -> Self { let (input_sender, input_receiver) = mpsc::unbounded(); let (signal_sender, signal_receiver) = mpsc::channel(10); @@ -71,6 +74,8 @@ impl WorkflowActorBuilder { let script_runner = ClientMessageBox::new(script_runner); + fs_notify.connect_sink(config.operations_dir.clone().into(), &input_sender); + Self { config, input_sender, diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 0b7810bdf57..2b2f144cd23 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -14,6 +14,7 @@ use tedge_api::workflow::OperationWorkflow; use tedge_api::workflow::WorkflowExecutionError; use tedge_api::workflow::WorkflowSupervisor; use tedge_api::workflow::WorkflowVersion; +use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::MqttMessage; use tracing::error; use tracing::info; @@ -134,6 +135,39 @@ impl WorkflowRepository { } } + pub async fn update_operation_workflows(&mut self, file_update: FsWatchEvent) { + match file_update { + FsWatchEvent::Modified(path) | FsWatchEvent::FileCreated(path) => { + if let 
Ok(path) = Utf8PathBuf::try_from(path) { + if self.is_user_defined(&path) { + self.reload_operation_workflow(&path).await + } + } + } + + FsWatchEvent::FileDeleted(_) => {} + + FsWatchEvent::DirectoryDeleted(_) | FsWatchEvent::DirectoryCreated(_) => {} + } + } + + fn is_user_defined(&mut self, path: &Utf8PathBuf) -> bool { + path.extension() == Some("toml") && path.parent() == Some(&self.custom_workflows_dir) + } + + async fn reload_operation_workflow(&mut self, path: &Utf8PathBuf) { + match read_operation_workflow(path).await { + Ok((workflow, version)) => { + if let Ok(cmd) = self.load_operation_workflow(path.clone(), workflow, version) { + info!("Using the updated operation workflow definition from {path} for '{cmd}' operation"); + } + } + Err(err) => { + error!("Fail to reload {path}: {err}") + } + } + } + /// Copy the workflow definition file to the persisted state directory, /// unless this has already been done. async fn persist_workflow_definition( diff --git a/crates/core/tedge_agent/src/operation_workflows/tests.rs b/crates/core/tedge_agent/src/operation_workflows/tests.rs index 64dfadb2040..ee9d74ffe9c 100644 --- a/crates/core/tedge_agent/src/operation_workflows/tests.rs +++ b/crates/core/tedge_agent/src/operation_workflows/tests.rs @@ -38,6 +38,7 @@ use tedge_api::workflow::GenericCommandState; use tedge_api::workflow::OperationName; use tedge_api::RestartCommand; use tedge_api::SoftwareUpdateCommand; +use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::test_helpers::assert_received_contains_str; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; @@ -355,6 +356,8 @@ async fn spawn_mqtt_operation_converter(device_topic_id: &str) -> Result>, NoMessage, > = SimpleMessageBoxBuilder::new("Script", 5); + let mut inotify_builder: SimpleMessageBoxBuilder = + SimpleMessageBoxBuilder::new("Inotify", 5); let tmp_dir = tempfile::TempDir::new().unwrap(); let tmp_path = Utf8Path::from_path(tmp_dir.path()).unwrap(); @@ -366,8 +369,12 @@ async fn 
spawn_mqtt_operation_converter(device_topic_id: &str) -> Result Date: Fri, 11 Oct 2024 15:28:13 +0200 Subject: [PATCH 08/16] Unregister user-defined workflows which definitions are removed Signed-off-by: Didier Wenzek --- .../src/operation_workflows/actor.rs | 14 ++- .../src/operation_workflows/persist.rs | 76 +++++++++++-- .../core/tedge_api/src/workflow/supervisor.rs | 102 +++++++++++++++--- 3 files changed, 165 insertions(+), 27 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index eeb888c211b..170a4c02a2d 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -86,9 +86,19 @@ impl Actor for WorkflowActor { self.publish_builtin_capability(operation, payload).await?; } AgentInput::FsWatchEvent(file_update) => { - self.workflow_repository + if let Some(deprecated_operation) = self + .workflow_repository .update_operation_workflows(file_update) - .await; + .await + { + let deprecated_capability = + self.workflow_repository.deregistration_message( + &self.mqtt_schema, + &self.device_topic_id, + &deprecated_operation, + ); + self.mqtt_publisher.send(deprecated_capability).await? + } } } } diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 2b2f144cd23..038cff42b22 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -40,7 +40,7 @@ pub struct WorkflowRepository { // // For a fresh new workflow definition, this points to the user-defined file // When the workflow definition is in use, this points to a copy in the state directory. - definitions: HashMap, + definitions: HashMap, // The in-memory representation of all the workflows (builtin, user-defined, i,n-use). 
workflows: WorkflowSupervisor, @@ -118,10 +118,11 @@ impl WorkflowRepository { workflow: OperationWorkflow, version: WorkflowVersion, ) -> Result { - let name = workflow.operation.to_string(); - self.definitions.insert(version.clone(), definition); + let operation_name = workflow.operation.to_string(); + self.definitions + .insert(version.clone(), (operation_name.clone(), definition)); self.workflows.register_custom_workflow(workflow, version)?; - Ok(name) + Ok(operation_name) } fn load_builtin_workflows(&mut self) { @@ -135,20 +136,39 @@ impl WorkflowRepository { } } - pub async fn update_operation_workflows(&mut self, file_update: FsWatchEvent) { + /// Update the workflow definitions after some on-disk changes + /// + /// Return the operation capability deregistration message when the operation has been deprecated. + pub async fn update_operation_workflows( + &mut self, + file_update: FsWatchEvent, + ) -> Option { match file_update { FsWatchEvent::Modified(path) | FsWatchEvent::FileCreated(path) => { if let Ok(path) = Utf8PathBuf::try_from(path) { if self.is_user_defined(&path) { - self.reload_operation_workflow(&path).await + if path.exists() { + self.reload_operation_workflow(&path).await + } else { + // FsWatchEvent returns misleading Modified events along FileDeleted events. + return self.remove_operation_workflow(&path).await; + } } } } - FsWatchEvent::FileDeleted(_) => {} + FsWatchEvent::FileDeleted(path) => { + if let Ok(path) = Utf8PathBuf::try_from(path) { + if self.is_user_defined(&path) { + return self.remove_operation_workflow(&path).await; + } + } + } FsWatchEvent::DirectoryDeleted(_) | FsWatchEvent::DirectoryCreated(_) => {} } + + None } fn is_user_defined(&mut self, path: &Utf8PathBuf) -> bool { @@ -168,6 +188,33 @@ impl WorkflowRepository { } } + /// Remove a user defined workflow. + /// + /// Return the operation name if this was the last version for that operation, + /// .i.e. there is no builtin workflow. 
+ async fn remove_operation_workflow( + &mut self, + removed_path: &Utf8PathBuf, + ) -> Option { + // As this is not intended to be a frequent operation, there is no attempt to be efficient. + let (operation, removed_version) = self + .definitions + .iter() + .find(|(_, (_, p))| p == removed_path) + .map(|(v, (n, _))| (n.clone(), v.clone()))?; + + self.definitions.remove(&removed_version); + let deprecated = self + .workflows + .unregister_custom_workflow(&operation, &removed_version); + if deprecated { + info!("The user defined workflow for the '{operation}' operation has been removed"); + Some(operation) + } else { + None + } + } + /// Copy the workflow definition file to the persisted state directory, /// unless this has already been done. async fn persist_workflow_definition( @@ -175,13 +222,14 @@ impl WorkflowRepository { operation: &OperationName, version: &WorkflowVersion, ) { - if let Some(source) = self.definitions.get(version) { + if let Some((_, source)) = self.definitions.get(version) { if !source.starts_with(&self.state_dir) { let target = self.state_dir.join(operation).with_extension("toml"); if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { error!("Fail to persist a copy of {source} as {target}: {err}"); } else { - self.definitions.insert(version.clone(), target); + self.definitions + .insert(version.clone(), (operation.clone(), target)); } } } @@ -203,6 +251,16 @@ impl WorkflowRepository { self.workflows.capability_messages(schema, target) } + pub fn deregistration_message( + &self, + schema: &MqttSchema, + target: &EntityTopicId, + operation: &OperationName, + ) -> MqttMessage { + self.workflows + .deregistration_message(schema, target, operation) + } + /// Update the state of the command board on reception of a message sent by a peer over MQTT /// /// If this is the first time a command of that type is created, diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs 
index 3154fe2d11d..0ec47aecb3f 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -1,3 +1,4 @@ +use crate::mqtt_topics::Channel; use crate::workflow::*; use ::log::info; use on_disk::OnDiskCommandBoard; @@ -38,28 +39,34 @@ impl WorkflowSupervisor { ) -> Result<(), WorkflowRegistrationError> { let operation = workflow.operation.clone(); if let Some(versions) = self.workflows.get_mut(&operation) { - if version == versions.current || versions.workflows.contains_key(&version) { - // Already registered - return Ok(()); - } - - if workflow.built_in { - info!("The built-in {operation} operation has been customized",); - return Ok(()); - } - - versions.workflows.insert(version.clone(), workflow); - versions.current = version + versions.add(version, workflow); } else { - let versions = WorkflowVersions { - current: version.clone(), - workflows: HashMap::from([(version, workflow)]), - }; + let versions = WorkflowVersions::new(version, workflow); self.workflows.insert(operation, versions); } Ok(()) } + /// Register a user-defined workflow + /// + /// Return true is this was the last version for that operation. 
+ pub fn unregister_custom_workflow( + &mut self, + operation: &OperationName, + version: &WorkflowVersion, + ) -> bool { + let operation = OperationType::from(operation.as_str()); + if let Some(versions) = self.workflows.get_mut(&operation) { + versions.remove(version); + } + if self.workflows.get(&operation).map(|v| v.is_empty()) == Some(true) { + self.workflows.remove(&operation); + true + } else { + false + } + } + /// The set of pending commands pub fn pending_commands(&self) -> &CommandBoard { &self.commands @@ -93,6 +100,22 @@ impl WorkflowSupervisor { .collect() } + pub fn deregistration_message( + &self, + schema: &MqttSchema, + target: &EntityTopicId, + operation: &OperationName, + ) -> MqttMessage { + let operation = OperationType::from(operation.as_str()); + let topic = schema.topic_for(target, &Channel::CommandMetadata { operation }); + MqttMessage { + topic, + payload: "".to_string().into(), + qos: QoS::AtLeastOnce, + retain: true, + } + } + /// Update the state of the command board on reception of a message sent by a peer over MQTT /// /// Return the new CommandRequest state if any. @@ -251,6 +274,53 @@ impl WorkflowSupervisor { } impl WorkflowVersions { + fn new(version: WorkflowVersion, workflow: OperationWorkflow) -> Self { + WorkflowVersions { + current: version.clone(), + workflows: HashMap::from([(version, workflow)]), + } + } + + fn add(&mut self, version: WorkflowVersion, workflow: OperationWorkflow) { + if version == self.current || self.workflows.contains_key(&version) { + // Already registered + return; + } + + // FIXME builtin registration + if workflow.built_in { + info!( + "The built-in {operation} operation has been customized", + operation = workflow.operation + ); + return; + } + + self.workflows.insert(version.clone(), workflow); + self.current = version + } + + // Remove a version from this list of versions. 
+ // + // Return Some updated list when there is still one version + // Return None if there is no more version for that operation. + fn remove(&mut self, version: &WorkflowVersion) { + self.workflows.remove(version); + if &self.current == version { + if self.workflows.contains_key("builtin") { + self.current = "builtin".to_string() + } else { + // FIXME + self.current = "".to_string() + } + } + } + + fn is_empty(&self) -> bool { + // FIXME + self.current.is_empty() + } + fn get( &self, operation: &OperationName, From 434f8dbf016ff14a99902ef2ff15d02cc1155525 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Mon, 14 Oct 2024 16:16:00 +0200 Subject: [PATCH 09/16] Restore builtin definition when a user defined workflow is removed Signed-off-by: Didier Wenzek --- .../src/operation_workflows/persist.rs | 27 ++- crates/core/tedge_api/src/workflow/error.rs | 3 + crates/core/tedge_api/src/workflow/mod.rs | 6 - .../core/tedge_api/src/workflow/supervisor.rs | 181 +++++++++++++----- 4 files changed, 151 insertions(+), 66 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 038cff42b22..b258db8ef63 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; +use tedge_api::workflow::supervisor::WorkflowSource; use tedge_api::workflow::CommandBoard; use tedge_api::workflow::GenericCommandState; use tedge_api::workflow::IllFormedOperationWorkflow; @@ -65,11 +66,12 @@ impl WorkflowRepository { } pub async fn load(&mut self) { - // Note that the loading order matters. 
- // First, all the user-defined workflows are loaded let dir_path = &self.custom_workflows_dir.clone(); - if let Err(err) = self.load_operation_workflows(dir_path).await { + if let Err(err) = self + .load_operation_workflows(WorkflowSource::UserDefined, dir_path) + .await + { error!("Fail to read the operation workflows from {dir_path}: {err:?}"); } @@ -78,7 +80,10 @@ impl WorkflowRepository { // so the known location of this definition is the copy not the original let dir_path = &self.state_dir.clone(); let _ = tokio::fs::create_dir(dir_path).await; // if the creation fails, this will be reported next line on read - if let Err(err) = self.load_operation_workflows(dir_path).await { + if let Err(err) = self + .load_operation_workflows(WorkflowSource::InUseCopy, dir_path) + .await + { error!("Fail to reload the running operation workflows from {dir_path}: {err:?}"); } @@ -88,6 +93,7 @@ impl WorkflowRepository { async fn load_operation_workflows( &mut self, + source: WorkflowSource, dir_path: &Utf8PathBuf, ) -> Result<(), anyhow::Error> { for entry in dir_path.read_dir_utf8()?.flatten() { @@ -96,7 +102,7 @@ impl WorkflowRepository { match read_operation_workflow(file) .await .and_then(|(workflow, version)| { - self.load_operation_workflow(file.into(), workflow, version) + self.load_operation_workflow(source, file.into(), workflow, version) }) { Ok(cmd) => { info!( @@ -114,6 +120,7 @@ impl WorkflowRepository { fn load_operation_workflow( &mut self, + source: WorkflowSource, definition: Utf8PathBuf, workflow: OperationWorkflow, version: WorkflowVersion, @@ -121,7 +128,8 @@ impl WorkflowRepository { let operation_name = workflow.operation.to_string(); self.definitions .insert(version.clone(), (operation_name.clone(), definition)); - self.workflows.register_custom_workflow(workflow, version)?; + self.workflows + .register_custom_workflow(source, workflow, version)?; Ok(operation_name) } @@ -178,7 +186,12 @@ impl WorkflowRepository { async fn 
reload_operation_workflow(&mut self, path: &Utf8PathBuf) { match read_operation_workflow(path).await { Ok((workflow, version)) => { - if let Ok(cmd) = self.load_operation_workflow(path.clone(), workflow, version) { + if let Ok(cmd) = self.load_operation_workflow( + WorkflowSource::UserDefined, + path.clone(), + workflow, + version, + ) { info!("Using the updated operation workflow definition from {path} for '{cmd}' operation"); } } diff --git a/crates/core/tedge_api/src/workflow/error.rs b/crates/core/tedge_api/src/workflow/error.rs index d85bbe4a1c8..73dc53367e1 100644 --- a/crates/core/tedge_api/src/workflow/error.rs +++ b/crates/core/tedge_api/src/workflow/error.rs @@ -85,6 +85,9 @@ pub enum WorkflowExecutionError { #[error("Unknown version for the {operation} operation: {version}")] UnknownVersion { operation: String, version: String }, + #[error("A new command cannot be created for the deprecated {operation} operation")] + DeprecatedOperation { operation: String }, + #[error("No command has been initiated on the command topic: {topic}")] UnknownRequest { topic: String }, diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index 9a2d921d232..f005ba275cb 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -35,9 +35,6 @@ pub struct OperationWorkflow { /// The operation to which this workflow applies pub operation: OperationType, - /// Mark this workflow as built_in - pub built_in: bool, - /// Default action outcome handlers pub handlers: DefaultHandlers, @@ -237,7 +234,6 @@ impl OperationWorkflow { Ok(OperationWorkflow { operation, - built_in: false, handlers, states, }) @@ -269,7 +265,6 @@ impl OperationWorkflow { .collect(); OperationWorkflow { - built_in: true, operation, handlers: DefaultHandlers::default(), states, @@ -294,7 +289,6 @@ impl OperationWorkflow { .collect(); OperationWorkflow { - built_in: true, operation: operation.as_str().into(), handlers: 
DefaultHandlers::default(), states, diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 0ec47aecb3f..ab92b3b69f4 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -14,11 +14,6 @@ pub struct WorkflowSupervisor { commands: CommandBoard, } -struct WorkflowVersions { - current: WorkflowVersion, - workflows: HashMap, -} - impl WorkflowSupervisor { /// Register a builtin workflow provided by thin-edge pub fn register_builtin_workflow( @@ -26,6 +21,7 @@ impl WorkflowSupervisor { operation: OperationType, ) -> Result<(), WorkflowRegistrationError> { self.register_custom_workflow( + WorkflowSource::BuiltIn, OperationWorkflow::built_in(operation), "builtin".to_string(), ) @@ -34,20 +30,21 @@ impl WorkflowSupervisor { /// Register a user-defined workflow pub fn register_custom_workflow( &mut self, + source: WorkflowSource, workflow: OperationWorkflow, version: WorkflowVersion, ) -> Result<(), WorkflowRegistrationError> { let operation = workflow.operation.clone(); if let Some(versions) = self.workflows.get_mut(&operation) { - versions.add(version, workflow); + versions.add(source, version, workflow); } else { - let versions = WorkflowVersions::new(version, workflow); + let versions = WorkflowVersions::new(source, version, workflow); self.workflows.insert(operation, versions); } Ok(()) } - /// Register a user-defined workflow + /// Un-register a user-defined workflow /// /// Return true is this was the last version for that operation. 
pub fn unregister_custom_workflow( @@ -124,7 +121,7 @@ impl WorkflowSupervisor { operation: &OperationType, command_state: GenericCommandState, ) -> Result, WorkflowExecutionError> { - let Some(workflow_versions) = self.workflows.get(operation) else { + let Some(workflow_versions) = self.workflows.get_mut(operation) else { return Err(WorkflowExecutionError::UnknownOperation { operation: operation.to_string(), }); @@ -135,9 +132,15 @@ impl WorkflowSupervisor { Ok(Some(command_state)) } else if command_state.is_init() { // This is a new command request - let command_state = command_state.set_workflow_version(&workflow_versions.current); - self.commands.insert(command_state.clone())?; - Ok(Some(command_state)) + if let Some(current_version) = workflow_versions.use_current_version() { + let command_state = command_state.set_workflow_version(current_version); + self.commands.insert(command_state.clone())?; + Ok(Some(command_state)) + } else { + return Err(WorkflowExecutionError::DeprecatedOperation { + operation: operation.to_string(), + }); + } } else { // Ignore command updates published over MQTT // @@ -166,7 +169,7 @@ impl WorkflowSupervisor { .ok_or(WorkflowExecutionError::UnknownOperation { operation: operation_name.clone(), }) - .and_then(|versions| versions.get(&operation_name, version.as_ref())) + .and_then(|versions| versions.get(version.as_ref())) .and_then(|workflow| workflow.get_action(command_state)) } @@ -273,70 +276,142 @@ impl WorkflowSupervisor { } } +/// A stack of known versions for a workflow +/// +/// In practice, one might have 3 concurrent versions: +/// +/// - The current version, i.e. the version to be used for new operation instances. +/// - One in-use version, that was used by some operation instance when the current version has been updated. +/// - The builtin version provided by the agent and which can be overridden by the users. +/// +/// None of these versions are mandatory. 
For instance, an operation can have no builtin version, +/// no current version (because the user removed the definition file), but a version still in-use +/// (that has been started before the user deprecated the operation). +struct WorkflowVersions { + operation: OperationName, + current: Option, + builtin: Option, + in_use: Option, + versions: HashMap, +} + +#[derive(Copy, Clone)] +pub enum WorkflowSource { + BuiltIn, + UserDefined, + InUseCopy, +} + +use WorkflowSource::*; + impl WorkflowVersions { - fn new(version: WorkflowVersion, workflow: OperationWorkflow) -> Self { + fn new(source: WorkflowSource, version: WorkflowVersion, workflow: OperationWorkflow) -> Self { + let operation = workflow.operation.to_string(); + + let current = match source { + BuiltIn | UserDefined => Some(version.clone()), + InUseCopy => None, + }; + let builtin = match source { + BuiltIn => Some(version.clone()), + UserDefined | InUseCopy => None, + }; + let in_use = match source { + InUseCopy => Some(version.clone()), + UserDefined | BuiltIn => None, + }; + let versions = HashMap::from([(version, workflow)]); WorkflowVersions { - current: version.clone(), - workflows: HashMap::from([(version, workflow)]), + operation, + current, + builtin, + in_use, + versions, } } - fn add(&mut self, version: WorkflowVersion, workflow: OperationWorkflow) { - if version == self.current || self.workflows.contains_key(&version) { - // Already registered - return; - } + fn add( + &mut self, + source: WorkflowSource, + version: WorkflowVersion, + workflow: OperationWorkflow, + ) { + match source { + BuiltIn => { + self.builtin = Some(version.clone()); + + if self.current.is_none() { + self.current = Some(version.clone()); + } else { + info!( + "The built-in {operation} operation has been customized", + operation = workflow.operation + ); + } + } - // FIXME builtin registration - if workflow.built_in { - info!( - "The built-in {operation} operation has been customized", - operation = workflow.operation - 
); - return; + UserDefined => { + self.current = Some(version.clone()); + if self.builtin.is_some() { + info!( + "The built-in {operation} operation has been customized", + operation = workflow.operation + ); + } + } + + InUseCopy => self.in_use = Some(version.clone()), } - self.workflows.insert(version.clone(), workflow); - self.current = version + self.versions.insert(version.clone(), workflow); } - // Remove a version from this list of versions. - // - // Return Some updated list when there is still one version - // Return None if there is no more version for that operation. - fn remove(&mut self, version: &WorkflowVersion) { - self.workflows.remove(version); - if &self.current == version { - if self.workflows.contains_key("builtin") { - self.current = "builtin".to_string() - } else { - // FIXME - self.current = "".to_string() + // Mark the current version as being in-use. + fn use_current_version(&mut self) -> Option<&WorkflowVersion> { + if self.current.is_some() && self.in_use != self.current { + if let Some(old_version) = self.in_use.as_ref() { + self.versions.remove(old_version); } + self.in_use.clone_from(&self.current); } + self.current.as_ref() + } + + // Remove a version from this list of versions, restoring the built-in version if any + fn remove(&mut self, version: &WorkflowVersion) { + self.versions.remove(version); + self.current.clone_from(&self.builtin); } fn is_empty(&self) -> bool { - // FIXME - self.current.is_empty() + self.versions.is_empty() } fn get( &self, - operation: &OperationName, version: Option<&WorkflowVersion>, ) -> Result<&OperationWorkflow, WorkflowExecutionError> { - let version = version.unwrap_or(&self.current); - self.workflows - .get(version) - .ok_or(WorkflowExecutionError::UnknownVersion { - operation: operation.clone(), - version: version.to_string(), - }) + match version { + None => self.current_workflow(), + Some(version) => self.find(version), + } + .ok_or(WorkflowExecutionError::UnknownVersion { + operation: 
self.operation.clone(), + version: version + .or(self.current.as_ref()) + .unwrap_or(&"current".to_string()) + .to_string(), + }) } fn current_workflow(&self) -> Option<&OperationWorkflow> { - self.workflows.get(&self.current) + self.current + .as_ref() + .and_then(|version| self.versions.get(version)) + } + + fn find(&self, version: &WorkflowVersion) -> Option<&OperationWorkflow> { + self.versions.get(version) } } From 2f96370cdfddc9f6e0fa4b8a6e8fb07df94cf4d0 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 15 Oct 2024 10:10:41 +0200 Subject: [PATCH 10/16] Update capability messages on operation workflow updates --- .../src/operation_workflows/actor.rs | 16 ++-- .../src/operation_workflows/persist.rs | 84 ++++++++++++++----- .../core/tedge_api/src/workflow/supervisor.rs | 45 ++++++++-- 3 files changed, 105 insertions(+), 40 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index 170a4c02a2d..483e16266e9 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -86,18 +86,16 @@ impl Actor for WorkflowActor { self.publish_builtin_capability(operation, payload).await?; } AgentInput::FsWatchEvent(file_update) => { - if let Some(deprecated_operation) = self + if let Some(updated_capability) = self .workflow_repository - .update_operation_workflows(file_update) + .update_operation_workflows( + &self.mqtt_schema, + &self.device_topic_id, + file_update, + ) .await { - let deprecated_capability = - self.workflow_repository.deregistration_message( - &self.mqtt_schema, - &self.device_topic_id, - &deprecated_operation, - ); - self.mqtt_publisher.send(deprecated_capability).await? + self.mqtt_publisher.send(updated_capability).await? 
} } } diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index b258db8ef63..ff7f1ff7bb7 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -41,7 +41,8 @@ pub struct WorkflowRepository { // // For a fresh new workflow definition, this points to the user-defined file // When the workflow definition is in use, this points to a copy in the state directory. - definitions: HashMap, + definitions: HashMap, + in_use_copies: HashMap, // The in-memory representation of all the workflows (builtin, user-defined, i,n-use). workflows: WorkflowSupervisor, @@ -56,11 +57,13 @@ impl WorkflowRepository { let workflows = WorkflowSupervisor::default(); let state_dir = state_dir.join("workflows-in-use"); let definitions = HashMap::new(); + let in_use_copies = HashMap::new(); Self { builtin_workflows, custom_workflows_dir, state_dir, definitions, + in_use_copies, workflows, } } @@ -126,8 +129,18 @@ impl WorkflowRepository { version: WorkflowVersion, ) -> Result { let operation_name = workflow.operation.to_string(); - self.definitions - .insert(version.clone(), (operation_name.clone(), definition)); + match source { + WorkflowSource::UserDefined => { + self.definitions + .insert(version.clone(), (operation_name.clone(), definition)); + } + WorkflowSource::InUseCopy => { + self.in_use_copies + .insert(operation_name.clone(), version.clone()); + } + WorkflowSource::BuiltIn => {} + }; + self.workflows .register_custom_workflow(source, workflow, version)?; Ok(operation_name) @@ -149,17 +162,21 @@ impl WorkflowRepository { /// Return the operation capability deregistration message when the operation has been deprecated. 
pub async fn update_operation_workflows( &mut self, + schema: &MqttSchema, + target: &EntityTopicId, file_update: FsWatchEvent, - ) -> Option { + ) -> Option { match file_update { FsWatchEvent::Modified(path) | FsWatchEvent::FileCreated(path) => { if let Ok(path) = Utf8PathBuf::try_from(path) { if self.is_user_defined(&path) { + // Checking the path exists as FsWatchEvent returns misleading Modified events along FileDeleted events. if path.exists() { - self.reload_operation_workflow(&path).await - } else { - // FsWatchEvent returns misleading Modified events along FileDeleted events. - return self.remove_operation_workflow(&path).await; + return self.reload_operation_workflow(&path).await.and_then( + |updated_operation| { + self.capability_message(schema, target, &updated_operation) + }, + ); } } } @@ -168,7 +185,11 @@ impl WorkflowRepository { FsWatchEvent::FileDeleted(path) => { if let Ok(path) = Utf8PathBuf::try_from(path) { if self.is_user_defined(&path) { - return self.remove_operation_workflow(&path).await; + return self.remove_operation_workflow(&path).await.map( + |deprecated_operation| { + self.deregistration_message(schema, target, &deprecated_operation) + }, + ); } } } @@ -183,7 +204,10 @@ impl WorkflowRepository { path.extension() == Some("toml") && path.parent() == Some(&self.custom_workflows_dir) } - async fn reload_operation_workflow(&mut self, path: &Utf8PathBuf) { + /// Reload a user defined workflow. + /// + /// Return the operation name if this is a new operation or workflow version. 
+ async fn reload_operation_workflow(&mut self, path: &Utf8PathBuf) -> Option { match read_operation_workflow(path).await { Ok((workflow, version)) => { if let Ok(cmd) = self.load_operation_workflow( @@ -193,12 +217,14 @@ impl WorkflowRepository { version, ) { info!("Using the updated operation workflow definition from {path} for '{cmd}' operation"); + return Some(cmd); } } Err(err) => { error!("Fail to reload {path}: {err}") } } + None } /// Remove a user defined workflow. @@ -215,16 +241,16 @@ impl WorkflowRepository { .iter() .find(|(_, (_, p))| p == removed_path) .map(|(v, (n, _))| (n.clone(), v.clone()))?; - self.definitions.remove(&removed_version); let deprecated = self .workflows .unregister_custom_workflow(&operation, &removed_version); - if deprecated { + if matches!(deprecated, Some(WorkflowSource::BuiltIn)) { + info!("The builtin workflow for the '{operation}' operation has been restored"); + None + } else { info!("The user defined workflow for the '{operation}' operation has been removed"); Some(operation) - } else { - None } } @@ -235,15 +261,18 @@ impl WorkflowRepository { operation: &OperationName, version: &WorkflowVersion, ) { + if let Some(in_use_version) = self.in_use_copies.get(operation) { + if in_use_version == version { + return; + } + }; if let Some((_, source)) = self.definitions.get(version) { - if !source.starts_with(&self.state_dir) { - let target = self.state_dir.join(operation).with_extension("toml"); - if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { - error!("Fail to persist a copy of {source} as {target}: {err}"); - } else { - self.definitions - .insert(version.clone(), (operation.clone(), target)); - } + let target = self.state_dir.join(operation).with_extension("toml"); + if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { + error!("Fail to persist a copy of {source} as {target}: {err}"); + } else { + self.in_use_copies + .insert(operation.clone(), version.clone()); } } } @@ -264,7 
+293,16 @@ impl WorkflowRepository { self.workflows.capability_messages(schema, target) } - pub fn deregistration_message( + fn capability_message( + &self, + schema: &MqttSchema, + target: &EntityTopicId, + operation: &OperationName, + ) -> Option { + self.workflows.capability_message(schema, target, operation) + } + + fn deregistration_message( &self, schema: &MqttSchema, target: &EntityTopicId, diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index ab92b3b69f4..8da5e42060a 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -46,22 +46,31 @@ impl WorkflowSupervisor { /// Un-register a user-defined workflow /// - /// Return true is this was the last version for that operation. + /// Return None is this was the last version for that operation. + /// Return Some(BuiltIn) is there is a builtin definition + /// Return Some(InUseCopy) if the workflow has been deprecated but there is still a running command. 
pub fn unregister_custom_workflow( &mut self, operation: &OperationName, version: &WorkflowVersion, - ) -> bool { + ) -> Option { let operation = OperationType::from(operation.as_str()); if let Some(versions) = self.workflows.get_mut(&operation) { versions.remove(version); } - if self.workflows.get(&operation).map(|v| v.is_empty()) == Some(true) { + + let current_source = match self.workflows.get(&operation) { + None => None, + Some(version) if version.is_empty() => None, + Some(version) if version.is_builtin() => Some(BuiltIn), + Some(_) => Some(InUseCopy), + }; + + if current_source.is_none() { self.workflows.remove(&operation); - true - } else { - false } + + current_source } /// The set of pending commands @@ -97,6 +106,19 @@ impl WorkflowSupervisor { .collect() } + pub fn capability_message( + &self, + schema: &MqttSchema, + target: &EntityTopicId, + operation: &OperationName, + ) -> Option { + let operation = OperationType::from(operation.as_str()); + self.workflows + .get(&operation) + .and_then(|versions| versions.current_workflow()) + .and_then(|workflow| workflow.capability_message(schema, target)) + } + pub fn deregistration_message( &self, schema: &MqttSchema, @@ -369,8 +391,11 @@ impl WorkflowVersions { // Mark the current version as being in-use. 
fn use_current_version(&mut self) -> Option<&WorkflowVersion> { if self.current.is_some() && self.in_use != self.current { - if let Some(old_version) = self.in_use.as_ref() { - self.versions.remove(old_version); + // remove the previous version in-use unless this is the builtin version + if let Some(previous_version) = self.in_use.as_ref() { + if Some(previous_version) != self.builtin.as_ref() { + self.versions.remove(previous_version); + } } self.in_use.clone_from(&self.current); } @@ -387,6 +412,10 @@ impl WorkflowVersions { self.versions.is_empty() } + fn is_builtin(&self) -> bool { + self.builtin.is_some() + } + fn get( &self, version: Option<&WorkflowVersion>, From 79fc2692675b8be433521974ed38759335f87353 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Mon, 14 Oct 2024 18:35:01 +0200 Subject: [PATCH 11/16] Test reloading workflows at runtime Signed-off-by: Didier Wenzek --- .../dynamic_workflow_reloading.robot | 98 +++++++++++++++++++ .../workflows/user-command-v1.toml | 15 +++ .../workflows/user-command-v2.toml | 15 +++ 3 files changed, 128 insertions(+) create mode 100644 tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot create mode 100644 tests/RobotFramework/tests/tedge_agent/workflows/user-command-v1.toml create mode 100644 tests/RobotFramework/tests/tedge_agent/workflows/user-command-v2.toml diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot b/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot new file mode 100644 index 00000000000..6441de1e2a8 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot @@ -0,0 +1,98 @@ +*** Settings *** +Resource ../../../resources/common.resource +Library OperatingSystem +Library JSONLibrary +Library ThinEdgeIO +Library Cumulocity + +Suite Setup Custom Setup +Test Setup Custom Test Setup +Test Teardown Get Logs + +Test Tags theme:tedge_agent + + +*** Test Cases *** 
+Create User-Defined Operation + ThinEdgeIO.File Should Not Exist /etc/tedge/operations/user-command.toml + ThinEdgeIO.Transfer To Device ${CURDIR}/user-command-v1.toml /etc/tedge/operations/user-command.toml + ${capability} Should Have MQTT Messages te/device/main///cmd/user-command + Should Be Equal ${capability[0]} {} + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/user-command/dyn-test-1 '{"status":"init"}' + Should Have MQTT Messages + ... te/device/main///cmd/user-command/dyn-test-1 + ... message_pattern=.*successful.* + ${workflow_log} Execute Command cat /var/log/tedge/agent/workflow-user-command-dyn-test-1.log + Should Contain + ... ${workflow_log} + ... item="@version":"37d0861e3038b34e8ab2ffe3257dd9372213ed5e17ba352e5028b0bf9762a089" + Should Contain ${workflow_log} item="user-command":"first-version" + +Update User-Defined Operation + ThinEdgeIO.File Should Exist /etc/tedge/operations/user-command.toml + ThinEdgeIO.Transfer To Device ${CURDIR}/user-command-v2.toml /etc/tedge/operations/user-command.toml + ${capability} Should Have MQTT Messages te/device/main///cmd/user-command + Should Be Equal ${capability[0]} {} + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/user-command/dyn-test-2 '{"status":"init"}' + Should Have MQTT Messages + ... te/device/main///cmd/user-command/dyn-test-2 + ... message_pattern=.*successful.* + ${workflow_log} Execute Command cat /var/log/tedge/agent/workflow-user-command-dyn-test-2.log + Should Contain + ... ${workflow_log} + ... 
item="@version":"1370727b2fcd269c91546e36651b9c727897562a5d3cc8e861a1e35f09ec82a6" + Should Contain ${workflow_log} item="user-command":"second-version" + +Remove User-Defined Operation + ThinEdgeIO.File Should Exist /etc/tedge/operations/user-command.toml + ${timestamp} Get Unix Timestamp + Execute Command rm /etc/tedge/operations/user-command.toml + ${capability} Should Have MQTT Messages te/device/main///cmd/user-command date_from=${timestamp} + Should Be Empty ${capability[0]} + +Override Builtin Operation + ThinEdgeIO.File Should Not Exist /etc/tedge/operations/software_list.toml + ThinEdgeIO.Transfer To Device ${CURDIR}/software_list.toml /etc/tedge/operations/software_list.toml + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/software_list/dyn-test-4 '{"status":"init"}' + Should Have MQTT Messages + ... te/device/main///cmd/software_list/dyn-test-4 + ... message_pattern=.*successful.* + ${workflow_log} Execute Command cat /var/log/tedge/agent/workflow-software_list-dyn-test-4.log + Should Contain + ... ${workflow_log} + ... item="@version":"76e9afe834b4a7cadc9029670ba76745fcda73784f9e78c09f0c0416f7f58ad2" + +Recover Builtin Operation + ThinEdgeIO.File Should Exist /etc/tedge/operations/software_list.toml + Execute Command rm /etc/tedge/operations/software_list.toml + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/software_list/dyn-test-5 '{"status":"init"}' + Should Have MQTT Messages + ... te/device/main///cmd/software_list/dyn-test-5 + ... message_pattern=.*successful.* + ${workflow_log} Execute Command cat /var/log/tedge/agent/workflow-software_list-dyn-test-5.log + Should Contain ${workflow_log} item="@version":"builtin" + + +*** Keywords *** +Custom Setup + ${DEVICE_SN} Setup + Set Suite Variable $DEVICE_SN + Device Should Exist ${DEVICE_SN} + Copy Scripts + +Custom Test Setup + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/user-command/dyn-test-1 '' + Execute Command + ... 
tedge mqtt pub --retain te/device/main///cmd/user-command/dyn-test-2 '' + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/software_list/dyn-test-4 '' + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/software_list/dyn-test-5 '' + +Copy Scripts + ThinEdgeIO.Transfer To Device ${CURDIR}/echo-as-json.sh /etc/tedge/operations/ diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/user-command-v1.toml b/tests/RobotFramework/tests/tedge_agent/workflows/user-command-v1.toml new file mode 100644 index 00000000000..ad3132ed1be --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/user-command-v1.toml @@ -0,0 +1,15 @@ +operation = "user-command" + +[init] +action = "proceed" +on_success = "executing" + +[executing] +script = "/etc/tedge/operations/echo-as-json.sh user-command first-version" +on_success = "successful" + +[successful] +action = "cleanup" + +[failed] +action = "cleanup" diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/user-command-v2.toml b/tests/RobotFramework/tests/tedge_agent/workflows/user-command-v2.toml new file mode 100644 index 00000000000..08fe141c558 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/user-command-v2.toml @@ -0,0 +1,15 @@ +operation = "user-command" + +[init] +action = "proceed" +on_success = "executing" + +[executing] +script = "/etc/tedge/operations/echo-as-json.sh user-command second-version" +on_success = "successful" + +[successful] +action = "cleanup" + +[failed] +action = "cleanup" From 1467f18650ec72e2f25bc88ed272089b1f0fca20 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 17 Oct 2024 12:04:41 +0200 Subject: [PATCH 12/16] A main workflow can update a sub-workflow before using it Signed-off-by: Didier Wenzek --- .../dynamic_workflow_reloading.robot | 43 +++++++++++++++++++ .../workflows/update-user-command.toml | 29 +++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 
tests/RobotFramework/tests/tedge_agent/workflows/update-user-command.toml diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot b/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot index 6441de1e2a8..c61ec47f6b5 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot +++ b/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot @@ -52,6 +52,25 @@ Remove User-Defined Operation ${capability} Should Have MQTT Messages te/device/main///cmd/user-command date_from=${timestamp} Should Be Empty ${capability[0]} +Updating A Workflow Twice Before Using It + ThinEdgeIO.File Should Not Exist /etc/tedge/operations/user-command.toml + ThinEdgeIO.Transfer To Device ${CURDIR}/user-command-v1.toml /etc/tedge/operations/user-command.toml + ${capability} Should Have MQTT Messages te/device/main///cmd/user-command + Should Be Equal ${capability[0]} {} + ThinEdgeIO.Transfer To Device ${CURDIR}/user-command-v2.toml /etc/tedge/operations/user-command.toml + ${capability} Should Have MQTT Messages te/device/main///cmd/user-command + Should Be Equal ${capability[0]} {} + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/user-command/dyn-test-3 '{"status":"init"}' + Should Have MQTT Messages + ... te/device/main///cmd/user-command/dyn-test-3 + ... message_pattern=.*successful.* + ${workflow_log} Execute Command cat /var/log/tedge/agent/workflow-user-command-dyn-test-3.log + Should Contain + ... ${workflow_log} + ... 
item="@version":"1370727b2fcd269c91546e36651b9c727897562a5d3cc8e861a1e35f09ec82a6" + Should Contain ${workflow_log} item="user-command":"second-version" + Override Builtin Operation ThinEdgeIO.File Should Not Exist /etc/tedge/operations/software_list.toml ThinEdgeIO.Transfer To Device ${CURDIR}/software_list.toml /etc/tedge/operations/software_list.toml @@ -76,6 +95,26 @@ Recover Builtin Operation ${workflow_log} Execute Command cat /var/log/tedge/agent/workflow-software_list-dyn-test-5.log Should Contain ${workflow_log} item="@version":"builtin" +Trigger Workflow Update From A Main Workflow + # Enable user-command v1 and prepare v2 + ThinEdgeIO.Transfer To Device ${CURDIR}/user-command-v1.toml /etc/tedge/operations/user-command.toml + ThinEdgeIO.Transfer To Device ${CURDIR}/user-command-v2.toml /etc/tedge/operations/user-command.toml.v2 + ThinEdgeIO.Transfer To Device + ... ${CURDIR}/update-user-command.toml + ... /etc/tedge/operations/update-user-command.toml + ${capability} Should Have MQTT Messages te/device/main///cmd/update-user-command + Should Be Equal ${capability[0]} {} + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/update-user-command/dyn-test-6 '{"status":"init"}' + Should Have MQTT Messages + ... te/device/main///cmd/update-user-command/dyn-test-6 + ... message_pattern=.*successful.* + ${workflow_log} Execute Command cat /var/log/tedge/agent/workflow-update-user-command-dyn-test-6.log + Should Contain + ... ${workflow_log} + ... item="user_command_version":"1370727b2fcd269c91546e36651b9c727897562a5d3cc8e861a1e35f09ec82a6" + Should Contain ${workflow_log} item="user-command":"second-version" + *** Keywords *** Custom Setup @@ -90,9 +129,13 @@ Custom Test Setup Execute Command ... tedge mqtt pub --retain te/device/main///cmd/user-command/dyn-test-2 '' Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/user-command/dyn-test-3 '' + Execute Command ... 
tedge mqtt pub --retain te/device/main///cmd/software_list/dyn-test-4 '' Execute Command ... tedge mqtt pub --retain te/device/main///cmd/software_list/dyn-test-5 '' + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/update-user-command/dyn-test-6 '' Copy Scripts ThinEdgeIO.Transfer To Device ${CURDIR}/echo-as-json.sh /etc/tedge/operations/ diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/update-user-command.toml b/tests/RobotFramework/tests/tedge_agent/workflows/update-user-command.toml new file mode 100644 index 00000000000..d288e4d1095 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/update-user-command.toml @@ -0,0 +1,29 @@ +operation = "update-user-command" + +[init] +action = "proceed" +on_success = "update" + +[update] +script = "mv /etc/tedge/operations/user-command.toml.v2 /etc/tedge/operations/user-command.toml" +on_success = "wait" + +[wait] +script = "sleep 1" +on_success = "execute" + +[execute] +operation = "user-command" +on_exec = "executing" + +[executing] +action = "await-operation-completion" +on_success = "successful" +output.user_command = "${.payload.user_command}" +output.user_command_version = "${.payload.@version}" + +[successful] +action = "cleanup" + +[failed] +action = "cleanup" From 651a0b7be5bae74b2b0879eee0370c14b5bc616e Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 17 Oct 2024 15:26:34 +0200 Subject: [PATCH 13/16] A new command instance must use the latest workflow version Signed-off-by: Didier Wenzek --- .../src/operation_workflows/persist.rs | 57 ++++++++++++++++--- .../dynamic_workflow_reloading.robot | 22 ++++++- .../workflows/update-user-command.toml | 6 +- 3 files changed, 72 insertions(+), 13 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index ff7f1ff7bb7..0f7fa7f215d 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ 
b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -41,10 +41,10 @@ pub struct WorkflowRepository { // // For a fresh new workflow definition, this points to the user-defined file // When the workflow definition is in use, this points to a copy in the state directory. - definitions: HashMap, + definitions: HashMap, in_use_copies: HashMap, - // The in-memory representation of all the workflows (builtin, user-defined, i,n-use). + // The in-memory representation of all the workflows (builtin, user-defined, in-use). workflows: WorkflowSupervisor, } @@ -132,7 +132,7 @@ impl WorkflowRepository { match source { WorkflowSource::UserDefined => { self.definitions - .insert(version.clone(), (operation_name.clone(), definition)); + .insert(operation_name.clone(), (version.clone(), definition)); } WorkflowSource::InUseCopy => { self.in_use_copies @@ -167,7 +167,10 @@ impl WorkflowRepository { file_update: FsWatchEvent, ) -> Option { match file_update { - FsWatchEvent::Modified(path) | FsWatchEvent::FileCreated(path) => { + // FsWatchEvent returns duplicated Modified events along with FileCreated events. + FsWatchEvent::FileCreated(_) => {} + + FsWatchEvent::Modified(path) => { if let Ok(path) = Utf8PathBuf::try_from(path) { if self.is_user_defined(&path) { // Checking the path exists as FsWatchEvent returns misleading Modified events along FileDeleted events. 
@@ -240,8 +243,8 @@ impl WorkflowRepository { .definitions .iter() .find(|(_, (_, p))| p == removed_path) - .map(|(v, (n, _))| (n.clone(), v.clone()))?; - self.definitions.remove(&removed_version); + .map(|(n, (v, _))| (n.clone(), v.clone()))?; + self.definitions.remove(&operation); let deprecated = self .workflows .unregister_custom_workflow(&operation, &removed_version); @@ -266,7 +269,7 @@ impl WorkflowRepository { return; } }; - if let Some((_, source)) = self.definitions.get(version) { + if let Some((_, source)) = self.definitions.get(operation) { let target = self.state_dir.join(operation).with_extension("toml"); if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { error!("Fail to persist a copy of {source} as {target}: {err}"); @@ -277,6 +280,41 @@ impl WorkflowRepository { } } + async fn load_latest_version(&mut self, operation: &OperationName) { + if let Some((path, version, workflow)) = self.get_latest_version(operation).await { + if let Err(err) = self.load_operation_workflow( + WorkflowSource::UserDefined, + path.clone(), + workflow, + version, + ) { + error!("Fail to reload the latest version of the {operation} operation from {path}: {err:?}"); + } + } + } + + async fn get_latest_version( + &mut self, + operation: &OperationName, + ) -> Option<(Utf8PathBuf, WorkflowVersion, OperationWorkflow)> { + if let Some((version, path)) = self.definitions.get(operation) { + if let Ok((workflow, latest)) = read_operation_workflow(path).await { + if version != &latest { + return Some((path.to_owned(), latest, workflow)); + }; + }; + } else { + let path = self + .custom_workflows_dir + .join(operation) + .with_extension("toml"); + if let Ok((workflow, new)) = read_operation_workflow(&path).await { + return Some((path, new, workflow)); + }; + } + None + } + pub fn load_pending_commands(&mut self, commands: CommandBoard) -> Vec { self.workflows.load_pending_commands(commands) } @@ -323,6 +361,11 @@ impl WorkflowRepository { operation: 
&OperationType, command_state: GenericCommandState, ) -> Result, WorkflowExecutionError> { + if command_state.is_init() { + // A new command instance must use the latest on-disk version of the operation workflow + self.load_latest_version(&operation.to_string()).await; + } + match self .workflows .apply_external_update(operation, command_state)? diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot b/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot index c61ec47f6b5..8ccfff7d240 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot +++ b/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot @@ -105,7 +105,7 @@ Trigger Workflow Update From A Main Workflow ${capability} Should Have MQTT Messages te/device/main///cmd/update-user-command Should Be Equal ${capability[0]} {} Execute Command - ... tedge mqtt pub --retain te/device/main///cmd/update-user-command/dyn-test-6 '{"status":"init"}' + ... tedge mqtt pub --retain te/device/main///cmd/update-user-command/dyn-test-6 '{"status":"init", "version":"v2"}' Should Have MQTT Messages ... te/device/main///cmd/update-user-command/dyn-test-6 ... message_pattern=.*successful.* @@ -115,6 +115,26 @@ Trigger Workflow Update From A Main Workflow ... 
item="user_command_version":"1370727b2fcd269c91546e36651b9c727897562a5d3cc8e861a1e35f09ec82a6" Should Contain ${workflow_log} item="user-command":"second-version" +Trigger Workflow Creation From A Main Workflow + # Assuming the update-user-command workflow is already installed + ThinEdgeIO.File Should Exist /etc/tedge/operations/update-user-command.toml + # Fully disable user-command + ${timestamp} Get Unix Timestamp + Execute Command rm /etc/tedge/operations/user-command.toml + Should Have MQTT Messages te/device/main///cmd/user-command pattern="^$" date_from=${timestamp} + # Prepare the creation of the user-command from the update-user-command workflow + ThinEdgeIO.Transfer To Device ${CURDIR}/user-command-v1.toml /etc/tedge/operations/user-command.toml.v1 + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/update-user-command/dyn-test-7 '{"status":"init", "version":"v1"}' + Should Have MQTT Messages + ... te/device/main///cmd/update-user-command/dyn-test-7 + ... message_pattern=.*successful.* + ${workflow_log} Execute Command cat /var/log/tedge/agent/workflow-update-user-command-dyn-test-7.log + Should Contain + ... ${workflow_log} + ... 
item="user_command_version":"37d0861e3038b34e8ab2ffe3257dd9372213ed5e17ba352e5028b0bf9762a089" + Should Contain ${workflow_log} item="user-command":"first-version" + *** Keywords *** Custom Setup diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/update-user-command.toml b/tests/RobotFramework/tests/tedge_agent/workflows/update-user-command.toml index d288e4d1095..fb0c69bed99 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/update-user-command.toml +++ b/tests/RobotFramework/tests/tedge_agent/workflows/update-user-command.toml @@ -5,11 +5,7 @@ action = "proceed" on_success = "update" [update] -script = "mv /etc/tedge/operations/user-command.toml.v2 /etc/tedge/operations/user-command.toml" -on_success = "wait" - -[wait] -script = "sleep 1" +script = "mv /etc/tedge/operations/user-command.toml.${.payload.version} /etc/tedge/operations/user-command.toml" on_success = "execute" [execute] From 55aef0444019588d2bf12955ebdb6a99158a4d0b Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Fri, 18 Oct 2024 19:18:26 +0200 Subject: [PATCH 14/16] Support concurrent instances with difference versions Signed-off-by: Didier Wenzek --- .../src/operation_workflows/persist.rs | 30 ++-- crates/core/tedge_api/src/workflow/error.rs | 3 + .../core/tedge_api/src/workflow/supervisor.rs | 151 ++++++++---------- .../dynamic_workflow_reloading.robot | 89 +++++++++++ .../workflows/long-running-command-v1.toml | 26 +++ .../workflows/long-running-command-v2.toml | 24 +++ .../workflows/long-running-command-v3.toml | 24 +++ .../tests/tedge_agent/workflows/sleep.toml | 24 +++ 8 files changed, 268 insertions(+), 103 deletions(-) create mode 100644 tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v1.toml create mode 100644 tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v2.toml create mode 100644 tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v3.toml create mode 100644 
tests/RobotFramework/tests/tedge_agent/workflows/sleep.toml diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 0f7fa7f215d..27cc691802a 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -2,6 +2,7 @@ use anyhow::Context; use camino::Utf8Path; use camino::Utf8PathBuf; use std::collections::HashMap; +use std::collections::HashSet; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; @@ -37,12 +38,13 @@ pub struct WorkflowRepository { // Directory of user-defined workflow copies of the workflow in-use state_dir: Utf8PathBuf, - // Map each workflow version to its workflow file - // - // For a fresh new workflow definition, this points to the user-defined file - // When the workflow definition is in use, this points to a copy in the state directory. + // Map each user defined workflow to its version and workflow file definitions: HashMap, - in_use_copies: HashMap, + + // Set of workflow in-use + // + // TODO Implement the cleanup logic + in_use_copies: HashSet, // The in-memory representation of all the workflows (builtin, user-defined, in-use). 
workflows: WorkflowSupervisor, @@ -57,7 +59,7 @@ impl WorkflowRepository { let workflows = WorkflowSupervisor::default(); let state_dir = state_dir.join("workflows-in-use"); let definitions = HashMap::new(); - let in_use_copies = HashMap::new(); + let in_use_copies = HashSet::new(); Self { builtin_workflows, custom_workflows_dir, @@ -79,8 +81,6 @@ impl WorkflowRepository { } // Then, the definitions of the workflow still in-use are loaded - // If a definition has not changed, then self.definitions is updated accordingly - // so the known location of this definition is the copy not the original let dir_path = &self.state_dir.clone(); let _ = tokio::fs::create_dir(dir_path).await; // if the creation fails, this will be reported next line on read if let Err(err) = self @@ -135,8 +135,7 @@ impl WorkflowRepository { .insert(operation_name.clone(), (version.clone(), definition)); } WorkflowSource::InUseCopy => { - self.in_use_copies - .insert(operation_name.clone(), version.clone()); + self.in_use_copies.insert(version.clone()); } WorkflowSource::BuiltIn => {} }; @@ -264,18 +263,15 @@ impl WorkflowRepository { operation: &OperationName, version: &WorkflowVersion, ) { - if let Some(in_use_version) = self.in_use_copies.get(operation) { - if in_use_version == version { - return; - } + if self.in_use_copies.contains(version) { + return; }; if let Some((_, source)) = self.definitions.get(operation) { - let target = self.state_dir.join(operation).with_extension("toml"); + let target = self.state_dir.join(version).with_extension("toml"); if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { error!("Fail to persist a copy of {source} as {target}: {err}"); } else { - self.in_use_copies - .insert(operation.clone(), version.clone()); + self.in_use_copies.insert(version.clone()); } } } diff --git a/crates/core/tedge_api/src/workflow/error.rs b/crates/core/tedge_api/src/workflow/error.rs index 73dc53367e1..82d346c6033 100644 --- 
a/crates/core/tedge_api/src/workflow/error.rs +++ b/crates/core/tedge_api/src/workflow/error.rs @@ -79,6 +79,9 @@ pub enum WorkflowExecutionError { #[error("Missing status in the command payload")] MissingStatus, + #[error("Missing version in the command payload")] + MissingVersion, + #[error("No workflow is defined for the operation: {operation}")] UnknownOperation { operation: String }, diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 8da5e42060a..5a5474ca3af 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -3,6 +3,7 @@ use crate::workflow::*; use ::log::info; use on_disk::OnDiskCommandBoard; use serde::Serialize; +use std::string::ToString; /// Dispatch actions to operation participants #[derive(Default)] @@ -185,13 +186,16 @@ impl WorkflowSupervisor { }); }; - let version = command_state.workflow_version(); + let Some(version) = &command_state.workflow_version() else { + return Err(WorkflowExecutionError::MissingVersion); + }; + self.workflows .get(&operation_name.as_str().into()) .ok_or(WorkflowExecutionError::UnknownOperation { operation: operation_name.clone(), }) - .and_then(|versions| versions.get(version.as_ref())) + .and_then(|versions| versions.get(version)) .and_then(|workflow| workflow.get_action(command_state)) } @@ -298,26 +302,21 @@ impl WorkflowSupervisor { } } -/// A stack of known versions for a workflow -/// -/// In practice, one might have 3 concurrent versions: +/// The set of in-use workflow versions for an operation /// -/// - The current version, i.e. the version to be used for new operation instances. -/// - One in-use version, that was used by some operation instance when the current version has been updated. -/// - The builtin version provided by the agent and which can be overridden by the users. -/// -/// None of these versions are mandatory. 
For instance, an operation can have no builtin version, -/// no current version (because the user removed the definition file), but a version still in-use -/// (that has been started before the user deprecated the operation). +/// - The current version is the version that will be used for a new command instance. +/// - The current version might be none. This is the case when the command has been deprecated. +/// - When a new command instance is initialized, the current version is stored as being in use. +/// - When all the commands using a given version are finalized, these copies are removed. +/// - Among all the versions, the `"builtin"` version is specific. +/// - The `"builtin"` version is never removed, and is used as the current version if none is available. struct WorkflowVersions { operation: OperationName, - current: Option, - builtin: Option, - in_use: Option, - versions: HashMap, + current: Option<(WorkflowVersion, OperationWorkflow)>, + in_use: HashMap, } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Hash, Eq, PartialEq)] pub enum WorkflowSource { BuiltIn, UserDefined, @@ -327,28 +326,23 @@ pub enum WorkflowSource { use WorkflowSource::*; impl WorkflowVersions { + const BUILT_IN: &'static str = "builtin"; + fn new(source: WorkflowSource, version: WorkflowVersion, workflow: OperationWorkflow) -> Self { let operation = workflow.operation.to_string(); - - let current = match source { - BuiltIn | UserDefined => Some(version.clone()), - InUseCopy => None, - }; - let builtin = match source { - BuiltIn => Some(version.clone()), - UserDefined | InUseCopy => None, - }; - let in_use = match source { - InUseCopy => Some(version.clone()), - UserDefined | BuiltIn => None, + let (current, in_use) = match source { + BuiltIn => ( + None, + HashMap::from([(Self::BUILT_IN.to_string(), workflow)]), + ), + UserDefined => (Some((version, workflow)), HashMap::new()), + InUseCopy => (None, HashMap::from([(version, workflow)])), }; - let versions = HashMap::from([(version, 
workflow)]); + WorkflowVersions { operation, current, - builtin, in_use, - versions, } } @@ -360,87 +354,72 @@ impl WorkflowVersions { ) { match source { BuiltIn => { - self.builtin = Some(version.clone()); - - if self.current.is_none() { - self.current = Some(version.clone()); - } else { - info!( - "The built-in {operation} operation has been customized", - operation = workflow.operation - ); - } + self.in_use.insert(Self::BUILT_IN.to_string(), workflow); } - UserDefined => { - self.current = Some(version.clone()); - if self.builtin.is_some() { - info!( - "The built-in {operation} operation has been customized", - operation = workflow.operation - ); - } + self.current = Some((version, workflow)); } + InUseCopy => { + self.in_use.insert(version, workflow); + } + }; - InUseCopy => self.in_use = Some(version.clone()), + if self.current.is_some() && self.in_use.contains_key(Self::BUILT_IN) { + info!( + "The built-in {operation} operation has been customized", + operation = self.operation + ); } - - self.versions.insert(version.clone(), workflow); } // Mark the current version as being in-use. 
fn use_current_version(&mut self) -> Option<&WorkflowVersion> { - if self.current.is_some() && self.in_use != self.current { - // remove the previous version in-use unless this is the builtin version - if let Some(previous_version) = self.in_use.as_ref() { - if Some(previous_version) != self.builtin.as_ref() { - self.versions.remove(previous_version); - } + match self.current.as_ref() { + Some((version, workflow)) => { + if !self.in_use.contains_key(version) { + self.in_use.insert(version.clone(), workflow.clone()); + }; + Some(version) } - self.in_use.clone_from(&self.current); + + None => self + .in_use + .get_key_value(Self::BUILT_IN) + .map(|(builtin, _)| builtin), } - self.current.as_ref() } - // Remove a version from this list of versions, restoring the built-in version if any + // Remove the current version from this list of versions, restoring the built-in version if any fn remove(&mut self, version: &WorkflowVersion) { - self.versions.remove(version); - self.current.clone_from(&self.builtin); + if self.current.as_ref().map(|(v, _)| v == version) == Some(true) { + self.current = None; + } else if version != Self::BUILT_IN { + self.in_use.remove(version); + } } fn is_empty(&self) -> bool { - self.versions.is_empty() + self.in_use.is_empty() } fn is_builtin(&self) -> bool { - self.builtin.is_some() + self.in_use.contains_key(Self::BUILT_IN) } - fn get( - &self, - version: Option<&WorkflowVersion>, - ) -> Result<&OperationWorkflow, WorkflowExecutionError> { - match version { - None => self.current_workflow(), - Some(version) => self.find(version), - } - .ok_or(WorkflowExecutionError::UnknownVersion { - operation: self.operation.clone(), - version: version - .or(self.current.as_ref()) - .unwrap_or(&"current".to_string()) - .to_string(), - }) + fn get(&self, version: &WorkflowVersion) -> Result<&OperationWorkflow, WorkflowExecutionError> { + self.in_use + .get(version) + .ok_or(WorkflowExecutionError::UnknownVersion { + operation: self.operation.clone(), + 
version: version.to_string(), + }) } fn current_workflow(&self) -> Option<&OperationWorkflow> { self.current .as_ref() - .and_then(|version| self.versions.get(version)) - } - - fn find(&self, version: &WorkflowVersion) -> Option<&OperationWorkflow> { - self.versions.get(version) + .map(|(_, workflow)| workflow) + .or_else(|| self.in_use.get(Self::BUILT_IN)) } } diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot b/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot index 8ccfff7d240..4bcdfdbc2d8 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot +++ b/tests/RobotFramework/tests/tedge_agent/workflows/dynamic_workflow_reloading.robot @@ -135,6 +135,75 @@ Trigger Workflow Creation From A Main Workflow ... item="user_command_version":"37d0861e3038b34e8ab2ffe3257dd9372213ed5e17ba352e5028b0bf9762a089" Should Contain ${workflow_log} item="user-command":"first-version" +Update Concurrently Running Versions + Update Workflow ${CURDIR}/sleep.toml sleep + # Trigger a first version of a long running command + Update Workflow ${CURDIR}/long-running-command-v1.toml long-running-command + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/long-running-command/dyn-test-8 '{"status":"init", "duration":30}' + Should Have MQTT Messages + ... te/device/main///cmd/long-running-command/dyn-test-8 + ... message_pattern=.*scheduled.* + + # Then a second version of the same long running command + Update Workflow ${CURDIR}/long-running-command-v2.toml long-running-command + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/long-running-command/dyn-test-9 '{"status":"init", "duration":30}' + Should Have MQTT Messages + ... te/device/main///cmd/long-running-command/dyn-test-9 + ... message_pattern=.*scheduled.* + + # And a third one + Update Workflow ${CURDIR}/long-running-command-v3.toml long-running-command + Execute Command + ... 
tedge mqtt pub --retain te/device/main///cmd/long-running-command/dyn-test-10 '{"status":"init", "duration":30}' + Should Have MQTT Messages + ... te/device/main///cmd/long-running-command/dyn-test-10 + ... message_pattern=.*scheduled.* + + # Check the 3 workflows use their original workflow version till the end + Should Have MQTT Messages + ... te/device/main///cmd/long-running-command/dyn-test-8 + ... message_pattern=.*first-version.* + ... timeout=60 + Should Have MQTT Messages + ... te/device/main///cmd/long-running-command/dyn-test-9 + ... message_pattern=.*second-version.* + ... timeout=60 + Should Have MQTT Messages + ... te/device/main///cmd/long-running-command/dyn-test-10 + ... message_pattern=.*third-version.* + ... timeout=60 + +Resume On Restart A Pending Operation Which Workflow Is Deprecated + # Trigger a long running command + Update Workflow ${CURDIR}/sleep.toml sleep + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/sleep/dyn-test-11 '{"status":"init", "duration":30}' + + # Stop the agent, once sure the command is executing + Should Have MQTT Messages + ... te/device/main///cmd/sleep/dyn-test-11 + ... message_pattern=.*executing.* + Stop Service tedge-agent + + # Make sure the long running command has not been fully executed + ${workflow_log} Execute Command cat /var/log/tedge/agent/workflow-sleep-dyn-test-11.log + Should Not Contain + ... ${workflow_log} + ... item="logging" + + # Deprecate the long running command, and restart + Execute Command rm /etc/tedge/operations/sleep.toml + Start Service tedge-agent + + # The pending long command should resume, despite the operation has been deprecated + ${messages} Should Have MQTT Messages + ... te/device/main///cmd/sleep/dyn-test-11 + ... message_pattern=.*successful.* + ... timeout=60 + Should Contain ${messages[0]} item="what a long sleep" + *** Keywords *** Custom Setup @@ -156,6 +225,26 @@ Custom Test Setup ... 
tedge mqtt pub --retain te/device/main///cmd/software_list/dyn-test-5 '' Execute Command ... tedge mqtt pub --retain te/device/main///cmd/update-user-command/dyn-test-6 '' + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/update-user-command/dyn-test-7 '' + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/long-running-command/dyn-test-8 '' + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/long-running-command/dyn-test-9 '' + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/long-running-command/dyn-test-10 '' + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/sleep/dyn-test-11 '' Copy Scripts ThinEdgeIO.Transfer To Device ${CURDIR}/echo-as-json.sh /etc/tedge/operations/ + +Update Workflow + [Arguments] ${FILE} ${OPERATION} + ${timestamp} Get Unix Timestamp + ThinEdgeIO.Transfer To Device ${FILE} /etc/tedge/operations/${OPERATION}.toml + Should Have MQTT Messages + ... te/device/main///cmd/${OPERATION} + ... pattern="^{}$" + ... date_from=${timestamp} + ... timeout=60 diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v1.toml b/tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v1.toml new file mode 100644 index 00000000000..10134d75784 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v1.toml @@ -0,0 +1,26 @@ +operation = "long-running-command" + +[init] +action = "proceed" +on_success = "scheduled" + +[scheduled] +# Using a sub-operation for sleep to allow concurrent execution of multiple versions of this workflow. +# Using sleep command directly would have blocked each workflow, resulting in serial execution. 
+operation = "sleep" +input.duration = "${.payload.duration}" +on_exec = "executing" + +[executing] +action = "await-operation-completion" +on_success = "logging" + +[logging] +script = "/etc/tedge/operations/echo-as-json.sh long-running-command first-version" +on_success = "successful" + +[successful] +action = "cleanup" + +[failed] +action = "cleanup" diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v2.toml b/tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v2.toml new file mode 100644 index 00000000000..193f2407f83 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v2.toml @@ -0,0 +1,24 @@ +operation = "long-running-command" + +[init] +action = "proceed" +on_success = "scheduled" + +[scheduled] +operation = "sleep" +input.duration = "${.payload.duration}" +on_exec = "executing" + +[executing] +action = "await-operation-completion" +on_success = "logging" + +[logging] +script = "/etc/tedge/operations/echo-as-json.sh long-running-command second-version" +on_success = "successful" + +[successful] +action = "cleanup" + +[failed] +action = "cleanup" diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v3.toml b/tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v3.toml new file mode 100644 index 00000000000..19980c9eb33 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/long-running-command-v3.toml @@ -0,0 +1,24 @@ +operation = "long-running-command" + +[init] +action = "proceed" +on_success = "scheduled" + +[scheduled] +operation = "sleep" +input.duration = "${.payload.duration}" +on_exec = "executing" + +[executing] +action = "await-operation-completion" +on_success = "logging" + +[logging] +script = "/etc/tedge/operations/echo-as-json.sh long-running-command third-version" +on_success = "successful" + +[successful] +action = "cleanup" + +[failed] +action = "cleanup" diff --git 
a/tests/RobotFramework/tests/tedge_agent/workflows/sleep.toml b/tests/RobotFramework/tests/tedge_agent/workflows/sleep.toml new file mode 100644 index 00000000000..57c55b51db7 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/sleep.toml @@ -0,0 +1,24 @@ +operation = "sleep" + +[init] +action = "proceed" +on_success = "executing" + +[executing] +script = "sleep ${.payload.duration}" +on_success = "logging" +# As the purpose of this test-fixture workflow is to check that +# the same version of the workflow is used after a restart of the agent, +# we want to be sure the engine doesn't move to the failed state when the agent stops +# For that purpose, we loop on executing when interrupted. +on_kill = "executing" + +[logging] +script = "/etc/tedge/operations/echo-as-json.sh message 'what a long sleep'" +on_success = "successful" + +[successful] +action = "cleanup" + +[failed] +action = "cleanup" From 4bf281a36abd61aa5b0d73e8329717b0b6287863 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 22 Oct 2024 18:19:28 +0200 Subject: [PATCH 15/16] Improve struct WorkflowSource A workflow source being always used with a complementary info: a file path or a workflow version, it makes sense to pack the complementary info within the WorkflowSource itself. This also highlights the corner case of the BuiltIn workflow for which there is no complementary info. 
Signed-off-by: Didier Wenzek --- .../src/operation_workflows/persist.rs | 39 ++++++----- .../core/tedge_api/src/workflow/supervisor.rs | 69 ++++++++++--------- 2 files changed, 58 insertions(+), 50 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 27cc691802a..4c429544aec 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -74,7 +74,7 @@ impl WorkflowRepository { // First, all the user-defined workflows are loaded let dir_path = &self.custom_workflows_dir.clone(); if let Err(err) = self - .load_operation_workflows(WorkflowSource::UserDefined, dir_path) + .load_operation_workflows(WorkflowSource::UserDefined(dir_path)) .await { error!("Fail to read the operation workflows from {dir_path}: {err:?}"); @@ -84,7 +84,7 @@ impl WorkflowRepository { let dir_path = &self.state_dir.clone(); let _ = tokio::fs::create_dir(dir_path).await; // if the creation fails, this will be reported next line on read if let Err(err) = self - .load_operation_workflows(WorkflowSource::InUseCopy, dir_path) + .load_operation_workflows(WorkflowSource::InUseCopy(dir_path)) .await { error!("Fail to reload the running operation workflows from {dir_path}: {err:?}"); @@ -96,16 +96,19 @@ impl WorkflowRepository { async fn load_operation_workflows( &mut self, - source: WorkflowSource, - dir_path: &Utf8PathBuf, + source: WorkflowSource<&Utf8PathBuf>, ) -> Result<(), anyhow::Error> { + let Some(dir_path) = source.inner() else { + return Ok(()); + }; for entry in dir_path.read_dir_utf8()?.flatten() { let file = entry.path(); if file.extension() == Some("toml") { match read_operation_workflow(file) .await .and_then(|(workflow, version)| { - self.load_operation_workflow(source, file.into(), workflow, version) + let file_source = source.set_inner(file.into()); + self.load_operation_workflow(file_source, workflow, version) 
}) { Ok(cmd) => { info!( @@ -123,25 +126,25 @@ impl WorkflowRepository { fn load_operation_workflow( &mut self, - source: WorkflowSource, - definition: Utf8PathBuf, + source: WorkflowSource, workflow: OperationWorkflow, version: WorkflowVersion, ) -> Result { let operation_name = workflow.operation.to_string(); - match source { - WorkflowSource::UserDefined => { + let version = match source { + WorkflowSource::UserDefined(definition) => { self.definitions .insert(operation_name.clone(), (version.clone(), definition)); + WorkflowSource::UserDefined(version) } - WorkflowSource::InUseCopy => { + WorkflowSource::InUseCopy(_) => { self.in_use_copies.insert(version.clone()); + WorkflowSource::InUseCopy(version) } - WorkflowSource::BuiltIn => {} + WorkflowSource::BuiltIn => WorkflowSource::BuiltIn, }; - self.workflows - .register_custom_workflow(source, workflow, version)?; + self.workflows.register_custom_workflow(version, workflow)?; Ok(operation_name) } @@ -213,8 +216,7 @@ impl WorkflowRepository { match read_operation_workflow(path).await { Ok((workflow, version)) => { if let Ok(cmd) = self.load_operation_workflow( - WorkflowSource::UserDefined, - path.clone(), + WorkflowSource::UserDefined(path.clone()), workflow, version, ) { @@ -244,10 +246,10 @@ impl WorkflowRepository { .find(|(_, (_, p))| p == removed_path) .map(|(n, (v, _))| (n.clone(), v.clone()))?; self.definitions.remove(&operation); - let deprecated = self + let builtin_restored = self .workflows .unregister_custom_workflow(&operation, &removed_version); - if matches!(deprecated, Some(WorkflowSource::BuiltIn)) { + if builtin_restored { info!("The builtin workflow for the '{operation}' operation has been restored"); None } else { @@ -279,8 +281,7 @@ impl WorkflowRepository { async fn load_latest_version(&mut self, operation: &OperationName) { if let Some((path, version, workflow)) = self.get_latest_version(operation).await { if let Err(err) = self.load_operation_workflow( - WorkflowSource::UserDefined, - 
path.clone(), + WorkflowSource::UserDefined(path.clone()), workflow, version, ) { diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 5a5474ca3af..96b47abc767 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -24,22 +24,20 @@ impl WorkflowSupervisor { self.register_custom_workflow( WorkflowSource::BuiltIn, OperationWorkflow::built_in(operation), - "builtin".to_string(), ) } /// Register a user-defined workflow pub fn register_custom_workflow( &mut self, - source: WorkflowSource, + version: WorkflowSource, workflow: OperationWorkflow, - version: WorkflowVersion, ) -> Result<(), WorkflowRegistrationError> { let operation = workflow.operation.clone(); if let Some(versions) = self.workflows.get_mut(&operation) { - versions.add(source, version, workflow); + versions.add(version, workflow); } else { - let versions = WorkflowVersions::new(source, version, workflow); + let versions = WorkflowVersions::new(version, workflow); self.workflows.insert(operation, versions); } Ok(()) @@ -47,31 +45,29 @@ impl WorkflowSupervisor { /// Un-register a user-defined workflow /// - /// Return None is this was the last version for that operation. - /// Return Some(BuiltIn) is there is a builtin definition - /// Return Some(InUseCopy) if the workflow has been deprecated but there is still a running command. 
+ /// Return true if a builtin version has been restored pub fn unregister_custom_workflow( &mut self, operation: &OperationName, version: &WorkflowVersion, - ) -> Option { + ) -> bool { let operation = OperationType::from(operation.as_str()); if let Some(versions) = self.workflows.get_mut(&operation) { versions.remove(version); } - let current_source = match self.workflows.get(&operation) { - None => None, - Some(version) if version.is_empty() => None, - Some(version) if version.is_builtin() => Some(BuiltIn), - Some(_) => Some(InUseCopy), + let (empty,builtin_restored) = match self.workflows.get(&operation) { + None => (true, false), + Some(version) if version.is_empty() => (true, false), + Some(version) if version.is_builtin() => (false, true), + Some(_) => (false, false), }; - if current_source.is_none() { + if empty { self.workflows.remove(&operation); } - current_source + builtin_restored } /// The set of pending commands @@ -316,11 +312,27 @@ struct WorkflowVersions { in_use: HashMap, } -#[derive(Copy, Clone, Hash, Eq, PartialEq)] -pub enum WorkflowSource { +pub enum WorkflowSource { BuiltIn, - UserDefined, - InUseCopy, + UserDefined(T), + InUseCopy(T), +} + +impl WorkflowSource { + pub fn inner(&self) -> Option<&T> { + match self { + BuiltIn => None, + UserDefined(inner) | InUseCopy(inner) => Some(inner), + } + } + + pub fn set_inner(&self, target: U) -> WorkflowSource { + match self { + BuiltIn => BuiltIn, + UserDefined(_) => UserDefined(target), + InUseCopy(_) => InUseCopy(target), + } + } } use WorkflowSource::*; @@ -328,15 +340,15 @@ use WorkflowSource::*; impl WorkflowVersions { const BUILT_IN: &'static str = "builtin"; - fn new(source: WorkflowSource, version: WorkflowVersion, workflow: OperationWorkflow) -> Self { + fn new(source: WorkflowSource, workflow: OperationWorkflow) -> Self { let operation = workflow.operation.to_string(); let (current, in_use) = match source { BuiltIn => ( None, HashMap::from([(Self::BUILT_IN.to_string(), workflow)]), ), - 
UserDefined => (Some((version, workflow)), HashMap::new()), - InUseCopy => (None, HashMap::from([(version, workflow)])), + UserDefined(version) => (Some((version, workflow)), HashMap::new()), + InUseCopy(version) => (None, HashMap::from([(version, workflow)])), }; WorkflowVersions { @@ -346,20 +358,15 @@ impl WorkflowVersions { } } - fn add( - &mut self, - source: WorkflowSource, - version: WorkflowVersion, - workflow: OperationWorkflow, - ) { + fn add(&mut self, source: WorkflowSource, workflow: OperationWorkflow) { match source { BuiltIn => { self.in_use.insert(Self::BUILT_IN.to_string(), workflow); } - UserDefined => { + UserDefined(version) => { self.current = Some((version, workflow)); } - InUseCopy => { + InUseCopy(version) => { self.in_use.insert(version, workflow); } }; From d70209eb3a690b8fa98d2a238e3ab760e608843d Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 22 Oct 2024 21:15:49 +0200 Subject: [PATCH 16/16] Remove copies of in-use workflow when no more used Signed-off-by: Didier Wenzek --- .../src/operation_workflows/persist.rs | 53 +++++++++++++++---- .../core/tedge_api/src/workflow/supervisor.rs | 2 +- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 4c429544aec..91ce84991b0 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -2,7 +2,6 @@ use anyhow::Context; use camino::Utf8Path; use camino::Utf8PathBuf; use std::collections::HashMap; -use std::collections::HashSet; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; @@ -41,10 +40,8 @@ pub struct WorkflowRepository { // Map each user defined workflow to its version and workflow file definitions: HashMap, - // Set of workflow in-use - // - // TODO Implement the cleanup logic - in_use_copies: 
HashSet, + // Map each workflow version to the count of instance using it + in_use_copies: HashMap, // The in-memory representation of all the workflows (builtin, user-defined, in-use). workflows: WorkflowSupervisor, @@ -59,7 +56,7 @@ impl WorkflowRepository { let workflows = WorkflowSupervisor::default(); let state_dir = state_dir.join("workflows-in-use"); let definitions = HashMap::new(); - let in_use_copies = HashSet::new(); + let in_use_copies = HashMap::new(); Self { builtin_workflows, custom_workflows_dir, @@ -138,7 +135,10 @@ impl WorkflowRepository { WorkflowSource::UserDefined(version) } WorkflowSource::InUseCopy(_) => { - self.in_use_copies.insert(version.clone()); + self.in_use_copies + .entry(version.clone()) + .and_modify(|count| *count += 1) + .or_insert(1); WorkflowSource::InUseCopy(version) } WorkflowSource::BuiltIn => WorkflowSource::BuiltIn, @@ -265,19 +265,30 @@ impl WorkflowRepository { operation: &OperationName, version: &WorkflowVersion, ) { - if self.in_use_copies.contains(version) { + if let Some(count) = self.in_use_copies.get_mut(version) { + *count += 1; return; }; + if let Some((_, source)) = self.definitions.get(operation) { - let target = self.state_dir.join(version).with_extension("toml"); + let target = self.workflow_copy_path(operation, version); if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { error!("Fail to persist a copy of {source} as {target}: {err}"); } else { - self.in_use_copies.insert(version.clone()); + self.in_use_copies.insert(version.clone(), 1); } } } + fn workflow_copy_path( + &self, + operation: &OperationName, + version: &WorkflowVersion, + ) -> Utf8PathBuf { + let filename = format!("{operation}-{version}"); + self.state_dir.join(filename).with_extension("toml") + } + async fn load_latest_version(&mut self, operation: &OperationName) { if let Some((path, version, workflow)) = self.get_latest_version(operation).await { if let Err(err) = self.load_operation_workflow( @@ -290,6 +301,22 @@ 
impl WorkflowRepository { } } + async fn release_in_use_copy(&mut self, operation: &OperationName, version: &WorkflowVersion) { + if let Some(count) = self.in_use_copies.get_mut(version) { + *count -= 1; + if *count > 0 { + return; + } + } + + self.in_use_copies.remove(version); + + let target = self.workflow_copy_path(operation, version); + if let Err(err) = tokio::fs::remove_file(target.clone()).await { + error!("Fail to remove the workflow copy at {target}: {err}"); + } + } + async fn get_latest_version( &mut self, operation: &OperationName, @@ -361,6 +388,12 @@ impl WorkflowRepository { if command_state.is_init() { // A new command instance must use the latest on-disk version of the operation workflow self.load_latest_version(&operation.to_string()).await; + } else if command_state.is_finished() { + // Clear the cache if this happens to be the latest instance using that version of the workflow + if let Some(version) = command_state.workflow_version() { + self.release_in_use_copy(&operation.to_string(), &version) + .await; + } } match self diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 96b47abc767..e8ff1bf90d4 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -56,7 +56,7 @@ impl WorkflowSupervisor { versions.remove(version); } - let (empty,builtin_restored) = match self.workflows.get(&operation) { + let (empty, builtin_restored) = match self.workflows.get(&operation) { None => (true, false), Some(version) if version.is_empty() => (true, false), Some(version) if version.is_builtin() => (false, true),