Skip to content

Commit

Permalink
Store invoking command topic in sub command state
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Apr 23, 2024
1 parent 82c148a commit 9230264
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 149 deletions.
112 changes: 65 additions & 47 deletions crates/core/tedge_agent/src/tedge_operation_converter/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ use tedge_api::commands::SoftwareCommandMetadata;
use tedge_api::commands::SoftwareListCommand;
use tedge_api::commands::SoftwareUpdateCommand;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicError;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::extract_command_identifier;
use tedge_api::workflow::extract_json_output;
use tedge_api::workflow::CommandBoard;
use tedge_api::workflow::CommandId;
use tedge_api::workflow::GenericCommandState;
use tedge_api::workflow::GenericStateUpdate;
use tedge_api::workflow::OperationAction;
use tedge_api::workflow::OperationName;
use tedge_api::workflow::TopicName;
use tedge_api::workflow::WorkflowExecutionError;
use tedge_api::workflow::WorkflowSupervisor;
use tedge_api::Jsonify;
Expand Down Expand Up @@ -125,38 +124,35 @@ impl TedgeOperationConverterActor {
}

async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> {
let (operation, cmd_id) = match self.mqtt_schema.entity_channel_of(&message.topic) {
Ok((_, Channel::Command { operation, cmd_id })) => (operation, cmd_id),
let Ok((_, operation, cmd_id)) = self.extract_command_identifiers(&message.topic.name)
else {
log::error!("Unknown command channel: {}", &message.topic.name);
return Ok(());
};

_ => {
log::error!("Unknown command channel: {}", message.topic.name);
return Ok(());
}
let Ok(state) = GenericCommandState::from_command_message(&message) else {
log::error!("Invalid command payload: {}", &message.topic.name);
return Ok(());
};
let step = state.status.clone();

let mut log_file = self.open_command_log(&message.topic.name, &operation, &cmd_id);
let mut log_file =
self.open_command_log(state.invoking_command_topic(), &operation, &cmd_id);

match self.workflows.apply_external_update(&operation, &message) {
Ok(None) => {
if message.payload_bytes().is_empty() {
log_file
.log_step("", "The command has been fully processed")
.await;
self.persist_command_board().await?;
}
}
Ok(Some(state)) => {
match self.workflows.apply_external_update(&operation, state) {
Ok(None) => (),
Ok(Some(new_state)) => {
self.persist_command_board().await?;
self.process_command_state_update(state).await?;
if new_state.is_init() {
self.process_command_state_update(new_state).await?;
}
}
Err(WorkflowExecutionError::UnknownOperation { operation }) => {
info!("Ignoring {operation} operation which is not registered");
}
Err(err) => {
error!("{operation} operation request cannot be processed: {err}");
log_file
.log_step("Unknown", &format!("Error: {err}\n"))
.await;
log_file.log_step(&step, &format!("Error: {err}\n")).await;
}
}

Expand All @@ -167,15 +163,13 @@ impl TedgeOperationConverterActor {
&mut self,
state: GenericCommandState,
) -> Result<(), RuntimeError> {
let (target, operation, cmd_id) = match self.mqtt_schema.entity_channel_of(&state.topic) {
Ok((target, Channel::Command { operation, cmd_id })) => (target, operation, cmd_id),

_ => {
log::error!("Unknown command channel: {}", state.topic.name);
return Ok(());
}
let Ok((target, operation, cmd_id)) = self.extract_command_identifiers(&state.topic.name)
else {
log::error!("Unknown command channel: {}", state.topic.name);
return Ok(());
};
let mut log_file = self.open_command_log(&state.topic.name, &operation, &cmd_id);
let mut log_file =
self.open_command_log(state.invoking_command_topic(), &operation, &cmd_id);

let action = match self.workflows.get_action(&state) {
Ok(action) => action,
Expand All @@ -187,7 +181,7 @@ impl TedgeOperationConverterActor {
Err(err) => {
error!("{operation} operation request cannot be processed: {err}");
log_file
.log_step("Unknown", &format!("Error: {err}\n"))
.log_step(&state.status, &format!("Error: {err}\n"))
.await;
return Ok(());
}
Expand All @@ -198,12 +192,17 @@ impl TedgeOperationConverterActor {
match action {
OperationAction::Clear => {
if let Some(invoking_command) = self.workflows.invoking_command_state(&state) {
info!(
"Resuming invoking command {}",
invoking_command.topic.as_ref()
);
self.command_sender.send(invoking_command.clone()).await?;
} else {
info!(
"Waiting {} {operation} operation to be cleared",
state.status
);
}
info!(
"Waiting {} {operation} operation to be cleared",
state.status
);
Ok(())
}
OperationAction::MoveTo(next_step) => {
Expand Down Expand Up @@ -350,13 +349,13 @@ impl TedgeOperationConverterActor {
.update_with_json(sub_cmd_output)
.update(handlers.on_success);
self.publish_command_state(new_state).await?;
self.mqtt_publisher.send(sub_state.clear_message()).await?;
self.publish_command_state(sub_state.terminate()).await?;
} else if sub_state.is_failed() {
let new_state = state.update(handlers.on_error.unwrap_or_else(|| {
GenericStateUpdate::failed("sub-operation failed".to_string())
}));
self.publish_command_state(new_state).await?;
self.mqtt_publisher.send(sub_state.clear_message()).await?;
self.publish_command_state(sub_state.terminate()).await?;
} else {
// Nothing specific has to be done: the current state has been persisted
// and will be resumed on completion of the sub-operation
Expand All @@ -371,18 +370,15 @@ impl TedgeOperationConverterActor {

fn open_command_log(
&mut self,
command_topic: &TopicName,
root_command: Option<&str>,
operation: &OperationType,
cmd_id: &str,
) -> CommandLog {
let (root_operation, root_cmd_id) = match self
.workflows
.command_invocation_chain(command_topic)
.pop()
.and_then(|topic| extract_command_identifier(&topic))
let (root_operation, root_cmd_id) = match root_command
.and_then(|root_topic| self.extract_command_identifiers(root_topic).ok())
{
None => (None, None),
Some((op, id)) => (Some(op), Some(id)),
Some((_, op, id)) => (Some(op.to_string()), Some(id)),
};

CommandLog::new(
Expand Down Expand Up @@ -470,7 +466,9 @@ impl TedgeOperationConverterActor {
error!("Fail to persist workflow operation state: {err}");
}
self.persist_command_board().await?;
self.command_sender.send(new_state.clone()).await?;
if !new_state.is_term() {
self.command_sender.send(new_state.clone()).await?;
}
self.mqtt_publisher.send(new_state.into_message()).await?;
Ok(())
}
Expand Down Expand Up @@ -510,6 +508,26 @@ impl TedgeOperationConverterActor {

Ok(())
}

fn extract_command_identifiers(
&self,
topic: impl AsRef<str>,
) -> Result<(EntityTopicId, OperationType, CommandId), CommandTopicError> {
let (target, channel) = self.mqtt_schema.entity_channel_of(topic)?;
match channel {
Channel::Command { operation, cmd_id } => Ok((target, operation, cmd_id)),
_ => Err(CommandTopicError::InvalidCommandTopic),
}
}
}

#[derive(Debug, thiserror::Error)]
enum CommandTopicError {
#[error(transparent)]
InvalidTopic(#[from] EntityTopicError),

#[error("Not a command topic")]
InvalidCommandTopic,
}

/// Log all command steps
Expand Down Expand Up @@ -582,7 +600,7 @@ Action: {action}
let cmd_id = &self.cmd_id;
let message = format!(
r#"------------------------------------
{operation}/{cmd_id}/{step}: {now}
{operation}/{cmd_id} status={step} time={now}
{action}
"#
Expand Down
6 changes: 1 addition & 5 deletions crates/core/tedge_api/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,7 @@ where
let topic = self.topic(schema);
let status = self.status().to_string();
let payload = serde_json::to_value(self.payload).unwrap(); // any command payload can be converted into JSON
GenericCommandState {
topic,
status,
payload,
}
GenericCommandState::new(topic, status, payload)
}

/// Return the MQTT message for this command
Expand Down
6 changes: 1 addition & 5 deletions crates/core/tedge_api/src/workflow/on_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@ impl TryFrom<OnDiskCommandBoardV1> for CommandBoard {
value: command.unix_timestamp,
}
})?;
let state = GenericCommandState {
topic,
status: command.status,
payload: command.payload,
};
let state = GenericCommandState::new(topic, command.status, command.payload);
commands.insert(topic_name, (timestamp, state));
}
Ok(CommandBoard::new(commands))
Expand Down
Loading

0 comments on commit 9230264

Please sign in to comment.