Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: improve workflow builtin actions #3105

Merged
80 changes: 62 additions & 18 deletions crates/core/tedge_agent/src/operation_workflows/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct WorkflowActor {
pub(crate) state_repository: AgentStateRepository<CommandBoard>,
pub(crate) log_dir: Utf8PathBuf,
pub(crate) input_receiver: UnboundedLoggingReceiver<AgentInput>,
pub(crate) command_dispatcher: CommandDispatcher,
pub(crate) builtin_command_dispatcher: CommandDispatcher,
pub(crate) command_sender: DynSender<InternalCommandState>,
pub(crate) mqtt_publisher: LoggingSender<MqttMessage>,
pub(crate) script_runner: ClientMessageBox<Execute, std::io::Result<Output>>,
Expand All @@ -73,16 +73,15 @@ impl Actor for WorkflowActor {
self.process_mqtt_message(message).await?;
}
AgentInput::InternalCommandState(InternalCommandState(command_state)) => {
self.process_internal_state_update(command_state).await?;
self.process_command_update(command_state).await?;
}
AgentInput::GenericCommandData(GenericCommandData::State(new_state)) => {
self.process_command_state_update(new_state).await?;
self.process_builtin_command_update(new_state).await?;
}
AgentInput::GenericCommandData(GenericCommandData::Metadata(
GenericCommandMetadata { operation, payload },
)) => {
self.publish_operation_capability(operation, payload)
.await?;
self.publish_builtin_capability(operation, payload).await?;
}
}
}
Expand All @@ -101,7 +100,7 @@ impl WorkflowActor {
Ok(())
}

