diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 47d8b7ba932..5a192835883 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -1,13 +1,14 @@ use crate::file_transfer_server::actor::FileTransferServerBuilder; use crate::file_transfer_server::actor::FileTransferServerConfig; use crate::operation_file_cache::FileCacheActorBuilder; +use crate::operation_workflows::load_operation_workflows; +use crate::operation_workflows::OperationConfig; +use crate::operation_workflows::WorkflowActorBuilder; use crate::restart_manager::builder::RestartManagerBuilder; use crate::restart_manager::config::RestartManagerConfig; use crate::software_manager::builder::SoftwareManagerBuilder; use crate::software_manager::config::SoftwareManagerConfig; use crate::state_repository::state::agent_state_dir; -use crate::tedge_operation_converter::builder::TedgeOperationConverterBuilder; -use crate::tedge_operation_converter::config::OperationConfig; use crate::tedge_to_te_converter::converter::TedgetoTeConverter; use crate::AgentOpt; use crate::Capabilities; @@ -17,12 +18,9 @@ use camino::Utf8PathBuf; use flockfile::check_another_instance_is_not_running; use flockfile::Flockfile; use flockfile::FlockfileError; -use log::error; use reqwest::Identity; -use std::ffi::OsStr; use std::fmt::Debug; use std::net::SocketAddr; -use std::path::Path; use std::sync::Arc; use tedge_actors::Concurrent; use tedge_actors::ConvertingActor; @@ -37,8 +35,6 @@ use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::Service; use tedge_api::path::DataDir; -use tedge_api::workflow::OperationWorkflow; -use tedge_api::workflow::WorkflowSupervisor; use tedge_config::TEdgeConfigReaderService; use tedge_config_manager::ConfigManagerBuilder; use tedge_config_manager::ConfigManagerConfig; @@ -236,7 +232,7 @@ impl Agent { let mut runtime = Runtime::new(); // Operation workflows - let workflows = self.load_operation_workflows().await?; + let workflows = load_operation_workflows(&self.config.operations_dir).await?; let mut script_runner: ServerActorBuilder = ScriptActor::builder(); // Restart actor @@ -249,7 +245,7 @@ impl Agent { let mut software_update_builder = SoftwareManagerBuilder::new(self.config.sw_update_config); // Converter actor - let mut converter_actor_builder = TedgeOperationConverterBuilder::new( + let mut converter_actor_builder = WorkflowActorBuilder::new( self.config.operation_config, workflows, &mut mqtt_actor_builder, @@ -380,46 +376,6 @@ impl Agent { Ok(()) } - - async fn load_operation_workflows(&self) -> Result { - let dir_path = &self.config.operations_dir; - let mut workflows = WorkflowSupervisor::default(); - for entry in std::fs::read_dir(dir_path)?.flatten() { - let file = entry.path(); - if file.extension() == Some(OsStr::new("toml")) { - match read_operation_workflow(&file) - .await - .and_then(|workflow| load_operation_workflow(&mut workflows, workflow)) - { - Ok(cmd) => { - info!("Using operation workflow definition from {file:?} for '{cmd}' operation"); - } - Err(err) => { - error!("Ignoring operation workflow definition from {file:?}: {err:?}") - } - }; - } - } - Ok(workflows) - } -} - -async fn read_operation_workflow(path: &Path) -> Result { - let bytes = tokio::fs::read(path) - .await - .context("Reading file content")?; - let input = std::str::from_utf8(&bytes).context("Expecting UTF8 content")?; - let workflow = toml::from_str::(input).context("Parsing TOML content")?; - Ok(workflow) -} - -fn load_operation_workflow( - workflows: &mut WorkflowSupervisor, - workflow: OperationWorkflow, -) -> Result { - let name = workflow.operation.to_string(); - workflows.register_custom_workflow(workflow)?; - Ok(name) } pub fn create_tedge_to_te_converter( diff --git a/crates/core/tedge_agent/src/lib.rs b/crates/core/tedge_agent/src/lib.rs index 8911f220c77..8f7f3d7fc7b 100644 --- a/crates/core/tedge_agent/src/lib.rs +++ b/crates/core/tedge_agent/src/lib.rs @@ -23,10 +23,10 @@ use tracing::log::warn; mod agent; mod file_transfer_server; mod operation_file_cache; +mod operation_workflows; mod restart_manager; mod software_manager; mod state_repository; -mod tedge_operation_converter; mod tedge_to_te_converter; #[derive(Debug, Clone, clap::Parser)] diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs similarity index 98% rename from crates/core/tedge_agent/src/tedge_operation_converter/actor.rs rename to crates/core/tedge_agent/src/operation_workflows/actor.rs index 2d9bc576de4..acc5d6c4d58 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -1,5 +1,5 @@ +use crate::operation_workflows::message_box::CommandDispatcher; use crate::state_repository::state::AgentStateRepository; -use crate::tedge_operation_converter::message_box::CommandDispatcher; use async_trait::async_trait; use camino::Utf8PathBuf; use log::error; @@ -44,7 +44,7 @@ pub struct InternalCommandState(GenericCommandState); fan_in_message_type!(AgentInput[MqttMessage, InternalCommandState, GenericCommandData] : Debug); -pub struct TedgeOperationConverterActor { +pub struct WorkflowActor { pub(crate) mqtt_schema: MqttSchema, pub(crate) device_topic_id: EntityTopicId, pub(crate) workflows: WorkflowSupervisor, @@ -58,9 +58,9 @@ pub struct TedgeOperationConverterActor { } #[async_trait] -impl Actor for TedgeOperationConverterActor { +impl Actor for WorkflowActor { fn name(&self) -> &str { - "TedgeOperationConverter" + "WorkflowActor" } async fn run(mut self) -> Result<(), RuntimeError> { @@ -90,7 +90,7 @@ impl Actor for TedgeOperationConverterActor { } } -impl TedgeOperationConverterActor { +impl WorkflowActor { async fn publish_operation_capabilities(&mut self) -> Result<(), RuntimeError> { for capability in self .workflows diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/builder.rs b/crates/core/tedge_agent/src/operation_workflows/builder.rs similarity index 86% rename from crates/core/tedge_agent/src/tedge_operation_converter/builder.rs rename to crates/core/tedge_agent/src/operation_workflows/builder.rs index 915fef7f4f9..7fa0d7029d0 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/builder.rs +++ b/crates/core/tedge_agent/src/operation_workflows/builder.rs @@ -1,9 +1,9 @@ +use crate::operation_workflows::actor::AgentInput; +use crate::operation_workflows::actor::InternalCommandState; +use crate::operation_workflows::actor::WorkflowActor; +use crate::operation_workflows::config::OperationConfig; +use crate::operation_workflows::message_box::CommandDispatcher; use crate::state_repository::state::AgentStateRepository; -use crate::tedge_operation_converter::actor::AgentInput; -use crate::tedge_operation_converter::actor::InternalCommandState; -use crate::tedge_operation_converter::actor::TedgeOperationConverterActor; -use crate::tedge_operation_converter::config::OperationConfig; -use crate::tedge_operation_converter::message_box::CommandDispatcher; use log::error; use std::process::Output; use tedge_actors::futures::channel::mpsc; @@ -32,7 +32,7 @@ use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; use tedge_script_ext::Execute; -pub struct TedgeOperationConverterBuilder { +pub struct WorkflowActorBuilder { config: OperationConfig, workflows: WorkflowSupervisor, input_sender: DynSender, @@ -44,7 +44,7 @@ pub struct TedgeOperationConverterBuilder { signal_sender: mpsc::Sender, } -impl TedgeOperationConverterBuilder { +impl WorkflowActorBuilder { pub fn new( config: OperationConfig, workflows: WorkflowSupervisor, @@ -105,20 +105,20 @@ impl TedgeOperationConverterBuilder { } } -impl RuntimeRequestSink for TedgeOperationConverterBuilder { +impl RuntimeRequestSink for WorkflowActorBuilder { fn get_signal_sender(&self) -> DynSender { Box::new(self.signal_sender.clone()) } } -impl Builder for TedgeOperationConverterBuilder { +impl Builder for WorkflowActorBuilder { type Error = LinkError; - fn try_build(self) -> Result { + fn try_build(self) -> Result { Ok(self.build()) } - fn build(mut self) -> TedgeOperationConverterActor { + fn build(mut self) -> WorkflowActor { for capability in self.command_dispatcher.capabilities() { if let Err(err) = self .workflows @@ -130,7 +130,7 @@ impl Builder for TedgeOperationConverterBuilder { let repository = AgentStateRepository::new(self.config.state_dir, self.config.config_dir, "workflows"); - TedgeOperationConverterActor { + WorkflowActor { mqtt_schema: self.config.mqtt_schema, device_topic_id: self.config.device_topic_id, workflows: self.workflows, diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/config.rs b/crates/core/tedge_agent/src/operation_workflows/config.rs similarity index 100% rename from crates/core/tedge_agent/src/tedge_operation_converter/config.rs rename to crates/core/tedge_agent/src/operation_workflows/config.rs diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/message_box.rs b/crates/core/tedge_agent/src/operation_workflows/message_box.rs similarity index 100% rename from crates/core/tedge_agent/src/tedge_operation_converter/message_box.rs rename to crates/core/tedge_agent/src/operation_workflows/message_box.rs diff --git a/crates/core/tedge_agent/src/operation_workflows/mod.rs b/crates/core/tedge_agent/src/operation_workflows/mod.rs new file mode 100644 index 00000000000..631340615ac --- /dev/null +++ b/crates/core/tedge_agent/src/operation_workflows/mod.rs @@ -0,0 +1,62 @@ +use anyhow::Context; +use camino::Utf8PathBuf; +use log::error; +use std::ffi::OsStr; +use std::path::Path; +use tedge_api::workflow::OperationWorkflow; +use tedge_api::workflow::WorkflowSupervisor; +use tracing::info; + +mod actor; +mod builder; +mod config; +mod message_box; + +#[cfg(test)] +mod tests; + +pub use builder::WorkflowActorBuilder; +pub use config::OperationConfig; + +pub async fn load_operation_workflows( + dir_path: &Utf8PathBuf, +) -> Result { + let mut workflows = WorkflowSupervisor::default(); + for entry in std::fs::read_dir(dir_path)?.flatten() { + let file = entry.path(); + if file.extension() == Some(OsStr::new("toml")) { + match read_operation_workflow(&file) + .await + .and_then(|workflow| load_operation_workflow(&mut workflows, workflow)) + { + Ok(cmd) => { + info!( + "Using operation workflow definition from {file:?} for '{cmd}' operation" + ); + } + Err(err) => { + error!("Ignoring operation workflow definition from {file:?}: {err:?}") + } + }; + } + } + Ok(workflows) +} + +async fn read_operation_workflow(path: &Path) -> Result { + let bytes = tokio::fs::read(path) + .await + .context("Reading file content")?; + let input = std::str::from_utf8(&bytes).context("Expecting UTF8 content")?; + let workflow = toml::from_str::(input).context("Parsing TOML content")?; + Ok(workflow) +} + +fn load_operation_workflow( + workflows: &mut WorkflowSupervisor, + workflow: OperationWorkflow, +) -> Result { + let name = workflow.operation.to_string(); + workflows.register_custom_workflow(workflow)?; + Ok(name) +} diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs b/crates/core/tedge_agent/src/operation_workflows/tests.rs similarity index 98% rename from crates/core/tedge_agent/src/tedge_operation_converter/tests.rs rename to crates/core/tedge_agent/src/operation_workflows/tests.rs index 937a9cb0922..1806fae1304 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/tests.rs +++ b/crates/core/tedge_agent/src/operation_workflows/tests.rs @@ -1,6 +1,6 @@ +use crate::operation_workflows::builder::WorkflowActorBuilder; +use crate::operation_workflows::config::OperationConfig; use crate::software_manager::actor::SoftwareCommand; -use crate::tedge_operation_converter::builder::TedgeOperationConverterBuilder; -use crate::tedge_operation_converter::config::OperationConfig; use camino::Utf8Path; use serde_json::json; use std::process::Output; @@ -367,12 +367,8 @@ async fn spawn_mqtt_operation_converter(device_topic_id: &str) -> Result