From 4bf281a36abd61aa5b0d73e8329717b0b6287863 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 22 Oct 2024 18:19:28 +0200 Subject: [PATCH] Improve struct WorkflowSource A workflow source being always used with a complementary info: a file path or a workflow version, it makes sense to pack the complementary info within the WorkflowSource itself. This also highlights the corner case of the BuiltIn workflow for which there is no complementary info. Signed-off-by: Didier Wenzek --- .../src/operation_workflows/persist.rs | 39 ++++++----- .../core/tedge_api/src/workflow/supervisor.rs | 69 ++++++++++--------- 2 files changed, 58 insertions(+), 50 deletions(-) diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 27cc691802..4c429544ae 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -74,7 +74,7 @@ impl WorkflowRepository { // First, all the user-defined workflows are loaded let dir_path = &self.custom_workflows_dir.clone(); if let Err(err) = self - .load_operation_workflows(WorkflowSource::UserDefined, dir_path) + .load_operation_workflows(WorkflowSource::UserDefined(dir_path)) .await { error!("Fail to read the operation workflows from {dir_path}: {err:?}"); @@ -84,7 +84,7 @@ impl WorkflowRepository { let dir_path = &self.state_dir.clone(); let _ = tokio::fs::create_dir(dir_path).await; // if the creation fails, this will be reported next line on read if let Err(err) = self - .load_operation_workflows(WorkflowSource::InUseCopy, dir_path) + .load_operation_workflows(WorkflowSource::InUseCopy(dir_path)) .await { error!("Fail to reload the running operation workflows from {dir_path}: {err:?}"); @@ -96,16 +96,19 @@ impl WorkflowRepository { async fn load_operation_workflows( &mut self, - source: WorkflowSource, - dir_path: &Utf8PathBuf, + source: WorkflowSource<&Utf8PathBuf>, ) -> Result<(), anyhow::Error> { + let Some(dir_path) = source.inner() else { + return Ok(()); + }; for entry in dir_path.read_dir_utf8()?.flatten() { let file = entry.path(); if file.extension() == Some("toml") { match read_operation_workflow(file) .await .and_then(|(workflow, version)| { - self.load_operation_workflow(source, file.into(), workflow, version) + let file_source = source.set_inner(file.into()); + self.load_operation_workflow(file_source, workflow, version) }) { Ok(cmd) => { info!( @@ -123,25 +126,25 @@ impl WorkflowRepository { fn load_operation_workflow( &mut self, - source: WorkflowSource, - definition: Utf8PathBuf, + source: WorkflowSource, workflow: OperationWorkflow, version: WorkflowVersion, ) -> Result { let operation_name = workflow.operation.to_string(); - match source { - WorkflowSource::UserDefined => { + let version = match source { + WorkflowSource::UserDefined(definition) => { self.definitions .insert(operation_name.clone(), (version.clone(), definition)); + WorkflowSource::UserDefined(version) } - WorkflowSource::InUseCopy => { + WorkflowSource::InUseCopy(_) => { self.in_use_copies.insert(version.clone()); + WorkflowSource::InUseCopy(version) } - WorkflowSource::BuiltIn => {} + WorkflowSource::BuiltIn => WorkflowSource::BuiltIn, }; - self.workflows - .register_custom_workflow(source, workflow, version)?; + self.workflows.register_custom_workflow(version, workflow)?; Ok(operation_name) } @@ -213,8 +216,7 @@ impl WorkflowRepository { match read_operation_workflow(path).await { Ok((workflow, version)) => { if let Ok(cmd) = self.load_operation_workflow( - WorkflowSource::UserDefined, - path.clone(), + WorkflowSource::UserDefined(path.clone()), workflow, version, ) { @@ -244,10 +246,10 @@ impl WorkflowRepository { .find(|(_, (_, p))| p == removed_path) .map(|(n, (v, _))| (n.clone(), v.clone()))?; self.definitions.remove(&operation); - let deprecated = self + let builtin_restored = self .workflows .unregister_custom_workflow(&operation, &removed_version); - if matches!(deprecated, Some(WorkflowSource::BuiltIn)) { + if builtin_restored { info!("The builtin workflow for the '{operation}' operation has been restored"); None } else { @@ -279,8 +281,7 @@ impl WorkflowRepository { async fn load_latest_version(&mut self, operation: &OperationName) { if let Some((path, version, workflow)) = self.get_latest_version(operation).await { if let Err(err) = self.load_operation_workflow( - WorkflowSource::UserDefined, - path.clone(), + WorkflowSource::UserDefined(path.clone()), workflow, version, ) { diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 5a5474ca3a..96b47abc76 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -24,22 +24,20 @@ impl WorkflowSupervisor { self.register_custom_workflow( WorkflowSource::BuiltIn, OperationWorkflow::built_in(operation), - "builtin".to_string(), ) } /// Register a user-defined workflow pub fn register_custom_workflow( &mut self, - source: WorkflowSource, + version: WorkflowSource, workflow: OperationWorkflow, - version: WorkflowVersion, ) -> Result<(), WorkflowRegistrationError> { let operation = workflow.operation.clone(); if let Some(versions) = self.workflows.get_mut(&operation) { - versions.add(source, version, workflow); + versions.add(version, workflow); } else { - let versions = WorkflowVersions::new(source, version, workflow); + let versions = WorkflowVersions::new(version, workflow); self.workflows.insert(operation, versions); } Ok(()) @@ -47,31 +45,29 @@ impl WorkflowSupervisor { /// Un-register a user-defined workflow /// - /// Return None is this was the last version for that operation. - /// Return Some(BuiltIn) is there is a builtin definition - /// Return Some(InUseCopy) if the workflow has been deprecated but there is still a running command. + /// Return true if a builtin version has been restored pub fn unregister_custom_workflow( &mut self, operation: &OperationName, version: &WorkflowVersion, - ) -> Option { + ) -> bool { let operation = OperationType::from(operation.as_str()); if let Some(versions) = self.workflows.get_mut(&operation) { versions.remove(version); } - let current_source = match self.workflows.get(&operation) { - None => None, - Some(version) if version.is_empty() => None, - Some(version) if version.is_builtin() => Some(BuiltIn), - Some(_) => Some(InUseCopy), + let (empty,builtin_restored) = match self.workflows.get(&operation) { + None => (true, false), + Some(version) if version.is_empty() => (true, false), + Some(version) if version.is_builtin() => (false, true), + Some(_) => (false, false), }; - if current_source.is_none() { + if empty { self.workflows.remove(&operation); } - current_source + builtin_restored } /// The set of pending commands @@ -316,11 +312,27 @@ struct WorkflowVersions { in_use: HashMap, } -#[derive(Copy, Clone, Hash, Eq, PartialEq)] -pub enum WorkflowSource { +pub enum WorkflowSource { BuiltIn, - UserDefined, - InUseCopy, + UserDefined(T), + InUseCopy(T), +} + +impl WorkflowSource { + pub fn inner(&self) -> Option<&T> { + match self { + BuiltIn => None, + UserDefined(inner) | InUseCopy(inner) => Some(inner), + } + } + + pub fn set_inner(&self, target: U) -> WorkflowSource { + match self { + BuiltIn => BuiltIn, + UserDefined(_) => UserDefined(target), + InUseCopy(_) => InUseCopy(target), + } + } } use WorkflowSource::*; @@ -328,15 +340,15 @@ use WorkflowSource::*; impl WorkflowVersions { const BUILT_IN: &'static str = "builtin"; - fn new(source: WorkflowSource, version: WorkflowVersion, workflow: OperationWorkflow) -> Self { + fn new(source: WorkflowSource, workflow: OperationWorkflow) -> Self { let operation = workflow.operation.to_string(); let (current, in_use) = match source { BuiltIn => ( None, HashMap::from([(Self::BUILT_IN.to_string(), workflow)]), ), - UserDefined => (Some((version, workflow)), HashMap::new()), - InUseCopy => (None, HashMap::from([(version, workflow)])), + UserDefined(version) => (Some((version, workflow)), HashMap::new()), + InUseCopy(version) => (None, HashMap::from([(version, workflow)])), }; WorkflowVersions { @@ -346,20 +358,15 @@ impl WorkflowVersions { } } - fn add( - &mut self, - source: WorkflowSource, - version: WorkflowVersion, - workflow: OperationWorkflow, - ) { + fn add(&mut self, source: WorkflowSource, workflow: OperationWorkflow) { match source { BuiltIn => { self.in_use.insert(Self::BUILT_IN.to_string(), workflow); } - UserDefined => { + UserDefined(version) => { self.current = Some((version, workflow)); } - InUseCopy => { + InUseCopy(version) => { self.in_use.insert(version, workflow); } };