Skip to content

Commit

Permalink
Improve struct WorkflowSource
Browse files Browse the repository at this point in the history
A workflow source being always used with a complementary info:
a file path or a workflow version, it makes sense to pack the
complementary info within the WorkflowSource itself.
This also highlights the corner case of the BuiltIn workflow for which
there is no complementary info.

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Oct 23, 2024
1 parent 55aef04 commit 4bf281a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 50 deletions.
39 changes: 20 additions & 19 deletions crates/core/tedge_agent/src/operation_workflows/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl WorkflowRepository {
// First, all the user-defined workflows are loaded
let dir_path = &self.custom_workflows_dir.clone();
if let Err(err) = self
.load_operation_workflows(WorkflowSource::UserDefined, dir_path)
.load_operation_workflows(WorkflowSource::UserDefined(dir_path))
.await
{
error!("Fail to read the operation workflows from {dir_path}: {err:?}");
Expand All @@ -84,7 +84,7 @@ impl WorkflowRepository {
let dir_path = &self.state_dir.clone();
let _ = tokio::fs::create_dir(dir_path).await; // if the creation fails, this will be reported next line on read
if let Err(err) = self
.load_operation_workflows(WorkflowSource::InUseCopy, dir_path)
.load_operation_workflows(WorkflowSource::InUseCopy(dir_path))
.await
{
error!("Fail to reload the running operation workflows from {dir_path}: {err:?}");
Expand All @@ -96,16 +96,19 @@ impl WorkflowRepository {

async fn load_operation_workflows(
&mut self,
source: WorkflowSource,
dir_path: &Utf8PathBuf,
source: WorkflowSource<&Utf8PathBuf>,
) -> Result<(), anyhow::Error> {
let Some(dir_path) = source.inner() else {
return Ok(());
};
for entry in dir_path.read_dir_utf8()?.flatten() {
let file = entry.path();
if file.extension() == Some("toml") {
match read_operation_workflow(file)
.await
.and_then(|(workflow, version)| {
self.load_operation_workflow(source, file.into(), workflow, version)
let file_source = source.set_inner(file.into());
self.load_operation_workflow(file_source, workflow, version)
}) {
Ok(cmd) => {
info!(
Expand All @@ -123,25 +126,25 @@ impl WorkflowRepository {

fn load_operation_workflow(
&mut self,
source: WorkflowSource,
definition: Utf8PathBuf,
source: WorkflowSource<Utf8PathBuf>,
workflow: OperationWorkflow,
version: WorkflowVersion,
) -> Result<String, anyhow::Error> {
let operation_name = workflow.operation.to_string();
match source {
WorkflowSource::UserDefined => {
let version = match source {
WorkflowSource::UserDefined(definition) => {
self.definitions
.insert(operation_name.clone(), (version.clone(), definition));
WorkflowSource::UserDefined(version)
}
WorkflowSource::InUseCopy => {
WorkflowSource::InUseCopy(_) => {
self.in_use_copies.insert(version.clone());
WorkflowSource::InUseCopy(version)
}
WorkflowSource::BuiltIn => {}
WorkflowSource::BuiltIn => WorkflowSource::BuiltIn,
};

self.workflows
.register_custom_workflow(source, workflow, version)?;
self.workflows.register_custom_workflow(version, workflow)?;
Ok(operation_name)
}

Expand Down Expand Up @@ -213,8 +216,7 @@ impl WorkflowRepository {
match read_operation_workflow(path).await {
Ok((workflow, version)) => {
if let Ok(cmd) = self.load_operation_workflow(
WorkflowSource::UserDefined,
path.clone(),
WorkflowSource::UserDefined(path.clone()),
workflow,
version,
) {
Expand Down Expand Up @@ -244,10 +246,10 @@ impl WorkflowRepository {
.find(|(_, (_, p))| p == removed_path)
.map(|(n, (v, _))| (n.clone(), v.clone()))?;
self.definitions.remove(&operation);
let deprecated = self
let builtin_restored = self
.workflows
.unregister_custom_workflow(&operation, &removed_version);
if matches!(deprecated, Some(WorkflowSource::BuiltIn)) {
if builtin_restored {
info!("The builtin workflow for the '{operation}' operation has been restored");
None
} else {
Expand Down Expand Up @@ -279,8 +281,7 @@ impl WorkflowRepository {
async fn load_latest_version(&mut self, operation: &OperationName) {
if let Some((path, version, workflow)) = self.get_latest_version(operation).await {
if let Err(err) = self.load_operation_workflow(
WorkflowSource::UserDefined,
path.clone(),
WorkflowSource::UserDefined(path.clone()),
workflow,
version,
) {
Expand Down
69 changes: 38 additions & 31 deletions crates/core/tedge_api/src/workflow/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,54 +24,50 @@ impl WorkflowSupervisor {
self.register_custom_workflow(
WorkflowSource::BuiltIn,
OperationWorkflow::built_in(operation),
"builtin".to_string(),
)
}

/// Register a user-defined workflow
pub fn register_custom_workflow(
&mut self,
source: WorkflowSource,
version: WorkflowSource<WorkflowVersion>,
workflow: OperationWorkflow,
version: WorkflowVersion,
) -> Result<(), WorkflowRegistrationError> {
let operation = workflow.operation.clone();
if let Some(versions) = self.workflows.get_mut(&operation) {
versions.add(source, version, workflow);
versions.add(version, workflow);
} else {
let versions = WorkflowVersions::new(source, version, workflow);
let versions = WorkflowVersions::new(version, workflow);
self.workflows.insert(operation, versions);
}
Ok(())
}

/// Un-register a user-defined workflow
///
/// Return None is this was the last version for that operation.
/// Return Some(BuiltIn) is there is a builtin definition
/// Return Some(InUseCopy) if the workflow has been deprecated but there is still a running command.
/// Return true if a builtin version has been restored
pub fn unregister_custom_workflow(
&mut self,
operation: &OperationName,
version: &WorkflowVersion,
) -> Option<WorkflowSource> {
) -> bool {
let operation = OperationType::from(operation.as_str());
if let Some(versions) = self.workflows.get_mut(&operation) {
versions.remove(version);
}

let current_source = match self.workflows.get(&operation) {
None => None,
Some(version) if version.is_empty() => None,
Some(version) if version.is_builtin() => Some(BuiltIn),
Some(_) => Some(InUseCopy),
let (empty,builtin_restored) = match self.workflows.get(&operation) {
None => (true, false),
Some(version) if version.is_empty() => (true, false),
Some(version) if version.is_builtin() => (false, true),
Some(_) => (false, false),
};

if current_source.is_none() {
if empty {
self.workflows.remove(&operation);
}

current_source
builtin_restored
}

/// The set of pending commands
Expand Down Expand Up @@ -316,27 +312,43 @@ struct WorkflowVersions {
in_use: HashMap<WorkflowVersion, OperationWorkflow>,
}

#[derive(Copy, Clone, Hash, Eq, PartialEq)]
pub enum WorkflowSource {
pub enum WorkflowSource<T> {
BuiltIn,
UserDefined,
InUseCopy,
UserDefined(T),
InUseCopy(T),
}

impl<T> WorkflowSource<T> {
pub fn inner(&self) -> Option<&T> {
match self {
BuiltIn => None,
UserDefined(inner) | InUseCopy(inner) => Some(inner),
}
}

pub fn set_inner<U>(&self, target: U) -> WorkflowSource<U> {
match self {
BuiltIn => BuiltIn,
UserDefined(_) => UserDefined(target),
InUseCopy(_) => InUseCopy(target),
}
}
}

use WorkflowSource::*;

impl WorkflowVersions {
const BUILT_IN: &'static str = "builtin";

fn new(source: WorkflowSource, version: WorkflowVersion, workflow: OperationWorkflow) -> Self {
fn new(source: WorkflowSource<WorkflowVersion>, workflow: OperationWorkflow) -> Self {
let operation = workflow.operation.to_string();
let (current, in_use) = match source {
BuiltIn => (
None,
HashMap::from([(Self::BUILT_IN.to_string(), workflow)]),
),
UserDefined => (Some((version, workflow)), HashMap::new()),
InUseCopy => (None, HashMap::from([(version, workflow)])),
UserDefined(version) => (Some((version, workflow)), HashMap::new()),
InUseCopy(version) => (None, HashMap::from([(version, workflow)])),
};

WorkflowVersions {
Expand All @@ -346,20 +358,15 @@ impl WorkflowVersions {
}
}

fn add(
&mut self,
source: WorkflowSource,
version: WorkflowVersion,
workflow: OperationWorkflow,
) {
fn add(&mut self, source: WorkflowSource<WorkflowVersion>, workflow: OperationWorkflow) {
match source {
BuiltIn => {
self.in_use.insert(Self::BUILT_IN.to_string(), workflow);
}
UserDefined => {
UserDefined(version) => {
self.current = Some((version, workflow));
}
InUseCopy => {
InUseCopy(version) => {
self.in_use.insert(version, workflow);
}
};
Expand Down

0 comments on commit 4bf281a

Please sign in to comment.