async fn publish_operation_capability(
async fn publish_builtin_capability(
&mut self,
operation: OperationName,
payload: serde_json::Value,
Expand All @@ -117,6 +116,11 @@ impl WorkflowActor {
Ok(())
}

/// Process a command update received from MQTT
///
/// Beware, these updates are coming from external components (the mapper inits and clears commands),
/// but also from *this* actor as all its state transitions are published over MQTT.
/// Only the former will be actually processed with [Self::process_command_update].
async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> {
let Ok((operation, cmd_id)) = self.extract_command_identifiers(&message.topic.name) else {
log::error!("Unknown command channel: {}", &message.topic.name);
Expand All @@ -136,7 +140,7 @@ impl WorkflowActor {
Ok(Some(new_state)) => {
self.persist_command_board().await?;
if new_state.is_init() {
self.process_internal_state_update(new_state.set_log_path(&log_file.path))
self.process_command_update(new_state.set_log_path(&log_file.path))
.await?;
}
}
Expand All @@ -152,7 +156,13 @@ impl WorkflowActor {
Ok(())
}

async fn process_internal_state_update(
/// Process a command state update taking any action as defined by the workflow
///
/// A new state can be received:
/// - from MQTT as for init and clear messages
/// - from the engine itself when a progress is made
/// - from one of the builtin operation actors
async fn process_command_update(
&mut self,
state: GenericCommandState,
) -> Result<(), RuntimeError> {
Expand Down Expand Up @@ -205,10 +215,25 @@ impl WorkflowActor {
let new_state = state.move_to(next_step);
self.publish_command_state(new_state, &mut log_file).await
}
OperationAction::BuiltIn => {
OperationAction::BuiltIn(_, _) => {
let step = &state.status;
info!("Processing {operation} operation {step} step");
Ok(self.command_dispatcher.send(state).await?)

Ok(self.builtin_command_dispatcher.send(state).await?)
}
OperationAction::BuiltInOperation(ref builtin_op, ref handlers) => {
let step = &state.status;
info!("Executing builtin:{builtin_op} operation {step} step");

// Fork a builtin state
let builtin_state = action.adapt_builtin_request(state.clone());

// Move to the next state to await the builtin operation outcome
let new_state = state.update(handlers.on_exec.clone());
self.publish_command_state(new_state, &mut log_file).await?;

// Forward the command to the builtin operation actor
Ok(self.builtin_command_dispatcher.send(builtin_state).await?)
}
OperationAction::AwaitingAgentRestart(handlers) => {
let step = &state.status;
Expand Down Expand Up @@ -307,7 +332,7 @@ impl WorkflowActor {
}
OperationAction::AwaitOperationCompletion(handlers, output_excerpt) => {
let step = &state.status;
info!("{operation} operation {step} waiting for sub-operation completion");
info!("{operation} operation {step}: waiting for sub-operation completion");

// Get the sub-operation state and resume this command when the sub-operation is in a terminal state
if let Some(sub_state) = self
Expand Down Expand Up @@ -349,8 +374,6 @@ impl WorkflowActor {
))
.await;
}
} else {
log_file.log_info("=> sub-operation not yet launched").await;
Comment on lines -352 to -353
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log entry has been removed, because confusing when the sub-operation is actually a builtin operation.

};

Ok(())
Expand All @@ -376,18 +399,39 @@ impl WorkflowActor {
}
}

async fn process_command_state_update(
/// Pre-process an update received from a builtin operation actor
///
/// The actual work will be done by [Self::process_command_update].
async fn process_builtin_command_update(
&mut self,
new_state: GenericCommandState,
) -> Result<(), RuntimeError> {
if let Err(err) = self.workflows.apply_internal_update(new_state.clone()) {
if new_state.is_finished() {
self.finalize_builtin_command_update(new_state).await
} else {
// As not finalized, the builtin state is sent back
// to the builtin operation actor for further processing.
let builtin_state = new_state.clone();
Ok(self.builtin_command_dispatcher.send(builtin_state).await?)
Comment on lines +412 to +415
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be remove in the future. Indeed, all the builtin-operation actors are currently dealing with 2 steps while things are actually done in a single step (the schedule state for restart and software update; the executing state for log_upload, config_snapshot and config_update).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The config actor still does it in two steps, where the tedgeUrl generation happens in the scheduled state and the rest in executing, although I can't remember why we chose to do it that way.

}
}

/// Finalize a builtin operation
///
/// Moving to the next step calling [Self::process_command_update].
async fn finalize_builtin_command_update(
&mut self,
new_state: GenericCommandState,
) -> Result<(), RuntimeError> {
let adapted_state = self.workflows.adapt_builtin_response(new_state);
if let Err(err) = self.workflows.apply_internal_update(adapted_state.clone()) {
error!("Fail to persist workflow operation state: {err}");
}
self.persist_command_board().await?;
self.mqtt_publisher
.send(new_state.clone().into_message())
.send(adapted_state.clone().into_message())
.await?;
self.process_internal_state_update(new_state).await
self.process_command_update(adapted_state).await
}

fn open_command_log(
Expand Down Expand Up @@ -440,7 +484,7 @@ impl WorkflowActor {
match self.state_repository.load().await {
Ok(Some(pending_commands)) => {
for command in self.workflows.load_pending_commands(pending_commands) {
self.process_internal_state_update(command.clone()).await?;
self.process_command_update(command.clone()).await?;
}
}
Ok(None) => {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl Builder<WorkflowActor> for WorkflowActorBuilder {
state_repository: repository,
log_dir: self.config.log_dir,
input_receiver: self.input_receiver,
command_dispatcher: self.command_dispatcher,
builtin_command_dispatcher: self.command_dispatcher,
mqtt_publisher: self.mqtt_publisher,
command_sender: self.command_sender,
script_runner: self.script_runner,
Expand Down
12 changes: 6 additions & 6 deletions crates/core/tedge_agent/src/operation_workflows/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ async fn convert_outgoing_software_list_response() -> Result<(), DynError> {
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
let software_list_response = software_list_request
.clone()
.with_status(CommandStatus::Executing);
.with_status(CommandStatus::Successful);
software_box.send(software_list_response.into()).await?;

mqtt_box
.assert_received([MqttMessage::new(
&Topic::new_unchecked("te/device/main///cmd/software_list/1234"),
r#"{"status":"executing"}"#,
r#"{"status":"successful"}"#,
Comment on lines -207 to +213
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 tests has been updated, watching for a successful status and no more an executing status.

Indeed, publishing the executing state is done by the workflow engine according to the workflow definition and no more by the internal actor.

)
.with_retain()])
.await;
Expand Down Expand Up @@ -286,13 +286,13 @@ async fn convert_outgoing_software_update_response() -> Result<(), DynError> {
// Simulate SoftwareUpdate response message received.
let software_update_request =
SoftwareUpdateCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
let software_update_response = software_update_request.with_status(CommandStatus::Executing);
let software_update_response = software_update_request.with_status(CommandStatus::Successful);
software_box.send(software_update_response.into()).await?;

mqtt_box
.assert_received([MqttMessage::new(
&Topic::new_unchecked("te/device/main///cmd/software_update/1234"),
r#"{"status":"executing"}"#,
r#"{"status":"successful"}"#,
)
.with_retain()])
.await;
Expand Down Expand Up @@ -325,7 +325,7 @@ async fn convert_outgoing_restart_response() -> Result<(), DynError> {
let executing_response = RestartCommand {
target: EntityTopicId::default_main_device(),
cmd_id: "abc".to_string(),
payload: RestartCommandPayload::new(CommandStatus::Executing),
payload: RestartCommandPayload::new(CommandStatus::Successful),
};
restart_box.send(executing_response).await?;

Expand All @@ -335,7 +335,7 @@ async fn convert_outgoing_restart_response() -> Result<(), DynError> {
.map(|msg| (msg.topic, msg.payload))
.expect("MqttMessage");
assert_eq!(topic.name, "te/device/main///cmd/restart/abc");
assert!(format!("{:?}", payload).contains(r#"status":"executing"#));
assert!(format!("{:?}", payload).contains(r#"status":"successful"#));

Ok(())
}
Expand Down
17 changes: 0 additions & 17 deletions crates/core/tedge_api/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,23 +731,6 @@ pub enum CommandStatus {
Unknown,
}

impl CommandStatus {
pub fn is_terminal_status(&self) -> bool {
matches!(
self,
CommandStatus::Successful | CommandStatus::Failed { reason: _ }
)
}

pub fn is_successful(&self) -> bool {
*self == CommandStatus::Successful
}

pub fn is_failed(&self) -> bool {
matches!(self, CommandStatus::Failed { reason: _ })
}
}

fn default_failure_reason() -> String {
"Unknown reason".to_string()
}
Expand Down
6 changes: 6 additions & 0 deletions crates/core/tedge_api/src/workflow/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ pub enum WorkflowDefinitionError {

#[error("The provided target {0} is not a valid path expression")]
InvalidPathExpression(String),

#[error("The `builtin:{builtin_operation}` cannot be invoked from `{main_operation}`, but only from `{builtin_operation}`")]
InvalidBuiltinOperation {
main_operation: String,
builtin_operation: String,
},
}

/// Error related to a script definition
Expand Down
Loading