Skip to content

Commit

Permalink
Merge pull request #2912 from didier-wenzek/refactor/rename-workflow-…
Browse files Browse the repository at this point in the history
…actor

refactoring: rename workflow related structs
  • Loading branch information
didier-wenzek authored Jun 4, 2024
2 parents 216b9cb + afd5de3 commit 114486d
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 82 deletions.
54 changes: 5 additions & 49 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::file_transfer_server::actor::FileTransferServerBuilder;
use crate::file_transfer_server::actor::FileTransferServerConfig;
use crate::operation_file_cache::FileCacheActorBuilder;
use crate::operation_workflows::load_operation_workflows;
use crate::operation_workflows::OperationConfig;
use crate::operation_workflows::WorkflowActorBuilder;
use crate::restart_manager::builder::RestartManagerBuilder;
use crate::restart_manager::config::RestartManagerConfig;
use crate::software_manager::builder::SoftwareManagerBuilder;
use crate::software_manager::config::SoftwareManagerConfig;
use crate::state_repository::state::agent_state_dir;
use crate::tedge_operation_converter::builder::TedgeOperationConverterBuilder;
use crate::tedge_operation_converter::config::OperationConfig;
use crate::tedge_to_te_converter::converter::TedgetoTeConverter;
use crate::AgentOpt;
use crate::Capabilities;
Expand All @@ -17,12 +18,9 @@ use camino::Utf8PathBuf;
use flockfile::check_another_instance_is_not_running;
use flockfile::Flockfile;
use flockfile::FlockfileError;
use log::error;
use reqwest::Identity;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use tedge_actors::Concurrent;
use tedge_actors::ConvertingActor;
Expand All @@ -37,8 +35,6 @@ use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::Service;
use tedge_api::path::DataDir;
use tedge_api::workflow::OperationWorkflow;
use tedge_api::workflow::WorkflowSupervisor;
use tedge_config::TEdgeConfigReaderService;
use tedge_config_manager::ConfigManagerBuilder;
use tedge_config_manager::ConfigManagerConfig;
Expand Down Expand Up @@ -236,7 +232,7 @@ impl Agent {
let mut runtime = Runtime::new();

// Operation workflows
let workflows = self.load_operation_workflows().await?;
let workflows = load_operation_workflows(&self.config.operations_dir).await?;
let mut script_runner: ServerActorBuilder<ScriptActor, Concurrent> = ScriptActor::builder();

// Restart actor
Expand All @@ -249,7 +245,7 @@ impl Agent {
let mut software_update_builder = SoftwareManagerBuilder::new(self.config.sw_update_config);

// Converter actor
let mut converter_actor_builder = TedgeOperationConverterBuilder::new(
let mut converter_actor_builder = WorkflowActorBuilder::new(
self.config.operation_config,
workflows,
&mut mqtt_actor_builder,
Expand Down Expand Up @@ -380,46 +376,6 @@ impl Agent {

Ok(())
}

async fn load_operation_workflows(&self) -> Result<WorkflowSupervisor, anyhow::Error> {
let dir_path = &self.config.operations_dir;
let mut workflows = WorkflowSupervisor::default();
for entry in std::fs::read_dir(dir_path)?.flatten() {
let file = entry.path();
if file.extension() == Some(OsStr::new("toml")) {
match read_operation_workflow(&file)
.await
.and_then(|workflow| load_operation_workflow(&mut workflows, workflow))
{
Ok(cmd) => {
info!("Using operation workflow definition from {file:?} for '{cmd}' operation");
}
Err(err) => {
error!("Ignoring operation workflow definition from {file:?}: {err:?}")
}
};
}
}
Ok(workflows)
}
}

async fn read_operation_workflow(path: &Path) -> Result<OperationWorkflow, anyhow::Error> {
let bytes = tokio::fs::read(path)
.await
.context("Reading file content")?;
let input = std::str::from_utf8(&bytes).context("Expecting UTF8 content")?;
let workflow = toml::from_str::<OperationWorkflow>(input).context("Parsing TOML content")?;
Ok(workflow)
}

fn load_operation_workflow(
workflows: &mut WorkflowSupervisor,
workflow: OperationWorkflow,
) -> Result<String, anyhow::Error> {
let name = workflow.operation.to_string();
workflows.register_custom_workflow(workflow)?;
Ok(name)
}

pub fn create_tedge_to_te_converter(
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use tracing::log::warn;
mod agent;
mod file_transfer_server;
mod operation_file_cache;
mod operation_workflows;
mod restart_manager;
mod software_manager;
mod state_repository;
mod tedge_operation_converter;
mod tedge_to_te_converter;

#[derive(Debug, Clone, clap::Parser)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::operation_workflows::message_box::CommandDispatcher;
use crate::state_repository::state::AgentStateRepository;
use crate::tedge_operation_converter::message_box::CommandDispatcher;
use async_trait::async_trait;
use camino::Utf8PathBuf;
use log::error;
Expand Down Expand Up @@ -44,7 +44,7 @@ pub struct InternalCommandState(GenericCommandState);

fan_in_message_type!(AgentInput[MqttMessage, InternalCommandState, GenericCommandData] : Debug);

pub struct TedgeOperationConverterActor {
pub struct WorkflowActor {
pub(crate) mqtt_schema: MqttSchema,
pub(crate) device_topic_id: EntityTopicId,
pub(crate) workflows: WorkflowSupervisor,
Expand All @@ -58,9 +58,9 @@ pub struct TedgeOperationConverterActor {
}

#[async_trait]
impl Actor for TedgeOperationConverterActor {
impl Actor for WorkflowActor {
fn name(&self) -> &str {
"TedgeOperationConverter"
"WorkflowActor"
}

async fn run(mut self) -> Result<(), RuntimeError> {
Expand Down Expand Up @@ -90,7 +90,7 @@ impl Actor for TedgeOperationConverterActor {
}
}

impl TedgeOperationConverterActor {
impl WorkflowActor {
async fn publish_operation_capabilities(&mut self) -> Result<(), RuntimeError> {
for capability in self
.workflows
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::operation_workflows::actor::AgentInput;
use crate::operation_workflows::actor::InternalCommandState;
use crate::operation_workflows::actor::WorkflowActor;
use crate::operation_workflows::config::OperationConfig;
use crate::operation_workflows::message_box::CommandDispatcher;
use crate::state_repository::state::AgentStateRepository;
use crate::tedge_operation_converter::actor::AgentInput;
use crate::tedge_operation_converter::actor::InternalCommandState;
use crate::tedge_operation_converter::actor::TedgeOperationConverterActor;
use crate::tedge_operation_converter::config::OperationConfig;
use crate::tedge_operation_converter::message_box::CommandDispatcher;
use log::error;
use std::process::Output;
use tedge_actors::futures::channel::mpsc;
Expand Down Expand Up @@ -32,7 +32,7 @@ use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::TopicFilter;
use tedge_script_ext::Execute;

pub struct TedgeOperationConverterBuilder {
pub struct WorkflowActorBuilder {
config: OperationConfig,
workflows: WorkflowSupervisor,
input_sender: DynSender<AgentInput>,
Expand All @@ -44,7 +44,7 @@ pub struct TedgeOperationConverterBuilder {
signal_sender: mpsc::Sender<RuntimeRequest>,
}

impl TedgeOperationConverterBuilder {
impl WorkflowActorBuilder {
pub fn new(
config: OperationConfig,
workflows: WorkflowSupervisor,
Expand Down Expand Up @@ -105,20 +105,20 @@ impl TedgeOperationConverterBuilder {
}
}

impl RuntimeRequestSink for TedgeOperationConverterBuilder {
impl RuntimeRequestSink for WorkflowActorBuilder {
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
Box::new(self.signal_sender.clone())
}
}

impl Builder<TedgeOperationConverterActor> for TedgeOperationConverterBuilder {
impl Builder<WorkflowActor> for WorkflowActorBuilder {
type Error = LinkError;

fn try_build(self) -> Result<TedgeOperationConverterActor, Self::Error> {
fn try_build(self) -> Result<WorkflowActor, Self::Error> {
Ok(self.build())
}

fn build(mut self) -> TedgeOperationConverterActor {
fn build(mut self) -> WorkflowActor {
for capability in self.command_dispatcher.capabilities() {
if let Err(err) = self
.workflows
Expand All @@ -130,7 +130,7 @@ impl Builder<TedgeOperationConverterActor> for TedgeOperationConverterBuilder {

let repository =
AgentStateRepository::new(self.config.state_dir, self.config.config_dir, "workflows");
TedgeOperationConverterActor {
WorkflowActor {
mqtt_schema: self.config.mqtt_schema,
device_topic_id: self.config.device_topic_id,
workflows: self.workflows,
Expand Down
62 changes: 62 additions & 0 deletions crates/core/tedge_agent/src/operation_workflows/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use anyhow::Context;
use camino::Utf8PathBuf;
use log::error;
use std::ffi::OsStr;
use std::path::Path;
use tedge_api::workflow::OperationWorkflow;
use tedge_api::workflow::WorkflowSupervisor;
use tracing::info;

mod actor;
mod builder;
mod config;
mod message_box;

#[cfg(test)]
mod tests;

pub use builder::WorkflowActorBuilder;
pub use config::OperationConfig;

pub async fn load_operation_workflows(
dir_path: &Utf8PathBuf,
) -> Result<WorkflowSupervisor, anyhow::Error> {
let mut workflows = WorkflowSupervisor::default();
for entry in std::fs::read_dir(dir_path)?.flatten() {
let file = entry.path();
if file.extension() == Some(OsStr::new("toml")) {
match read_operation_workflow(&file)
.await
.and_then(|workflow| load_operation_workflow(&mut workflows, workflow))
{
Ok(cmd) => {
info!(
"Using operation workflow definition from {file:?} for '{cmd}' operation"
);
}
Err(err) => {
error!("Ignoring operation workflow definition from {file:?}: {err:?}")
}
};
}
}
Ok(workflows)
}

async fn read_operation_workflow(path: &Path) -> Result<OperationWorkflow, anyhow::Error> {
let bytes = tokio::fs::read(path)
.await
.context("Reading file content")?;
let input = std::str::from_utf8(&bytes).context("Expecting UTF8 content")?;
let workflow = toml::from_str::<OperationWorkflow>(input).context("Parsing TOML content")?;
Ok(workflow)
}

fn load_operation_workflow(
workflows: &mut WorkflowSupervisor,
workflow: OperationWorkflow,
) -> Result<String, anyhow::Error> {
let name = workflow.operation.to_string();
workflows.register_custom_workflow(workflow)?;
Ok(name)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::operation_workflows::builder::WorkflowActorBuilder;
use crate::operation_workflows::config::OperationConfig;
use crate::software_manager::actor::SoftwareCommand;
use crate::tedge_operation_converter::builder::TedgeOperationConverterBuilder;
use crate::tedge_operation_converter::config::OperationConfig;
use camino::Utf8Path;
use serde_json::json;
use std::process::Output;
Expand Down Expand Up @@ -367,12 +367,8 @@ async fn spawn_mqtt_operation_converter(device_topic_id: &str) -> Result<TestHan
config_dir: tmp_path.into(),
state_dir: tmp_path.into(),
};
let mut converter_actor_builder = TedgeOperationConverterBuilder::new(
config,
workflows,
&mut mqtt_builder,
&mut script_builder,
);
let mut converter_actor_builder =
WorkflowActorBuilder::new(config, workflows, &mut mqtt_builder, &mut script_builder);
converter_actor_builder.register_builtin_operation(&mut restart_builder);
converter_actor_builder.register_builtin_operation(&mut software_builder);

Expand Down
7 changes: 0 additions & 7 deletions crates/core/tedge_agent/src/tedge_operation_converter/mod.rs

This file was deleted.

0 comments on commit 114486d

Please sign in to comment.