diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 01d13efbc9..836e400078 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -3,7 +3,6 @@ use camino::Utf8Path; use camino::Utf8PathBuf; use log::error; use std::collections::HashMap; -use std::collections::HashSet; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; @@ -41,10 +40,8 @@ pub struct WorkflowRepository { // Map each user defined workflow to its version and workflow file definitions: HashMap, - // Set of workflow in-use - // - // TODO Implement the cleanup logic - in_use_copies: HashSet, + // Map each workflow version to the count of instance using it + in_use_copies: HashMap, // The in-memory representation of all the workflows (builtin, user-defined, in-use). workflows: WorkflowSupervisor, @@ -59,7 +56,7 @@ impl WorkflowRepository { let workflows = WorkflowSupervisor::default(); let state_dir = state_dir.join("workflows-in-use"); let definitions = HashMap::new(); - let in_use_copies = HashSet::new(); + let in_use_copies = HashMap::new(); Self { builtin_workflows, custom_workflows_dir, @@ -138,7 +135,10 @@ impl WorkflowRepository { WorkflowSource::UserDefined(version) } WorkflowSource::InUseCopy(_) => { - self.in_use_copies.insert(version.clone()); + self.in_use_copies + .entry(version.clone()) + .and_modify(|count| *count += 1) + .or_insert(1); WorkflowSource::InUseCopy(version) } WorkflowSource::BuiltIn => WorkflowSource::BuiltIn, @@ -265,19 +265,26 @@ impl WorkflowRepository { operation: &OperationName, version: &WorkflowVersion, ) { - if self.in_use_copies.contains(version) { + if let Some(count) = self.in_use_copies.get_mut(version) { + *count += 1; return; }; + if let Some((_, source)) = self.definitions.get(operation) { - let target = self.state_dir.join(version).with_extension("toml"); + let target = self.workflow_copy_path(operation, version); if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { error!("Fail to persist a copy of {source} as {target}: {err}"); } else { - self.in_use_copies.insert(version.clone()); + self.in_use_copies.insert(version.clone(), 1); } } } + fn workflow_copy_path(&self, operation: &OperationName, version: &WorkflowVersion) -> Utf8PathBuf { + let filename = format!("{operation}-{version}"); + self.state_dir.join(filename).with_extension("toml") + } + async fn load_latest_version(&mut self, operation: &OperationName) { if let Some((path, version, workflow)) = self.get_latest_version(operation).await { if let Err(err) = self.load_operation_workflow( @@ -290,6 +297,22 @@ impl WorkflowRepository { } } + async fn release_in_use_copy(&mut self, operation: &OperationName, version: &WorkflowVersion) { + if let Some(count) = self.in_use_copies.get_mut(version) { + *count -= 1; + if *count == 0 { + return; + } + } + + self.in_use_copies.remove(version); + + let target = self.workflow_copy_path(operation, version); + if let Err(err) = tokio::fs::remove_file(target.clone()).await { + error!("Fail to remove the workflow copy at {target}: {err}"); + } + } + async fn get_latest_version( &mut self, operation: &OperationName, @@ -361,6 +384,11 @@ impl WorkflowRepository { if command_state.is_init() { // A new command instance must use the latest on-disk version of the operation workflow self.load_latest_version(&operation.to_string()).await; + } else if command_state.is_finished() { + // Clear the cache if this happens to be the latest instance using that version of the workflow + if let Some(version) = command_state.workflow_version() { + self.release_in_use_copy(&operation.to_string(), &version).await; + } } match self diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 7f19536e93..00d2fa4be0 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -56,7 +56,7 @@ impl WorkflowSupervisor { versions.remove(version); } - let (empty,builtin_restored) = match self.workflows.get(&operation) { + let (empty, builtin_restored) = match self.workflows.get(&operation) { None => (true, false), Some(version) if version.is_empty() => (true, false), Some(version) if version.is_builtin() => (false, true),