Skip to content

Commit

Permalink
Support concurrent instances with difference versions
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Oct 23, 2024
1 parent 651a0b7 commit 55aef04
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 103 deletions.
30 changes: 13 additions & 17 deletions crates/core/tedge_agent/src/operation_workflows/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Context;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use std::collections::HashMap;
use std::collections::HashSet;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
Expand Down Expand Up @@ -37,12 +38,13 @@ pub struct WorkflowRepository {
// Directory of user-defined workflow copies of the workflow in-use
state_dir: Utf8PathBuf,

// Map each workflow version to its workflow file
//
// For a fresh new workflow definition, this points to the user-defined file
// When the workflow definition is in use, this points to a copy in the state directory.
// Map each user defined workflow to its version and workflow file
definitions: HashMap<OperationName, (WorkflowVersion, Utf8PathBuf)>,
in_use_copies: HashMap<OperationName, WorkflowVersion>,

// Set of workflow in-use
//
// TODO Implement the cleanup logic
in_use_copies: HashSet<WorkflowVersion>,

// The in-memory representation of all the workflows (builtin, user-defined, in-use).
workflows: WorkflowSupervisor,
Expand All @@ -57,7 +59,7 @@ impl WorkflowRepository {
let workflows = WorkflowSupervisor::default();
let state_dir = state_dir.join("workflows-in-use");
let definitions = HashMap::new();
let in_use_copies = HashMap::new();
let in_use_copies = HashSet::new();
Self {
builtin_workflows,
custom_workflows_dir,
Expand All @@ -79,8 +81,6 @@ impl WorkflowRepository {
}

// Then, the definitions of the workflow still in-use are loaded
// If a definition has not changed, then self.definitions is updated accordingly
// so the known location of this definition is the copy not the original
let dir_path = &self.state_dir.clone();
let _ = tokio::fs::create_dir(dir_path).await; // if the creation fails, this will be reported next line on read
if let Err(err) = self
Expand Down Expand Up @@ -135,8 +135,7 @@ impl WorkflowRepository {
.insert(operation_name.clone(), (version.clone(), definition));
}
WorkflowSource::InUseCopy => {
self.in_use_copies
.insert(operation_name.clone(), version.clone());
self.in_use_copies.insert(version.clone());
}
WorkflowSource::BuiltIn => {}
};
Expand Down Expand Up @@ -264,18 +263,15 @@ impl WorkflowRepository {
operation: &OperationName,
version: &WorkflowVersion,
) {
if let Some(in_use_version) = self.in_use_copies.get(operation) {
if in_use_version == version {
return;
}
if self.in_use_copies.contains(version) {
return;
};
if let Some((_, source)) = self.definitions.get(operation) {
let target = self.state_dir.join(operation).with_extension("toml");
let target = self.state_dir.join(version).with_extension("toml");
if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await {
error!("Fail to persist a copy of {source} as {target}: {err}");
} else {
self.in_use_copies
.insert(operation.clone(), version.clone());
self.in_use_copies.insert(version.clone());
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/core/tedge_api/src/workflow/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ pub enum WorkflowExecutionError {
#[error("Missing status in the command payload")]
MissingStatus,

#[error("Missing version in the command payload")]
MissingVersion,

#[error("No workflow is defined for the operation: {operation}")]
UnknownOperation { operation: String },

Expand Down
151 changes: 65 additions & 86 deletions crates/core/tedge_api/src/workflow/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::workflow::*;
use ::log::info;
use on_disk::OnDiskCommandBoard;
use serde::Serialize;
use std::string::ToString;

/// Dispatch actions to operation participants
#[derive(Default)]
Expand Down Expand Up @@ -185,13 +186,16 @@ impl WorkflowSupervisor {
});
};

let version = command_state.workflow_version();
let Some(version) = &command_state.workflow_version() else {
return Err(WorkflowExecutionError::MissingVersion);
};

self.workflows
.get(&operation_name.as_str().into())
.ok_or(WorkflowExecutionError::UnknownOperation {
operation: operation_name.clone(),
})
.and_then(|versions| versions.get(version.as_ref()))
.and_then(|versions| versions.get(version))
.and_then(|workflow| workflow.get_action(command_state))
}

Expand Down Expand Up @@ -298,26 +302,21 @@ impl WorkflowSupervisor {
}
}

/// A stack of known versions for a workflow
///
/// In practice, one might have 3 concurrent versions:
/// The set of in-use workflow versions for an operation
///
/// - The current version, i.e. the version to be used for new operation instances.
/// - One in-use version, that was used by some operation instance when the current version has been updated.
/// - The builtin version provided by the agent and which can be overridden by the users.
///
/// None of these versions are mandatory. For instance, an operation can have no builtin version,
/// no current version (because the user removed the definition file), but a version still in-use
/// (that has been started before the user deprecated the operation).
/// - The current version is the version that will be used for a new command instance.
/// - The current version might be none. This is the case when the command has been deprecated.
/// - When a new command instance is initialized, the current version is stored as being in use.
/// - When all the commands using a given version are finalized, these copies are removed.
/// - Among all the versions, the `"builtin"` version is specific.
/// - The `"builtin"` version is never removed, and is used as the current version if none is available.
struct WorkflowVersions {
operation: OperationName,
current: Option<WorkflowVersion>,
builtin: Option<WorkflowVersion>,
in_use: Option<WorkflowVersion>,
versions: HashMap<WorkflowVersion, OperationWorkflow>,
current: Option<(WorkflowVersion, OperationWorkflow)>,
in_use: HashMap<WorkflowVersion, OperationWorkflow>,
}

#[derive(Copy, Clone)]
#[derive(Copy, Clone, Hash, Eq, PartialEq)]
pub enum WorkflowSource {
BuiltIn,
UserDefined,
Expand All @@ -327,28 +326,23 @@ pub enum WorkflowSource {
use WorkflowSource::*;

impl WorkflowVersions {
const BUILT_IN: &'static str = "builtin";

fn new(source: WorkflowSource, version: WorkflowVersion, workflow: OperationWorkflow) -> Self {
let operation = workflow.operation.to_string();

let current = match source {
BuiltIn | UserDefined => Some(version.clone()),
InUseCopy => None,
};
let builtin = match source {
BuiltIn => Some(version.clone()),
UserDefined | InUseCopy => None,
};
let in_use = match source {
InUseCopy => Some(version.clone()),
UserDefined | BuiltIn => None,
let (current, in_use) = match source {
BuiltIn => (
None,
HashMap::from([(Self::BUILT_IN.to_string(), workflow)]),
),
UserDefined => (Some((version, workflow)), HashMap::new()),
InUseCopy => (None, HashMap::from([(version, workflow)])),
};
let versions = HashMap::from([(version, workflow)]);

WorkflowVersions {
operation,
current,
builtin,
in_use,
versions,
}
}

Expand All @@ -360,87 +354,72 @@ impl WorkflowVersions {
) {
match source {
BuiltIn => {
self.builtin = Some(version.clone());

if self.current.is_none() {
self.current = Some(version.clone());
} else {
info!(
"The built-in {operation} operation has been customized",
operation = workflow.operation
);
}
self.in_use.insert(Self::BUILT_IN.to_string(), workflow);
}

UserDefined => {
self.current = Some(version.clone());
if self.builtin.is_some() {
info!(
"The built-in {operation} operation has been customized",
operation = workflow.operation
);
}
self.current = Some((version, workflow));
}
InUseCopy => {
self.in_use.insert(version, workflow);
}
};

InUseCopy => self.in_use = Some(version.clone()),
if self.current.is_some() && self.in_use.contains_key(Self::BUILT_IN) {
info!(
"The built-in {operation} operation has been customized",
operation = self.operation
);
}

self.versions.insert(version.clone(), workflow);
}

// Mark the current version as being in-use.
fn use_current_version(&mut self) -> Option<&WorkflowVersion> {
if self.current.is_some() && self.in_use != self.current {
// remove the previous version in-use unless this is the builtin version
if let Some(previous_version) = self.in_use.as_ref() {
if Some(previous_version) != self.builtin.as_ref() {
self.versions.remove(previous_version);
}
match self.current.as_ref() {
Some((version, workflow)) => {
if !self.in_use.contains_key(version) {
self.in_use.insert(version.clone(), workflow.clone());
};
Some(version)
}
self.in_use.clone_from(&self.current);

None => self
.in_use
.get_key_value(Self::BUILT_IN)
.map(|(builtin, _)| builtin),
}
self.current.as_ref()
}

// Remove a version from this list of versions, restoring the built-in version if any
// Remove the current version from this list of versions, restoring the built-in version if any
fn remove(&mut self, version: &WorkflowVersion) {
self.versions.remove(version);
self.current.clone_from(&self.builtin);
if self.current.as_ref().map(|(v, _)| v == version) == Some(true) {
self.current = None;
} else if version != Self::BUILT_IN {
self.in_use.remove(version);
}
}

fn is_empty(&self) -> bool {
self.versions.is_empty()
self.in_use.is_empty()
}

fn is_builtin(&self) -> bool {
self.builtin.is_some()
self.in_use.contains_key(Self::BUILT_IN)
}

fn get(
&self,
version: Option<&WorkflowVersion>,
) -> Result<&OperationWorkflow, WorkflowExecutionError> {
match version {
None => self.current_workflow(),
Some(version) => self.find(version),
}
.ok_or(WorkflowExecutionError::UnknownVersion {
operation: self.operation.clone(),
version: version
.or(self.current.as_ref())
.unwrap_or(&"current".to_string())
.to_string(),
})
fn get(&self, version: &WorkflowVersion) -> Result<&OperationWorkflow, WorkflowExecutionError> {
self.in_use
.get(version)
.ok_or(WorkflowExecutionError::UnknownVersion {
operation: self.operation.clone(),
version: version.to_string(),
})
}

fn current_workflow(&self) -> Option<&OperationWorkflow> {
self.current
.as_ref()
.and_then(|version| self.versions.get(version))
}

fn find(&self, version: &WorkflowVersion) -> Option<&OperationWorkflow> {
self.versions.get(version)
.map(|(_, workflow)| workflow)
.or_else(|| self.in_use.get(Self::BUILT_IN))
}
}

Expand Down
Loading

0 comments on commit 55aef04

Please sign in to comment.