Skip to content

Commit

Permalink
Remove copies of in-use workflow when no more used
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Oct 23, 2024
1 parent 4bf281a commit d70209e
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 11 deletions.
53 changes: 43 additions & 10 deletions crates/core/tedge_agent/src/operation_workflows/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use anyhow::Context;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use std::collections::HashMap;
use std::collections::HashSet;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
Expand Down Expand Up @@ -41,10 +40,8 @@ pub struct WorkflowRepository {
// Map each user defined workflow to its version and workflow file
definitions: HashMap<OperationName, (WorkflowVersion, Utf8PathBuf)>,

// Set of workflow in-use
//
// TODO Implement the cleanup logic
in_use_copies: HashSet<WorkflowVersion>,
// Map each workflow version to the count of instance using it
in_use_copies: HashMap<WorkflowVersion, u32>,

// The in-memory representation of all the workflows (builtin, user-defined, in-use).
workflows: WorkflowSupervisor,
Expand All @@ -59,7 +56,7 @@ impl WorkflowRepository {
let workflows = WorkflowSupervisor::default();
let state_dir = state_dir.join("workflows-in-use");
let definitions = HashMap::new();
let in_use_copies = HashSet::new();
let in_use_copies = HashMap::new();
Self {
builtin_workflows,
custom_workflows_dir,
Expand Down Expand Up @@ -138,7 +135,10 @@ impl WorkflowRepository {
WorkflowSource::UserDefined(version)
}
WorkflowSource::InUseCopy(_) => {
self.in_use_copies.insert(version.clone());
self.in_use_copies
.entry(version.clone())
.and_modify(|count| *count += 1)
.or_insert(1);
WorkflowSource::InUseCopy(version)
}
WorkflowSource::BuiltIn => WorkflowSource::BuiltIn,
Expand Down Expand Up @@ -265,19 +265,30 @@ impl WorkflowRepository {
operation: &OperationName,
version: &WorkflowVersion,
) {
if self.in_use_copies.contains(version) {
if let Some(count) = self.in_use_copies.get_mut(version) {
*count += 1;
return;
};

if let Some((_, source)) = self.definitions.get(operation) {
let target = self.state_dir.join(version).with_extension("toml");
let target = self.workflow_copy_path(operation, version);
if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await {
error!("Fail to persist a copy of {source} as {target}: {err}");
} else {
self.in_use_copies.insert(version.clone());
self.in_use_copies.insert(version.clone(), 1);
}
}
}

fn workflow_copy_path(
&self,
operation: &OperationName,
version: &WorkflowVersion,
) -> Utf8PathBuf {
let filename = format!("{operation}-{version}");
self.state_dir.join(filename).with_extension("toml")
}

async fn load_latest_version(&mut self, operation: &OperationName) {
if let Some((path, version, workflow)) = self.get_latest_version(operation).await {
if let Err(err) = self.load_operation_workflow(
Expand All @@ -290,6 +301,22 @@ impl WorkflowRepository {
}
}

async fn release_in_use_copy(&mut self, operation: &OperationName, version: &WorkflowVersion) {
if let Some(count) = self.in_use_copies.get_mut(version) {
*count -= 1;
if *count > 0 {
return;
}
}

self.in_use_copies.remove(version);

let target = self.workflow_copy_path(operation, version);
if let Err(err) = tokio::fs::remove_file(target.clone()).await {
error!("Fail to remove the workflow copy at {target}: {err}");
}
}

async fn get_latest_version(
&mut self,
operation: &OperationName,
Expand Down Expand Up @@ -361,6 +388,12 @@ impl WorkflowRepository {
if command_state.is_init() {
// A new command instance must use the latest on-disk version of the operation workflow
self.load_latest_version(&operation.to_string()).await;
} else if command_state.is_finished() {
// Clear the cache if this happens to be the latest instance using that version of the workflow
if let Some(version) = command_state.workflow_version() {
self.release_in_use_copy(&operation.to_string(), &version)
.await;
}
}

match self
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_api/src/workflow/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl WorkflowSupervisor {
versions.remove(version);
}

let (empty,builtin_restored) = match self.workflows.get(&operation) {
let (empty, builtin_restored) = match self.workflows.get(&operation) {
None => (true, false),
Some(version) if version.is_empty() => (true, false),
Some(version) if version.is_builtin() => (false, true),
Expand Down

0 comments on commit d70209e

Please sign in to comment.