diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index 4a1999914e8..b2cb34c1117 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -51,7 +51,7 @@ pub struct WorkflowActor { pub(crate) state_repository: AgentStateRepository, pub(crate) log_dir: Utf8PathBuf, pub(crate) input_receiver: UnboundedLoggingReceiver, - pub(crate) command_dispatcher: CommandDispatcher, + pub(crate) builtin_command_dispatcher: CommandDispatcher, pub(crate) command_sender: DynSender, pub(crate) mqtt_publisher: LoggingSender, pub(crate) script_runner: ClientMessageBox>, @@ -73,16 +73,15 @@ impl Actor for WorkflowActor { self.process_mqtt_message(message).await?; } AgentInput::InternalCommandState(InternalCommandState(command_state)) => { - self.process_internal_state_update(command_state).await?; + self.process_command_update(command_state).await?; } AgentInput::GenericCommandData(GenericCommandData::State(new_state)) => { - self.process_command_state_update(new_state).await?; + self.process_builtin_command_update(new_state).await?; } AgentInput::GenericCommandData(GenericCommandData::Metadata( GenericCommandMetadata { operation, payload }, )) => { - self.publish_operation_capability(operation, payload) - .await?; + self.publish_builtin_capability(operation, payload).await?; } } } @@ -101,7 +100,7 @@ impl WorkflowActor { Ok(()) } - async fn publish_operation_capability( + async fn publish_builtin_capability( &mut self, operation: OperationName, payload: serde_json::Value, @@ -117,6 +116,11 @@ impl WorkflowActor { Ok(()) } + /// Process a command update received from MQTT + /// + /// Beware, these updates are coming from external components (the mapper inits and clears commands), + /// but also from *this* actor as all its state transitions are published over MQTT. + /// Only the former will be actually processed with [Self::process_command_update]. async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> { let Ok((operation, cmd_id)) = self.extract_command_identifiers(&message.topic.name) else { log::error!("Unknown command channel: {}", &message.topic.name); @@ -136,7 +140,7 @@ impl WorkflowActor { Ok(Some(new_state)) => { self.persist_command_board().await?; if new_state.is_init() { - self.process_internal_state_update(new_state.set_log_path(&log_file.path)) + self.process_command_update(new_state.set_log_path(&log_file.path)) .await?; } } @@ -152,7 +156,13 @@ impl WorkflowActor { Ok(()) } - async fn process_internal_state_update( + /// Process a command state update taking any action as defined by the workflow + /// + /// A new state can be received: + /// - from MQTT as for init and clear messages + /// - from the engine itself when a progress is made + /// - from one of the builtin operation actors + async fn process_command_update( &mut self, state: GenericCommandState, ) -> Result<(), RuntimeError> { @@ -205,10 +215,25 @@ impl WorkflowActor { let new_state = state.move_to(next_step); self.publish_command_state(new_state, &mut log_file).await } - OperationAction::BuiltIn => { + OperationAction::BuiltIn(_, _) => { let step = &state.status; info!("Processing {operation} operation {step} step"); - Ok(self.command_dispatcher.send(state).await?) + + Ok(self.builtin_command_dispatcher.send(state).await?) + } + OperationAction::BuiltInOperation(ref builtin_op, ref handlers) => { + let step = &state.status; + info!("Executing builtin:{builtin_op} operation {step} step"); + + // Fork a builtin state + let builtin_state = action.adapt_builtin_request(state.clone()); + + // Move to the next state to await the builtin operation outcome + let new_state = state.update(handlers.on_exec.clone()); + self.publish_command_state(new_state, &mut log_file).await?; + + // Forward the command to the builtin operation actor + Ok(self.builtin_command_dispatcher.send(builtin_state).await?) } OperationAction::AwaitingAgentRestart(handlers) => { let step = &state.status; @@ -307,7 +332,7 @@ impl WorkflowActor { } OperationAction::AwaitOperationCompletion(handlers, output_excerpt) => { let step = &state.status; - info!("{operation} operation {step} waiting for sub-operation completion"); + info!("{operation} operation {step}: waiting for sub-operation completion"); // Get the sub-operation state and resume this command when the sub-operation is in a terminal state if let Some(sub_state) = self @@ -349,8 +374,6 @@ impl WorkflowActor { )) .await; } - } else { - log_file.log_info("=> sub-operation not yet launched").await; }; Ok(()) @@ -376,18 +399,39 @@ impl WorkflowActor { } } - async fn process_command_state_update( + /// Pre-process an update received from a builtin operation actor + /// + /// The actual work will be done by [Self::process_command_update]. + async fn process_builtin_command_update( &mut self, new_state: GenericCommandState, ) -> Result<(), RuntimeError> { - if let Err(err) = self.workflows.apply_internal_update(new_state.clone()) { + if new_state.is_finished() { + self.finalize_builtin_command_update(new_state).await + } else { + // As not finalized, the builtin state is sent back + // to the builtin operation actor for further processing. + let builtin_state = new_state.clone(); + Ok(self.builtin_command_dispatcher.send(builtin_state).await?) + } + } + + /// Finalize a builtin operation + /// + /// Moving to the next step calling [Self::process_command_update]. + async fn finalize_builtin_command_update( + &mut self, + new_state: GenericCommandState, + ) -> Result<(), RuntimeError> { + let adapted_state = self.workflows.adapt_builtin_response(new_state); + if let Err(err) = self.workflows.apply_internal_update(adapted_state.clone()) { error!("Fail to persist workflow operation state: {err}"); } self.persist_command_board().await?; self.mqtt_publisher - .send(new_state.clone().into_message()) + .send(adapted_state.clone().into_message()) .await?; - self.process_internal_state_update(new_state).await + self.process_command_update(adapted_state).await } fn open_command_log( @@ -440,7 +484,7 @@ impl WorkflowActor { match self.state_repository.load().await { Ok(Some(pending_commands)) => { for command in self.workflows.load_pending_commands(pending_commands) { - self.process_internal_state_update(command.clone()).await?; + self.process_command_update(command.clone()).await?; } } Ok(None) => {} diff --git a/crates/core/tedge_agent/src/operation_workflows/builder.rs b/crates/core/tedge_agent/src/operation_workflows/builder.rs index 7fa0d7029d0..1d4d846acee 100644 --- a/crates/core/tedge_agent/src/operation_workflows/builder.rs +++ b/crates/core/tedge_agent/src/operation_workflows/builder.rs @@ -137,7 +137,7 @@ impl Builder for WorkflowActorBuilder { state_repository: repository, log_dir: self.config.log_dir, input_receiver: self.input_receiver, - command_dispatcher: self.command_dispatcher, + builtin_command_dispatcher: self.command_dispatcher, mqtt_publisher: self.mqtt_publisher, command_sender: self.command_sender, script_runner: self.script_runner, diff --git a/crates/core/tedge_agent/src/operation_workflows/tests.rs b/crates/core/tedge_agent/src/operation_workflows/tests.rs index c2e6b8060d3..ab8cd39dc4c 100644 --- a/crates/core/tedge_agent/src/operation_workflows/tests.rs +++ b/crates/core/tedge_agent/src/operation_workflows/tests.rs @@ -204,13 +204,13 @@ async fn convert_outgoing_software_list_response() -> Result<(), DynError> { SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string()); let software_list_response = software_list_request .clone() - .with_status(CommandStatus::Executing); + .with_status(CommandStatus::Successful); software_box.send(software_list_response.into()).await?; mqtt_box .assert_received([MqttMessage::new( &Topic::new_unchecked("te/device/main///cmd/software_list/1234"), - r#"{"status":"executing"}"#, + r#"{"status":"successful"}"#, ) .with_retain()]) .await; @@ -286,13 +286,13 @@ async fn convert_outgoing_software_update_response() -> Result<(), DynError> { // Simulate SoftwareUpdate response message received. let software_update_request = SoftwareUpdateCommand::new(&EntityTopicId::default_main_device(), "1234".to_string()); - let software_update_response = software_update_request.with_status(CommandStatus::Executing); + let software_update_response = software_update_request.with_status(CommandStatus::Successful); software_box.send(software_update_response.into()).await?; mqtt_box .assert_received([MqttMessage::new( &Topic::new_unchecked("te/device/main///cmd/software_update/1234"), - r#"{"status":"executing"}"#, + r#"{"status":"successful"}"#, ) .with_retain()]) .await; @@ -325,7 +325,7 @@ async fn convert_outgoing_restart_response() -> Result<(), DynError> { let executing_response = RestartCommand { target: EntityTopicId::default_main_device(), cmd_id: "abc".to_string(), - payload: RestartCommandPayload::new(CommandStatus::Executing), + payload: RestartCommandPayload::new(CommandStatus::Successful), }; restart_box.send(executing_response).await?; @@ -335,7 +335,7 @@ async fn convert_outgoing_restart_response() -> Result<(), DynError> { .map(|msg| (msg.topic, msg.payload)) .expect("MqttMessage"); assert_eq!(topic.name, "te/device/main///cmd/restart/abc"); - assert!(format!("{:?}", payload).contains(r#"status":"executing"#)); + assert!(format!("{:?}", payload).contains(r#"status":"successful"#)); Ok(()) } diff --git a/crates/core/tedge_api/src/commands.rs b/crates/core/tedge_api/src/commands.rs index 5a7ffacc2c5..32c91857b43 100644 --- a/crates/core/tedge_api/src/commands.rs +++ b/crates/core/tedge_api/src/commands.rs @@ -731,23 +731,6 @@ pub enum CommandStatus { Unknown, } -impl CommandStatus { - pub fn is_terminal_status(&self) -> bool { - matches!( - self, - CommandStatus::Successful | CommandStatus::Failed { reason: _ } - ) - } - - pub fn is_successful(&self) -> bool { - *self == CommandStatus::Successful - } - - pub fn is_failed(&self) -> bool { - matches!(self, CommandStatus::Failed { reason: _ }) - } -} - fn default_failure_reason() -> String { "Unknown reason".to_string() } diff --git a/crates/core/tedge_api/src/workflow/error.rs b/crates/core/tedge_api/src/workflow/error.rs index 6cd8291ba4a..c8545fe54ee 100644 --- a/crates/core/tedge_api/src/workflow/error.rs +++ b/crates/core/tedge_api/src/workflow/error.rs @@ -20,6 +20,12 @@ pub enum WorkflowDefinitionError { #[error("The provided target {0} is not a valid path expression")] InvalidPathExpression(String), + + #[error("The `builtin:{builtin_operation}` cannot be invoked from `{main_operation}`, but only from `{builtin_operation}`")] + InvalidBuiltinOperation { + main_operation: String, + builtin_operation: String, + }, } /// Error related to a script definition diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index 5754db26e96..baf5cd92d37 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -57,13 +57,18 @@ pub enum OperationAction { /// ``` MoveTo(GenericStateUpdate), - /// The built-in behavior is used + /// Implied built-in operation (for backward compatibility) + /// + /// - the operation name is derived from the workflow + /// - the step (trigger vs await) is derived from the command status (scheduled vs executing) /// /// ```toml /// action = "builtin" + /// on_exec = "" /// on_success = "" + /// on_error = "" /// ``` - BuiltIn, + BuiltIn(ExecHandlers, AwaitHandlers), /// Await agent restart /// @@ -89,7 +94,7 @@ pub enum OperationAction { /// background_script = "sudo systemctl restart tedge-agent" /// on_exec = "" /// ``` - BgScript(ShellScript, BgExitHandlers), + BgScript(ShellScript, ExecHandlers), /// Trigger an operation and move to the next state from where the outcome of the operation will be awaited /// @@ -103,9 +108,17 @@ pub enum OperationAction { OperationName, Option, StateExcerpt, - BgExitHandlers, + ExecHandlers, ), + /// Trigger a built-in operation + /// + /// ```toml + /// operation = "" + /// on_exec = "" + /// ``` + BuiltInOperation(OperationName, ExecHandlers), + /// Await the completion of a sub-operation /// /// The sub-operation is stored in the command state. @@ -140,7 +153,7 @@ impl Display for OperationAction { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let str = match self { OperationAction::MoveTo(step) => format!("move to {step} state"), - OperationAction::BuiltIn => "builtin".to_string(), + OperationAction::BuiltIn(_, _) => "builtin action".to_string(), OperationAction::AwaitingAgentRestart { .. } => "await agent restart".to_string(), OperationAction::Script(script, _) => script.to_string(), OperationAction::BgScript(script, _) => script.to_string(), @@ -151,6 +164,9 @@ impl Display for OperationAction { script ), }, + OperationAction::BuiltInOperation(operation, _) => { + format!("execute builtin:{operation}") + } OperationAction::AwaitOperationCompletion { .. } => { "await sub-operation completion".to_string() } @@ -202,6 +218,22 @@ impl OperationWorkflow { }); } + let main_operation = operation.to_string(); + for (_, action) in states.iter() { + match action { + // A `builtin:` can only be invoked from the same `` + OperationAction::BuiltInOperation(builtin_operation, _) + if builtin_operation != &main_operation => + { + return Err(WorkflowDefinitionError::InvalidBuiltinOperation { + main_operation, + builtin_operation: builtin_operation.clone(), + }) + } + _ => continue, + } + } + Ok(OperationWorkflow { operation, built_in: false, @@ -212,10 +244,22 @@ impl OperationWorkflow { /// Create a built-in operation workflow pub fn built_in(operation: OperationType) -> Self { + let operation_name = operation.to_string(); + let exec_handler = ExecHandlers::builtin_default(); + let await_handler = AwaitHandlers::builtin_default(); let states = [ ("init", OperationAction::MoveTo("scheduled".into())), - ("scheduled", OperationAction::BuiltIn), - ("executing", OperationAction::BuiltIn), + ( + "scheduled", + OperationAction::BuiltInOperation(operation_name.clone(), exec_handler), + ), + ( + "executing", + OperationAction::AwaitOperationCompletion( + await_handler, + StateExcerpt::whole_payload(), + ), + ), ("successful", OperationAction::Clear), ("failed", OperationAction::Clear), ] @@ -384,6 +428,57 @@ impl OperationAction { Ok(new_state) } + + /// Rewrite a command state before pushing it to a builtin operation actor + /// + /// Return the command state unchanged if there is no appropriate substitute. + pub fn adapt_builtin_request(&self, command_state: GenericCommandState) -> GenericCommandState { + match self { + OperationAction::BuiltInOperation(_, _) => { + command_state.update(GenericStateUpdate::scheduled()) + } + _ => command_state, + } + } + + /// Rewrite the command state returned by a builtin operation actor + /// + /// Depending the operation is executing, successful or failed, + /// set the new state using the user provided handlers + /// + /// Return the command state unchanged if there is no appropriate handlers. + pub fn adapt_builtin_response( + &self, + command_state: GenericCommandState, + ) -> GenericCommandState { + match self { + OperationAction::BuiltIn(exec_handlers, _) + | OperationAction::BuiltInOperation(_, exec_handlers) + if command_state.is_executing() => + { + command_state.update(exec_handlers.on_exec.clone()) + } + OperationAction::BuiltIn(_, await_handlers) + | OperationAction::AwaitOperationCompletion(await_handlers, _) + if command_state.is_successful() => + { + command_state.update(await_handlers.on_success.clone()) + } + OperationAction::BuiltIn(_, await_handlers) + | OperationAction::AwaitOperationCompletion(await_handlers, _) + if command_state.is_failed() => + { + let mut on_error = await_handlers.on_error.clone(); + if on_error.reason.is_none() { + if let Some(builtin_reason) = command_state.failure_reason() { + on_error.reason = Some(builtin_reason.to_string()); + } + } + command_state.update(on_error) + } + _ => command_state, + } + } } #[derive(thiserror::Error, Debug, Eq, PartialEq)] diff --git a/crates/core/tedge_api/src/workflow/script.rs b/crates/core/tedge_api/src/workflow/script.rs index 3ed09d9a575..3fa4a2a80ee 100644 --- a/crates/core/tedge_api/src/workflow/script.rs +++ b/crates/core/tedge_api/src/workflow/script.rs @@ -297,20 +297,28 @@ fn extract_script_output(stdout: String) -> Option { None } -/// Define how to handle a background script +/// Define how to handle background scripts and actions #[derive(Clone, Debug, Default, Eq, PartialEq)] -pub struct BgExitHandlers { +pub struct ExecHandlers { pub on_exec: GenericStateUpdate, } -impl BgExitHandlers { +impl ExecHandlers { pub fn try_new(on_exec: Option) -> Result { - Ok(BgExitHandlers { + Ok(ExecHandlers { on_exec: on_exec.unwrap_or_else(GenericStateUpdate::successful), }) } } +impl ExecHandlers { + pub fn builtin_default() -> Self { + ExecHandlers { + on_exec: GenericStateUpdate::executing(), + } + } +} + /// Define how to await the completion of a command #[derive(Clone, Debug, Eq, PartialEq)] pub struct AwaitHandlers { @@ -320,6 +328,17 @@ pub struct AwaitHandlers { pub on_timeout: GenericStateUpdate, } +impl AwaitHandlers { + pub fn builtin_default() -> Self { + AwaitHandlers { + timeout: None, + on_success: GenericStateUpdate::successful(), + on_error: GenericStateUpdate::unknown_error(), + on_timeout: GenericStateUpdate::timeout(), + } + } +} + /// Define state transition on each iteration outcome #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct IterateHandlers { diff --git a/crates/core/tedge_api/src/workflow/state.rs b/crates/core/tedge_api/src/workflow/state.rs index cac75fc8f94..ccfd7fdd4c0 100644 --- a/crates/core/tedge_api/src/workflow/state.rs +++ b/crates/core/tedge_api/src/workflow/state.rs @@ -64,6 +64,8 @@ pub struct GenericStateUpdate { const STATUS: &str = "status"; const INIT: &str = "init"; +const SCHEDULED: &str = "scheduled"; +const EXECUTING: &str = "executing"; const SUCCESSFUL: &str = "successful"; const FAILED: &str = "failed"; const REASON: &str = "reason"; @@ -206,6 +208,19 @@ impl GenericCommandState { self.update_with_json(json_update) } + /// Merge this state into a more complete state overriding all values defined both side + pub fn merge_into(self, mut state: Self) -> Self { + state.status = self.status; + if let Some(properties) = state.payload.as_object_mut() { + if let Value::Object(new_properties) = self.payload { + for (key, value) in new_properties.into_iter() { + properties.insert(key, value); + } + } + } + state + } + /// Update the command state with a new status describing the next state pub fn move_to(mut self, update: GenericStateUpdate) -> Self { let status = update.status; @@ -421,15 +436,19 @@ impl GenericCommandState { } pub fn is_init(&self) -> bool { - matches!(self.status.as_str(), INIT) + self.status.as_str() == INIT + } + + pub fn is_executing(&self) -> bool { + self.status.as_str() == EXECUTING } pub fn is_successful(&self) -> bool { - matches!(self.status.as_str(), SUCCESSFUL) + self.status.as_str() == SUCCESSFUL } pub fn is_failed(&self) -> bool { - matches!(self.status.as_str(), FAILED) + self.status.as_str() == FAILED } pub fn is_finished(&self) -> bool { @@ -450,6 +469,20 @@ impl GenericStateUpdate { json!({STATUS: INIT}) } + pub fn scheduled() -> Self { + GenericStateUpdate { + status: SCHEDULED.to_string(), + reason: None, + } + } + + pub fn executing() -> Self { + GenericStateUpdate { + status: EXECUTING.to_string(), + reason: None, + } + } + pub fn successful() -> Self { GenericStateUpdate { status: SUCCESSFUL.to_string(), @@ -583,6 +616,11 @@ pub enum StateExcerpt { } impl StateExcerpt { + /// Excerpt returning the whole payload of a command state + pub fn whole_payload() -> Self { + StateExcerpt::PathExpr("${.}".to_string()) + } + /// Extract a JSON value from the input state pub fn extract_value_from(&self, input: &GenericCommandState) -> Value { match self { diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 4221e6b8d7e..2d183171701 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -131,6 +131,33 @@ impl WorkflowSupervisor { self.commands.get_state(command).map(|(_, state)| state) } + /// Rewrite the command state returned by a builtin operation actor + /// + /// Depending the operation is executing, successful or failed, + /// set the new state using the user provided handlers + /// + /// This method also takes care of the fact that the builtin operations + /// only return the state properties they care about. + /// Hence the command state is merged into the persisted state of the command. + /// + /// Return the command state unchanged if there is an error or no appropriate handlers. + pub fn adapt_builtin_response( + &self, + command_state: GenericCommandState, + ) -> GenericCommandState { + let command_id = &command_state.topic; + if let Some(current_state) = self.get_state(command_id.as_ref()) { + let new_state = command_state.merge_into(current_state.clone()); + if let Ok(current_action) = self.get_action(current_state) { + return current_action.adapt_builtin_response(new_state); + } else { + return new_state; + } + }; + + command_state + } + /// Return the state of the invoking command of a command, if any pub fn invoking_command_state( &self, @@ -165,8 +192,6 @@ impl WorkflowSupervisor { } /// Update the state of the command board on reception of new state for a command - /// - /// Return the next CommandRequest state if any is required. pub fn apply_internal_update( &mut self, new_command_state: GenericCommandState, diff --git a/crates/core/tedge_api/src/workflow/toml_config.rs b/crates/core/tedge_api/src/workflow/toml_config.rs index 457e70fc567..d9ae45f210e 100644 --- a/crates/core/tedge_api/src/workflow/toml_config.rs +++ b/crates/core/tedge_api/src/workflow/toml_config.rs @@ -1,7 +1,7 @@ use crate::mqtt_topics::OperationType; use crate::workflow::AwaitHandlers; -use crate::workflow::BgExitHandlers; use crate::workflow::DefaultHandlers; +use crate::workflow::ExecHandlers; use crate::workflow::ExitHandlers; use crate::workflow::GenericCommandState; use crate::workflow::GenericStateUpdate; @@ -124,26 +124,35 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { ) -> Result { match input.action { TomlOperationAction::Script(script) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = ExitHandlers::try_from((input.handlers, defaults))?; Ok(OperationAction::Script(script, handlers)) } TomlOperationAction::BackgroundScript(script) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = ExecHandlers::try_from((input.handlers, defaults))?; Ok(OperationAction::BgScript(script, handlers)) } - TomlOperationAction::Operation(operation) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; - let input_script = input.input_script; - let cmd_input = input.input.try_into()?; - Ok(OperationAction::Operation( - operation, - input_script, - cmd_input, - handlers, - )) - } + TomlOperationAction::Operation(operation) => match operation.strip_prefix("builtin:") { + None => { + let handlers = ExecHandlers::try_from((input.handlers, defaults))?; + let input_script = input.input_script; + let cmd_input = input.input.try_into()?; + Ok(OperationAction::Operation( + operation, + input_script, + cmd_input, + handlers, + )) + } + Some(builtin_operation_name) => { + let handlers = ExecHandlers::try_from((input.handlers, defaults))?; + Ok(OperationAction::BuiltInOperation( + builtin_operation_name.to_string(), + handlers, + )) + } + }, TomlOperationAction::Iterate(target_json_path) => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = IterateHandlers::try_from((input.handlers, defaults))?; let Some(json_path) = GenericCommandState::extract_path(&target_json_path) else { return Err(WorkflowDefinitionError::InvalidPathExpression( target_json_path, @@ -152,7 +161,6 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { Ok(OperationAction::Iterate(json_path.to_string(), handlers)) } TomlOperationAction::Action(command) => match command.as_str() { - "builtin" => Ok(OperationAction::BuiltIn), "cleanup" => Ok(OperationAction::Clear), "proceed" => { let on_success: GenericStateUpdate = input @@ -163,16 +171,27 @@ impl TryFrom<(TomlOperationState, DefaultHandlers)> for OperationAction { Ok(OperationAction::MoveTo(on_success)) } "await-agent-restart" => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = AwaitHandlers::try_from((input.handlers, defaults))?; Ok(OperationAction::AwaitingAgentRestart(handlers)) } "await-operation-completion" => { - let handlers = TryInto::::try_into((input.handlers, defaults))?; + let handlers = AwaitHandlers::try_from((input.handlers, defaults))?; let cmd_output = input.output.try_into()?; Ok(OperationAction::AwaitOperationCompletion( handlers, cmd_output, )) } + "builtin" => { + let exec_handlers = ExecHandlers::try_from(( + input.handlers.clone(), + ExecHandlers::builtin_default(), + ))?; + let await_handlers = AwaitHandlers::try_from(( + input.handlers, + AwaitHandlers::builtin_default(), + ))?; + Ok(OperationAction::BuiltIn(exec_handlers, await_handlers)) + } _ => Err(WorkflowDefinitionError::UnknownAction { action: command }), }, } @@ -184,11 +203,10 @@ impl TryFrom for OperationWorkflow { fn try_from(input: TomlOperationWorkflow) -> Result { let operation = input.operation; - let default_handlers = TryInto::::try_into(input.handlers)?; + let default_handlers = DefaultHandlers::try_from(input.handlers)?; let mut states = HashMap::new(); for (state, action_spec) in input.states.into_iter() { - let action = - TryInto::::try_into((action_spec, default_handlers.clone()))?; + let action = OperationAction::try_from((action_spec, default_handlers.clone()))?; states.insert(state, action); } @@ -293,14 +311,23 @@ impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for ExitHandlers { } } -impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for BgExitHandlers { +impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for ExecHandlers { type Error = ScriptDefinitionError; fn try_from( (value, _defaults): (TomlExitHandlers, DefaultHandlers), ) -> Result { let on_exec = value.on_exec.map(|u| u.into()); - BgExitHandlers::try_new(on_exec) + ExecHandlers::try_new(on_exec) + } +} + +impl TryFrom<(TomlExitHandlers, ExecHandlers)> for ExecHandlers { + type Error = ScriptDefinitionError; + + fn try_from((value, defaults): (TomlExitHandlers, ExecHandlers)) -> Result { + let on_exec = value.on_exec.map(|u| u.into()).or(Some(defaults.on_exec)); + ExecHandlers::try_new(on_exec) } } @@ -336,6 +363,38 @@ impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for AwaitHandlers { } } +impl TryFrom<(TomlExitHandlers, AwaitHandlers)> for AwaitHandlers { + type Error = ScriptDefinitionError; + + fn try_from( + (handlers, defaults): (TomlExitHandlers, AwaitHandlers), + ) -> Result { + let timeout = handlers + .timeout_second + .map(Duration::from_secs) + .or(defaults.timeout); + let on_success: GenericStateUpdate = handlers + .on_success + .map(|u| u.into()) + .unwrap_or(defaults.on_success); + let on_error = handlers + .on_error + .map(|u| u.into()) + .unwrap_or(defaults.on_error); + let on_timeout = handlers + .on_timeout + .map(|u| u.into()) + .unwrap_or(defaults.on_timeout); + + Ok(AwaitHandlers { + timeout, + on_success, + on_error, + on_timeout, + }) + } +} + impl TryFrom<(TomlExitHandlers, DefaultHandlers)> for IterateHandlers { type Error = WorkflowDefinitionError; @@ -525,7 +584,7 @@ on_exit.0 = "0" on_success = "success" "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!(error, ScriptDefinitionError::DuplicatedOnSuccessHandler) } @@ -536,7 +595,7 @@ on_exit._ = "wildcard" on_error = "error" "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!(error, ScriptDefinitionError::DuplicatedOnErrorHandler) } @@ -547,7 +606,7 @@ on_exit.1-5 = "1-5" on_exit.4-8 = "4-8" "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!( error, ScriptDefinitionError::OverlappingHandler { @@ -563,7 +622,7 @@ on_exit.4-8 = "4-8" on_exit.5-1 = "oops" "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!( error, ScriptDefinitionError::IncorrectRange { from: 5, to: 1 } @@ -577,7 +636,7 @@ on_success = "successful_state" on_stdout = ["other_successful_state_extracted_from_json"] "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!(error, ScriptDefinitionError::DuplicatedOnStdoutHandler) } @@ -588,7 +647,7 @@ on_exit.0 = "successful_state" on_stdout = ["other_successful_state_extracted_from_json"] "#; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let error = TryInto::::try_into(input).unwrap_err(); + let error = ExitHandlers::try_from(input).unwrap_err(); assert_eq!(error, ScriptDefinitionError::DuplicatedOnStdoutHandler) } @@ -596,7 +655,7 @@ on_stdout = ["other_successful_state_extracted_from_json"] fn default_handlers() { let file = ""; let input: TomlExitHandlers = toml::from_str(file).unwrap(); - let handlers = TryInto::::try_into(input).unwrap(); + let handlers = ExitHandlers::try_from(input).unwrap(); assert_eq!(handlers.state_update_on_success().status, "successful"); assert_eq!( handlers.state_update_on_exit("foo.sh", 1).reason.unwrap(), diff --git a/docs/src/references/agent/operation-workflow.md b/docs/src/references/agent/operation-workflow.md index 815dff90184..9ad9e8b1c11 100644 --- a/docs/src/references/agent/operation-workflow.md +++ b/docs/src/references/agent/operation-workflow.md @@ -464,21 +464,20 @@ The set of accepted handlers for an action are the following: - `timeout_second = 3600` the number of second given to the action to execute - `on_timeout = { status = "", reason = ""}` defines the next state when the action is not be completed within the time limit -For some action, notably a device `restart`, the handlers are limited to one: -- `on_exec = ""` defines the next state once the action has been launched in the background. - The action outcome will have to be observed in this `on_exec` state. - Currently, here are the available actions: - `await-agent-restart` awaits for **tedge-agent** to restart - `await-operation-completion` awaits for a sub-operation to reach a success, failure or timeout -- `builtin` is used when a builtin operation is overwritten by a custom workflow and indicates that for that state - the builtin action has to be applied. - `proceed` is a no-op action, simply proceeding to the next state, which is useful when a builtin operation is customized but no specific behavior has to be added on a workflow extension point. - `cleanup` marks the terminal state of the workflow where the command has been fully processed and where the original requester is expected to clean up the command retained message storing its state. - +- ( *deprecated* ) `builtin` is used when a builtin operation is overwritten by a custom workflow and indicates that for that state + the builtin action has to be applied. For backward compatibility, this keyword is rewritten by the agent + as a combination of `operation = "builtin:"` and `action = "await-operation-completion"`. +- ( *deprecated* ) `restart` trigger a device restart. For backward compatibility, this keyword is rewritten by the agent + as a combination of `operation = "restart"` and `action = "await-operation-completion"`. + #### Awaiting the agent to restart When the expected outcome of a script is to restart the device or the agent, @@ -555,62 +554,124 @@ action = "cleanup" action = "cleanup" ``` -#### Proceed and Builtin actions +#### Proceed -The `"proceed"` and `"builtin"` actions are useful when customizing a builtin operation -(`software_list`, `software_update`, `restart`, `config_snapshot`, `config_update`, `log_upload`). -Indeed, the first step is to start with a workflow specification which mimics the builtin behavior. +`proceed` simply let the command proceeds to the next state. -For instance, here is the builtin workflow for the `software_update` operation: +Adding such a no-op step helps to later customize the workflow +without changing the observable sequence of steps a command has to go through. + +This is notably used by all the builtin operations that proceed from the *init* state to the *scheduled* state: ```toml -operation = "software_update" # an operation for which tedge-agent provides an implementation +[init] +action = "proceed" +on_success = "scheduled" +``` + +Adding some pre-processing step is then simply done by replacing the `proceed` action with something more specific: + +```toml +[init] +script = "/usr/bin/check-if-operation-is-timely.sh ${.}" +on_success = "scheduled" +on_error = { status = "failed", reason = "not timely" } +``` + +### 🚧 Customizing builtin operations + +:::info +🚧 The syntax for customizing builtin workflows is still being finalized so please avoid using it in production environments. +::: + +__tedge-agent__ supports out-of-the-box a set of so-called builtin operations: +`software_list`, `software_update`, `restart`, `config_snapshot`, `config_update`, `log_upload`. + +The workflows of these builtin operations can be customized. + +For each, there are *two* workflows: an internal workflow and a customized version of the former. +- The `builtin:` operation workflow describes the builtin behavior as provided by __tedge-agent__. +- The `` operation workflow rules the actual behavior of __tedge-agent__, + possibly invoking `builtin:` behind the scene. +- When a user or a mapper triggers a command over MQTT for that ``, + by publishing an init message on `te/+/+/+/+/cmd/builtin:/+`, + __tedge_agent__ uses the customized version of the workflow for that operation. + - From MQTT, there is no way to trigger directly the builtin version of an operation, + i.e. there is no `te/+/+/+/+/cmd/builtin:/+` command topic. + - The builtin versions are only internal triggered by the agent from a customized workflow. +- On a fresh install of %%te%%, the `` is simply a copy of `builtin:`. + - This copy is not materialized on the file system, but created in memory by the agent when no customized version is found. +- A customized version for an `` is provided as a TOML workflow definition file in `/etc/tedge/operations` + for that operation, as for any user provided workflow. + - By convention, this file is named after the operation name, as in `/etc/tedge/operations/software_update.toml`. + However, this is not mandatory: the operation is determined by the `operation` property, e.g. `operation = "software_update"`. + - If this definition is valid, then it will be used by the agent as the `` workflow. +- The customized version of an `` can invoke its builtin version + with a step triggering `operation = "builtin:"`. + - A customized workflow can also be a complete rework of the feature, ignoring the builtin behavior. + - However, the builtin behavior can only be invoked from the workflow for the same operation + (e.g. `builtin:sofware_update` can only be invoked from `sofware_update`). + +In order to customize a builtin operation, the first step is to materialize its definition in `/etc/tedge/operations`. +For instance, here is the builtin workflow for the `software_update` operation: + +```toml title="/etc/tedge/operations/software_update.toml" +operation = "software_update" # any builtion operation can be customized ["init"] -action = "proceed" # open to customization +action = "proceed" # open to customization on_success = "scheduled" [scheduled] -action = "builtin" # delegated to the tedge-agent -on_success = "executing" +operation = "builtin:software_update" # trigger the built-in behavior for software update +on_exec = "executing" [executing] -action = "builtin" # delegated to the tedge-agent +action = "await-operation-completion" # awaiting the builtin operation to complete on_success = "successful" [successful] -action = "cleanup" # waiting for the mapper to clean up the command +action = "cleanup" # waiting for the mapper to clean up the command [failed] -action = "cleanup" # waiting for the mapper to clean up the command +action = "cleanup" # waiting for the mapper to clean up the command ``` The action for the `"init"` state is a `"proceed"` action, meaning nothing specific is done by the __tedge-agent__ and that a user can provide its own implementation. -By contrast, the actions marked as `"builtin"` are those delegated to the __tedge-agent__ -and where the main task of the operation is performed, in that case, installing software. +By contrast, for the `scheduled` and `executing` states, the work is delegated to the __tedge-agent__ +and this is where the main task of the operation is performed, in that case, installing software. Here is a customized version of the same operation. -```toml +```toml title="/etc/tedge/operations/software_update.toml" operation = "software_update" # a customized workflow [init] -script = "/usr/bin/schedule-software-update.sh ${.}" # checking is the software update command is timely -on_success = ["scheduled"] +script = "/usr/bin/schedule-software-update.sh ${.}" # one can override any `proceed` action - here with a checking step +on_success = "scheduled" on_error = { status = "failed", reason = "not timely" } [scheduled] -action = "builtin" # the software installation steps are unchanged +operation = "builtin:software_update" # trigger the built-in behavior for software update on_success = "executing" [executing] -action = "builtin" +action = "await-operation-completion" # awaiting the builtin operation to complete +on_success = "commit" # with customized chaining rules +on_error = "rollback" + +[commit] +script = "/usr/bin/commit-software-update.sh ${.}" # one can add extra steps - here a commit step on_success = "successful" +[rollback] +script = "/usr/bin/rollback-software-update.sh ${.}" # one can add extra steps - here a rollback step +on_success = "failed" + [successful] -action = "cleanup" +action = "cleanup" # terminal steps cannot be changed [failed] action = "cleanup" diff --git a/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.sh b/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.sh index b5db88102af..b7c78614fcd 100755 --- a/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.sh +++ b/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.sh @@ -29,6 +29,12 @@ Parameters: EOT } +postprocess_sqlite() { + LOG_TYPE="$1" + TMP_LOG_FILE=/tmp/${LOG_TYPE}.log + rm -f "$TMP_LOG_FILE" +} + # # Main # @@ -52,6 +58,9 @@ case "$COMMAND" in ;; esac ;; + postprocess) + postprocess_sqlite "$@" + ;; esac exit 0 diff --git a/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.toml b/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.toml index 5fa1fbb4753..7b4f7dc030d 100644 --- a/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.toml +++ b/tests/RobotFramework/tests/cumulocity/log/workflow/log_upload.toml @@ -20,6 +20,10 @@ on_error = "failed" [executing] action = "builtin" + on_success = "postprocess" # on_success & on_error can be customized for builtin actions + +[postprocess] + script = "/usr/bin/log_upload.sh ${.payload.status}" on_success = "successful" [successful] diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot b/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot index ab0079739f3..40cfe84d785 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot +++ b/tests/RobotFramework/tests/tedge_agent/workflows/custom_operation.robot @@ -26,15 +26,24 @@ Trigger Custom Download Operation Should Be Equal ${actual_log} ${expected_log} Override Built-In Operation - Execute Command tedge mqtt pub --retain te/device/main///cmd/software_list/robot-456 '{"status":"init"}' + Execute Command tedge mqtt pub --retain te/device/main///cmd/software_list/robot-456 '{"status":"init"}' ${software_list} Should Have MQTT Messages ... te/device/main///cmd/software_list/robot-456 ... message_pattern=.*successful.* ... maximum=1 - Should Contain ${software_list[0]} "currentSoftwareList" - Should Contain ${software_list[0]} "mosquitto" - Should Contain ${software_list[0]} "tedge" - Execute Command tedge mqtt pub --retain te/device/main///cmd/software_list/robot-456 '' + Should Contain ${software_list[0]} "currentSoftwareList" + Should Contain ${software_list[0]} "mosquitto" + Should Contain ${software_list[0]} "tedge" + Should Contain ${software_list[0]} "postprocess" + Should Contain ${software_list[0]} "done" + Execute Command tedge mqtt pub --retain te/device/main///cmd/software_list/robot-456 '' + +Override Built-In Operation Executing Step + # Trigger a software update using a custom software-update workflow with rollbacks + Execute Command + ... tedge mqtt pub --retain te/device/main///cmd/software_update/test-builtin-executing-step '{"status":"init","updateList":[{"type":"apt","modules":[{"name":"broken-package","version":"latest","action":"install"}]}]}' + Should Have MQTT Messages te/device/main///cmd/software_update/test-builtin-executing-step message_pattern=.*rollback.* minimum=1 + Execute Command tedge mqtt pub --retain te/device/main///cmd/software_update/test-builtin-executing-step '' Trigger Device Restart Using A Sub-Command [Documentation] To detect if the device has been rebooted, a marker file is created in the /run directory @@ -196,7 +205,7 @@ Custom Setup Copy Configuration Files ThinEdgeIO.Transfer To Device ${CURDIR}/software_list.toml /etc/tedge/operations/ - ThinEdgeIO.Transfer To Device ${CURDIR}/init-software-list.sh /etc/tedge/operations/ + ThinEdgeIO.Transfer To Device ${CURDIR}/software_update.toml /etc/tedge/operations/ ThinEdgeIO.Transfer To Device ${CURDIR}/custom-download.toml /etc/tedge/operations/ ThinEdgeIO.Transfer To Device ${CURDIR}/schedule-download.sh /etc/tedge/operations/ ThinEdgeIO.Transfer To Device ${CURDIR}/launch-download.sh /etc/tedge/operations/ diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/init-software-list.sh b/tests/RobotFramework/tests/tedge_agent/workflows/init-software-list.sh deleted file mode 100755 index 6a2c754dcd5..00000000000 --- a/tests/RobotFramework/tests/tedge_agent/workflows/init-software-list.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/sh -set -e - -echo new software list request topic = "$1" >>/tmp/operations.log - -echo ':::begin-tedge:::' -echo '{ "status":"scheduled" }' -echo ':::end-tedge:::' diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml b/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml index 0b78617ba95..37309240eec 100644 --- a/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml +++ b/tests/RobotFramework/tests/tedge_agent/workflows/software_list.toml @@ -1,15 +1,19 @@ operation = "software_list" # A built in operation can be overridden [init] -script = "/etc/tedge/operations/init-software-list.sh ${.topic}" # The json output of the script is used for the next step +script = "/etc/tedge/operations/echo-as-json.sh status scheduled" on_stdout = ["scheduled"] [scheduled] -action = "builtin" -on_success = "executing" +operation = "builtin:software_list" +on_exec = "executing" [executing] -action = "builtin" +action = "await-operation-completion" +on_success = "postprocess" + +[postprocess] +script = "/etc/tedge/operations/echo-as-json.sh postprocess done" on_success = "successful" [successful] diff --git a/tests/RobotFramework/tests/tedge_agent/workflows/software_update.toml b/tests/RobotFramework/tests/tedge_agent/workflows/software_update.toml new file mode 100644 index 00000000000..bd280f3f2bb --- /dev/null +++ b/tests/RobotFramework/tests/tedge_agent/workflows/software_update.toml @@ -0,0 +1,29 @@ +operation = "software_update" + +[init] +action = "proceed" +on_success = "scheduled" + +[scheduled] +action = "proceed" +on_success = "executing" + +[executing] +operation = "builtin:software_update" # trigger the built-in behavior for software update +on_exec = "await_operation" + +[await_operation] +action = "await-operation-completion" # awaiting the builtin operation to complete +on_success = "successful" +on_error = "rollback" + +[rollback] +script = "/etc/tedge/operations/echo-as-json.sh rollback done" +on_success = "failed" +on_error = "failed" + +[successful] +action = "cleanup" + +[failed] +action = "cleanup"