Skip to content

Commit

Permalink
Merge pull request #3180 from didier-wenzek/feat/load-operation-workf…
Browse files Browse the repository at this point in the history
…lows-on-updates

feat: support reloading workflows at runtime
  • Loading branch information
didier-wenzek authored Oct 23, 2024
2 parents 55c1140 + d70209e commit 484e54f
Show file tree
Hide file tree
Showing 24 changed files with 1,248 additions and 162 deletions.
14 changes: 7 additions & 7 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ use crate::device_profile_manager::DeviceProfileManagerBuilder;
use crate::file_transfer_server::actor::FileTransferServerBuilder;
use crate::file_transfer_server::actor::FileTransferServerConfig;
use crate::operation_file_cache::FileCacheActorBuilder;
use crate::operation_workflows::load_operation_workflows;
use crate::operation_workflows::OperationConfig;
use crate::operation_workflows::WorkflowActorBuilder;
use crate::restart_manager::builder::RestartManagerBuilder;
use crate::restart_manager::config::RestartManagerConfig;
use crate::software_manager::builder::SoftwareManagerBuilder;
use crate::software_manager::config::SoftwareManagerConfig;
use crate::state_repository::state::agent_state_dir;
use crate::state_repository::state::agent_default_state_dir;
use crate::tedge_to_te_converter::converter::TedgetoTeConverter;
use crate::AgentOpt;
use crate::Capabilities;
Expand Down Expand Up @@ -219,7 +218,7 @@ impl Agent {
#[instrument(skip(self), name = "sm-agent")]
pub fn init(&self) -> Result<(), anyhow::Error> {
// `config_dir` by default is `/etc/tedge` (or whatever the user sets with --config-dir)
create_directory_with_defaults(agent_state_dir(self.config.config_dir.clone()))?;
create_directory_with_defaults(agent_default_state_dir(self.config.config_dir.clone()))?;
create_directory_with_defaults(&self.config.agent_log_dir)?;
create_directory_with_defaults(&self.config.data_dir)?;
create_directory_with_defaults(&self.config.http_config.file_transfer_dir)?;
Expand All @@ -242,8 +241,10 @@ impl Agent {
// as it will create the device_profile workflow if it does not already exist
DeviceProfileManagerBuilder::try_new(&self.config.operations_dir)?;

// Operation workflows
let workflows = load_operation_workflows(&self.config.operations_dir).await?;
// Inotify actor
let mut fs_watch_actor_builder = FsWatchActorBuilder::new();

// Script actor
let mut script_runner: ServerActorBuilder<ScriptActor, Concurrent> = ScriptActor::builder();

// Restart actor
Expand All @@ -258,9 +259,9 @@ impl Agent {
// Converter actor
let mut converter_actor_builder = WorkflowActorBuilder::new(
self.config.operation_config,
workflows,
&mut mqtt_actor_builder,
&mut script_runner,
&mut fs_watch_actor_builder,
);
converter_actor_builder.register_builtin_operation(&mut restart_actor_builder);
converter_actor_builder.register_builtin_operation(&mut software_update_builder);
Expand All @@ -284,7 +285,6 @@ impl Agent {
&self.config.service,
);

let mut fs_watch_actor_builder = FsWatchActorBuilder::new();
let mut downloader_actor_builder = DownloaderActor::new(
self.config.identity.clone(),
self.config.cloud_root_certs.clone(),
Expand Down
58 changes: 44 additions & 14 deletions crates/core/tedge_agent/src/operation_workflows/actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::operation_workflows::message_box::CommandDispatcher;
use crate::operation_workflows::persist::WorkflowRepository;
use crate::state_repository::state::AgentStateRepository;
use async_trait::async_trait;
use camino::Utf8PathBuf;
Expand Down Expand Up @@ -30,8 +31,8 @@ use tedge_api::workflow::GenericStateUpdate;
use tedge_api::workflow::OperationAction;
use tedge_api::workflow::OperationName;
use tedge_api::workflow::WorkflowExecutionError;
use tedge_api::workflow::WorkflowSupervisor;
use tedge_api::CommandLog;
use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_script_ext::Execute;
Expand All @@ -42,12 +43,12 @@ use tokio::time::sleep;
#[derive(Debug)]
pub struct InternalCommandState(GenericCommandState);

fan_in_message_type!(AgentInput[MqttMessage, InternalCommandState, GenericCommandData] : Debug);
fan_in_message_type!(AgentInput[MqttMessage, InternalCommandState, GenericCommandData, FsWatchEvent] : Debug);

pub struct WorkflowActor {
pub(crate) mqtt_schema: MqttSchema,
pub(crate) device_topic_id: EntityTopicId,
pub(crate) workflows: WorkflowSupervisor,
pub(crate) workflow_repository: WorkflowRepository,
pub(crate) state_repository: AgentStateRepository<CommandBoard>,
pub(crate) log_dir: Utf8PathBuf,
pub(crate) input_receiver: UnboundedLoggingReceiver<AgentInput>,
Expand All @@ -64,6 +65,7 @@ impl Actor for WorkflowActor {
}

async fn run(mut self) -> Result<(), RuntimeError> {
self.workflow_repository.load().await;
self.publish_operation_capabilities().await?;
self.load_command_board().await?;

Expand All @@ -83,6 +85,19 @@ impl Actor for WorkflowActor {
)) => {
self.publish_builtin_capability(operation, payload).await?;
}
AgentInput::FsWatchEvent(file_update) => {
if let Some(updated_capability) = self
.workflow_repository
.update_operation_workflows(
&self.mqtt_schema,
&self.device_topic_id,
file_update,
)
.await
{
self.mqtt_publisher.send(updated_capability).await?
}
}
}
}
Ok(())
Expand All @@ -92,7 +107,7 @@ impl Actor for WorkflowActor {
impl WorkflowActor {
async fn publish_operation_capabilities(&mut self) -> Result<(), RuntimeError> {
for capability in self
.workflows
.workflow_repository
.capability_messages(&self.mqtt_schema, &self.device_topic_id)
{
self.mqtt_publisher.send(capability).await?
Expand Down Expand Up @@ -135,7 +150,11 @@ impl WorkflowActor {

let mut log_file = self.open_command_log(&state, &operation, &cmd_id);

match self.workflows.apply_external_update(&operation, state) {
match self
.workflow_repository
.apply_external_update(&operation, state)
.await
{
Ok(None) => (),
Ok(Some(new_state)) => {
self.persist_command_board().await?;
Expand Down Expand Up @@ -172,7 +191,7 @@ impl WorkflowActor {
};
let mut log_file = self.open_command_log(&state, &operation, &cmd_id);

let action = match self.workflows.get_action(&state) {
let action = match self.workflow_repository.get_action(&state) {
Ok(action) => action,
Err(WorkflowExecutionError::UnknownStep { operation, step }) => {
info!("No action defined for {operation} operation {step} step");
Expand All @@ -192,7 +211,9 @@ impl WorkflowActor {

match action {
OperationAction::Clear => {
if let Some(invoking_command) = self.workflows.invoking_command_state(&state) {
if let Some(invoking_command) =
self.workflow_repository.invoking_command_state(&state)
{
log_file
.log_info(&format!(
"Resuming invoking command {}",
Expand Down Expand Up @@ -336,7 +357,7 @@ impl WorkflowActor {

// Get the sub-operation state and resume this command when the sub-operation is in a terminal state
if let Some(sub_state) = self
.workflows
.workflow_repository
.sub_command_state(&state)
.map(|s| s.to_owned())
{
Expand Down Expand Up @@ -423,8 +444,11 @@ impl WorkflowActor {
&mut self,
new_state: GenericCommandState,
) -> Result<(), RuntimeError> {
let adapted_state = self.workflows.adapt_builtin_response(new_state);
if let Err(err) = self.workflows.apply_internal_update(adapted_state.clone()) {
let adapted_state = self.workflow_repository.adapt_builtin_response(new_state);
if let Err(err) = self
.workflow_repository
.apply_internal_update(adapted_state.clone())
{
error!("Fail to persist workflow operation state: {err}");
}
self.persist_command_board().await?;
Expand All @@ -441,7 +465,7 @@ impl WorkflowActor {
cmd_id: &str,
) -> CommandLog {
let (root_operation, root_cmd_id) = match self
.workflows
.workflow_repository
.root_invoking_command_state(state)
.map(|s| s.topic.as_ref())
.and_then(|root_topic| self.extract_command_identifiers(root_topic).ok())
Expand All @@ -465,7 +489,10 @@ impl WorkflowActor {
new_state: GenericCommandState,
log_file: &mut CommandLog,
) -> Result<(), RuntimeError> {
if let Err(err) = self.workflows.apply_internal_update(new_state.clone()) {
if let Err(err) = self
.workflow_repository
.apply_internal_update(new_state.clone())
{
error!("Fail to persist workflow operation state: {err}");
}
self.persist_command_board().await?;
Expand All @@ -483,7 +510,10 @@ impl WorkflowActor {
async fn load_command_board(&mut self) -> Result<(), RuntimeError> {
match self.state_repository.load().await {
Ok(Some(pending_commands)) => {
for command in self.workflows.load_pending_commands(pending_commands) {
for command in self
.workflow_repository
.load_pending_commands(pending_commands)
{
self.process_command_update(command.clone()).await?;
}
}
Expand All @@ -500,7 +530,7 @@ impl WorkflowActor {

/// Persist on-disk the current state of the pending command requests
async fn persist_command_board(&mut self) -> Result<(), RuntimeError> {
let pending_commands = self.workflows.pending_commands();
let pending_commands = self.workflow_repository.pending_commands();
if let Err(err) = self.state_repository.store(pending_commands).await {
error!(
"Fail to persist pending command requests in {} due to: {}",
Expand Down
34 changes: 16 additions & 18 deletions crates/core/tedge_agent/src/operation_workflows/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use crate::operation_workflows::actor::InternalCommandState;
use crate::operation_workflows::actor::WorkflowActor;
use crate::operation_workflows::config::OperationConfig;
use crate::operation_workflows::message_box::CommandDispatcher;
use crate::operation_workflows::persist::WorkflowRepository;
use crate::state_repository::state::agent_state_dir;
use crate::state_repository::state::AgentStateRepository;
use log::error;
use std::path::PathBuf;
use std::process::Output;
use tedge_actors::futures::channel::mpsc;
use tedge_actors::Builder;
Expand All @@ -27,14 +29,13 @@ use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::workflow::GenericCommandData;
use tedge_api::workflow::GenericCommandState;
use tedge_api::workflow::OperationName;
use tedge_api::workflow::WorkflowSupervisor;
use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::TopicFilter;
use tedge_script_ext::Execute;

pub struct WorkflowActorBuilder {
config: OperationConfig,
workflows: WorkflowSupervisor,
input_sender: DynSender<AgentInput>,
input_receiver: UnboundedLoggingReceiver<AgentInput>,
command_dispatcher: CommandDispatcher,
Expand All @@ -47,9 +48,9 @@ pub struct WorkflowActorBuilder {
impl WorkflowActorBuilder {
pub fn new(
config: OperationConfig,
workflows: WorkflowSupervisor,
mqtt_actor: &mut (impl MessageSource<MqttMessage, TopicFilter> + MessageSink<MqttMessage>),
script_runner: &mut impl Service<Execute, std::io::Result<Output>>,
fs_notify: &mut impl MessageSource<FsWatchEvent, PathBuf>,
) -> Self {
let (input_sender, input_receiver) = mpsc::unbounded();
let (signal_sender, signal_receiver) = mpsc::channel(10);
Expand All @@ -73,9 +74,10 @@ impl WorkflowActorBuilder {

let script_runner = ClientMessageBox::new(script_runner);

fs_notify.connect_sink(config.operations_dir.clone().into(), &input_sender);

Self {
config,
workflows,
input_sender,
input_receiver,
command_dispatcher,
Expand Down Expand Up @@ -118,23 +120,19 @@ impl Builder<WorkflowActor> for WorkflowActorBuilder {
Ok(self.build())
}

fn build(mut self) -> WorkflowActor {
for capability in self.command_dispatcher.capabilities() {
if let Err(err) = self
.workflows
.register_builtin_workflow(capability.as_str().into())
{
error!("Fail to register built-in workflow for {capability} operation: {err}");
}
}
fn build(self) -> WorkflowActor {
let builtin_workflows = self.command_dispatcher.capabilities();
let custom_workflows_dir = self.config.operations_dir;
let state_dir = agent_state_dir(self.config.state_dir, self.config.config_dir);
let workflow_repository =
WorkflowRepository::new(builtin_workflows, custom_workflows_dir, state_dir.clone());
let state_repository = AgentStateRepository::with_state_dir(state_dir, "workflows");

let repository =
AgentStateRepository::new(self.config.state_dir, self.config.config_dir, "workflows");
WorkflowActor {
mqtt_schema: self.config.mqtt_schema,
device_topic_id: self.config.device_topic_id,
workflows: self.workflows,
state_repository: repository,
workflow_repository,
state_repository,
log_dir: self.config.log_dir,
input_receiver: self.input_receiver,
builtin_command_dispatcher: self.command_dispatcher,
Expand Down
5 changes: 4 additions & 1 deletion crates/core/tedge_agent/src/operation_workflows/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub struct OperationConfig {
pub log_dir: Utf8PathBuf,
pub config_dir: Utf8PathBuf,
pub state_dir: Utf8PathBuf,
pub operations_dir: Utf8PathBuf,
}

impl OperationConfig {
Expand All @@ -17,14 +18,16 @@ impl OperationConfig {
device_topic_id: &EntityTopicId,
tedge_config_location: &tedge_config::TEdgeConfigLocation,
) -> Result<OperationConfig, tedge_config::TEdgeConfigError> {
let config_dir = &tedge_config_location.tedge_config_root_path;
let tedge_config = tedge_config::TEdgeConfig::try_new(tedge_config_location.clone())?;

Ok(OperationConfig {
mqtt_schema: MqttSchema::with_root(topic_root),
device_topic_id: device_topic_id.clone(),
log_dir: tedge_config.logs.path.join("agent"),
config_dir: tedge_config_location.tedge_config_root_path.clone(),
config_dir: config_dir.clone(),
state_dir: tedge_config.agent.state.path.clone(),
operations_dir: config_dir.join("operations"),
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl CommandDispatcher {
}

/// List the operations for which a builtin handler has been registered
pub fn capabilities(&self) -> Vec<&OperationName> {
self.senders.keys().collect()
pub fn capabilities(&self) -> Vec<OperationName> {
self.senders.keys().cloned().collect()
}
}
Loading

0 comments on commit 484e54f

Please sign in to comment.