Skip to content

Commit

Permalink
Store invoking command topic in sub command state
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Apr 11, 2024
1 parent c785e87 commit da71c7e
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 100 deletions.
95 changes: 56 additions & 39 deletions crates/core/tedge_agent/src/tedge_operation_converter/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ use tedge_api::messages::SoftwareCommandMetadata;
use tedge_api::messages::SoftwareListCommand;
use tedge_api::messages::SoftwareUpdateCommand;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicError;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::OperationType;
use tedge_api::workflow::extract_command_identifier;
use tedge_api::workflow::extract_json_output;
use tedge_api::workflow::CommandBoard;
use tedge_api::workflow::CommandId;
use tedge_api::workflow::GenericCommandState;
use tedge_api::workflow::GenericStateUpdate;
use tedge_api::workflow::OperationAction;
use tedge_api::workflow::OperationName;
use tedge_api::workflow::TopicName;
use tedge_api::workflow::WorkflowExecutionError;
use tedge_api::workflow::WorkflowSupervisor;
use tedge_api::Jsonify;
Expand Down Expand Up @@ -125,29 +124,27 @@ impl TedgeOperationConverterActor {
}

async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> {
let (operation, cmd_id) = match self.mqtt_schema.entity_channel_of(&message.topic) {
Ok((_, Channel::Command { operation, cmd_id })) => (operation, cmd_id),
let Ok((_, operation, cmd_id)) = self.extract_command_identifiers(&message.topic.name)
else {
log::error!("Unknown command channel: {}", &message.topic.name);
return Ok(());
};

_ => {
log::error!("Unknown command channel: {}", message.topic.name);
return Ok(());
}
let Ok(state) = GenericCommandState::from_command_message(&message) else {
log::error!("Invalid command payload: {}", &message.topic.name);
return Ok(());
};

let mut log_file = self.open_command_log(&message.topic.name, &operation, &cmd_id);
let mut log_file =
self.open_command_log(state.invoking_command_topic(), &operation, &cmd_id);

match self.workflows.apply_external_update(&operation, &message) {
Ok(None) => {
if message.payload_bytes().is_empty() {
log_file
.log_step("", "The command has been fully processed")
.await;
self.persist_command_board().await?;
}
}
Ok(Some(state)) => {
match self.workflows.apply_external_update(&operation, state) {
Ok(None) => (),
Ok(Some(new_state)) => {
self.persist_command_board().await?;
self.process_command_state_update(state).await?;
if !new_state.is_term() {
self.process_command_state_update(new_state).await?;
}
}
Err(WorkflowExecutionError::UnknownOperation { operation }) => {
info!("Ignoring {operation} operation which is not registered");
Expand All @@ -167,15 +164,13 @@ impl TedgeOperationConverterActor {
&mut self,
state: GenericCommandState,
) -> Result<(), RuntimeError> {
let (target, operation, cmd_id) = match self.mqtt_schema.entity_channel_of(&state.topic) {
Ok((target, Channel::Command { operation, cmd_id })) => (target, operation, cmd_id),

_ => {
log::error!("Unknown command channel: {}", state.topic.name);
return Ok(());
}
let Ok((target, operation, cmd_id)) = self.extract_command_identifiers(&state.topic.name)
else {
log::error!("Unknown command channel: {}", state.topic.name);
return Ok(());
};
let mut log_file = self.open_command_log(&state.topic.name, &operation, &cmd_id);
let mut log_file =
self.open_command_log(state.invoking_command_topic(), &operation, &cmd_id);

let action = match self.workflows.get_action(&state) {
Ok(action) => action,
Expand All @@ -198,12 +193,17 @@ impl TedgeOperationConverterActor {
match action {
OperationAction::Clear => {
if let Some(invoking_command) = self.workflows.invoking_command_state(&state) {
info!(
"Resuming invoking command {}",
invoking_command.topic.as_ref()
);
self.command_sender.send(invoking_command.clone()).await?;
} else {
info!(
"Waiting {} {operation} operation to be cleared",
state.status
);
}
info!(
"Waiting {} {operation} operation to be cleared",
state.status
);
Ok(())
}
OperationAction::MoveTo(next_step) => {
Expand Down Expand Up @@ -371,18 +371,15 @@ impl TedgeOperationConverterActor {

fn open_command_log(
&mut self,
command_topic: &TopicName,
root_command: Option<&str>,
operation: &OperationType,
cmd_id: &str,
) -> CommandLog {
let (root_operation, root_cmd_id) = match self
.workflows
.command_invocation_chain(command_topic)
.pop()
.and_then(|topic| extract_command_identifier(&topic))
let (root_operation, root_cmd_id) = match root_command
.and_then(|root_topic| self.extract_command_identifiers(root_topic).ok())
{
None => (None, None),
Some((op, id)) => (Some(op), Some(id)),
Some((_, op, id)) => (Some(op.to_string()), Some(id)),
};

CommandLog::new(
Expand Down Expand Up @@ -510,6 +507,26 @@ impl TedgeOperationConverterActor {

Ok(())
}

fn extract_command_identifiers(
&self,
topic: impl AsRef<str>,
) -> Result<(EntityTopicId, OperationType, CommandId), CommandTopicError> {
let (target, channel) = self.mqtt_schema.entity_channel_of(topic)?;
match channel {
Channel::Command { operation, cmd_id } => Ok((target, operation, cmd_id)),
_ => Err(CommandTopicError::InvalidCommandTopic),
}
}
}

#[derive(Debug, thiserror::Error)]
enum CommandTopicError {
#[error(transparent)]
InvalidTopic(#[from] EntityTopicError),

#[error("Not a command topic")]
InvalidCommandTopic,
}

/// Log all command steps
Expand Down
43 changes: 11 additions & 32 deletions crates/core/tedge_api/src/workflow/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::workflow::CommandId;
use crate::workflow::ExitHandlers;
use crate::workflow::OperationName;
use crate::workflow::StateExcerptError;
use crate::workflow::TopicName;
use crate::workflow::WorkflowExecutionError;
use mqtt_channel::MqttMessage;
use mqtt_channel::QoS::AtLeastOnce;
Expand Down Expand Up @@ -37,6 +36,7 @@ const INIT: &str = "init";
const SUCCESSFUL: &str = "successful";
const FAILED: &str = "failed";
const REASON: &str = "reason";
const INVOKING_COMMAND: &str = "__invoking_command__";

impl GenericCommandState {
/// Create an init state for a sub-operation
Expand All @@ -47,7 +47,8 @@ impl GenericCommandState {
cmd_id: CommandId,
sub_operation: OperationName,
) -> GenericCommandState {
let sub_cmd_id = sub_command_id(&operation.to_string(), &cmd_id);
let sub_cmd_id = format!("sub-{cmd_id}");
let invoking_topic = schema.topic_for(entity, &Channel::Command { operation, cmd_id });
let topic = schema.topic_for(
entity,
&Channel::Command {
Expand All @@ -57,7 +58,8 @@ impl GenericCommandState {
);
let status = INIT.to_string();
let payload = json!({
STATUS: status
STATUS: status,
INVOKING_COMMAND: invoking_topic.name
});

GenericCommandState {
Expand Down Expand Up @@ -240,6 +242,11 @@ impl GenericCommandState {
&self.topic.name
}

/// Return the topic of the invoking command, if any
pub fn invoking_command_topic(&self) -> Option<&str> {
GenericCommandState::extract_text_property(&self.payload, INVOKING_COMMAND)
}

fn target(&self) -> Option<String> {
match self.topic.name.split('/').collect::<Vec<&str>>()[..] {
[_, t1, t2, t3, t4, "cmd", _, _] => Some(format!("{t1}/{t2}/{t3}/{t4}")),
Expand Down Expand Up @@ -272,35 +279,7 @@ impl GenericCommandState {
}
}

/// Return the invoking command topic name, if any
pub fn invoking_command(sub_command: &TopicName) -> Option<TopicName> {
match sub_command.split('/').collect::<Vec<&str>>()[..] {
[pre, t1, t2, t3, t4, "cmd", _, sub_id] => extract_invoking_command_id(sub_id)
.map(|(op, id)| format!("{pre}/{t1}/{t2}/{t3}/{t4}/cmd/{op}/{id}")),
_ => None,
}
}

/// Build a sub command identifier from its invoking command identifier
///
/// Using such a structure command id for sub commands is key
/// to retrieve the invoking command of a sub-operation from its state using [extract_invoking_command_id].
fn sub_command_id(operation: &str, cmd_id: &str) -> String {
format!("sub:{operation}:{cmd_id}")
}

/// Extract the invoking command identifier from a sub command identifier
///
/// Return None if the given id is not a sub command identifier
/// i.e. if not generated with [sub_command_id].
fn extract_invoking_command_id(sub_cmd_id: &str) -> Option<(&str, &str)> {
match sub_cmd_id.split(':').collect::<Vec<&str>>()[..] {
["sub", operation, cmd_id] => Some((operation, cmd_id)),
_ => None,
}
}

pub fn extract_command_identifier(topic: &str) -> Option<(String, String)> {
fn extract_command_identifier(topic: &str) -> Option<(String, String)> {
match topic.split('/').collect::<Vec<&str>>()[..] {
[_, _, _, _, _, "cmd", operation, cmd_id] => {
Some((operation.to_string(), cmd_id.to_string()))
Expand Down
48 changes: 22 additions & 26 deletions crates/core/tedge_api/src/workflow/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,17 @@ impl WorkflowSupervisor {
pub fn apply_external_update(
&mut self,
operation: &OperationType,
message: &MqttMessage,
command_state: GenericCommandState,
) -> Result<Option<GenericCommandState>, WorkflowExecutionError> {
if !self.workflows.contains_key(operation) {
return Err(WorkflowExecutionError::UnknownOperation {
operation: operation.to_string(),
});
};
let command_state = GenericCommandState::from_command_message(message)?;
if command_state.is_term() {
// The command has been cleared
self.commands.remove(&command_state.topic.name);
Ok(None)
Ok(Some(command_state))
} else if command_state.is_init() {
// This is a new command request
self.commands.insert(command_state.clone())?;
Expand Down Expand Up @@ -133,8 +132,9 @@ impl WorkflowSupervisor {
&self,
sub_command: &GenericCommandState,
) -> Option<&GenericCommandState> {
invoking_command(&sub_command.topic.name)
.and_then(|invoking_topic| self.get_state(&invoking_topic))
sub_command
.invoking_command_topic()
.and_then(|invoking_topic| self.get_state(invoking_topic))
}

/// Return the sub command of a command, if any
Expand All @@ -144,12 +144,20 @@ impl WorkflowSupervisor {
) -> Option<&GenericCommandState> {
self.commands
.lookup_sub_command(command_state.command_topic())
.and_then(|sub_topic| self.get_state(sub_topic))
}

/// Return the chain of sub-operation invocation leading to the given leaf command
pub fn command_invocation_chain(&self, leaf_command: &TopicName) -> Vec<TopicName> {
self.commands.command_invocation_chain(leaf_command)
/// Return the state of the root command which execution leads to the execution of a leaf-command
///
/// Return None, if the given command is not a sub-command
pub fn root_invoking_command_state(
&self,
leaf_command: &GenericCommandState,
) -> Option<&GenericCommandState> {
let invoking_command = self.invoking_command_state(leaf_command)?;
let root_command = self
.root_invoking_command_state(invoking_command)
.unwrap_or(invoking_command);
Some(root_command)
}

/// Update the state of the command board on reception of new state for a command
Expand Down Expand Up @@ -211,24 +219,12 @@ impl CommandBoard {
}

/// Return the sub command of a command, if any
pub fn lookup_sub_command(&self, command_topic: &TopicName) -> Option<&TopicName> {
// The sequential search is okay because in practice there is no more than 10 concurrent commands
pub fn lookup_sub_command(&self, command_topic: &TopicName) -> Option<&GenericCommandState> {
// Sequential search is okay because in practice there is no more than 10 concurrent commands
self.commands
.keys()
.find(|sub_command| invoking_command(sub_command).as_ref() == Some(command_topic))
}

/// Return the chain of command / sub-operation invocation leading to the given leaf command
pub fn command_invocation_chain(&self, leaf_command: &TopicName) -> Vec<TopicName> {
let mut invoking_commands = vec![];
let mut command = leaf_command.clone();
while let Some(super_command) = invoking_command(&command) {
if self.commands.contains_key(&super_command) {
invoking_commands.push(super_command.clone());
}
command = super_command;
}
invoking_commands
.values()
.find(|(_, command)| command.invoking_command_topic() == Some(command_topic))
.map(|(_, command)| command)
}

/// Iterate over the pending commands
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ Invoke sub-operation from a super-command operation
Should Contain ${workflow_log} super_command/test-42/init:
Should Contain ${workflow_log} super_command/test-42/executing:
Should Contain ${workflow_log} super_command/test-42/awaiting_completion:
Should Contain ${workflow_log} sub_command/sub:super_command:test-42/init: msg=main command log should contain sub command steps
Should Contain ${workflow_log} sub_command/sub:super_command:test-42/executing: msg=main command log should contain sub command steps
Should Contain ${workflow_log} sub_command/sub:super_command:test-42/successful: msg=main command log should contain sub command steps
Should Contain ${workflow_log} sub_command/sub-test-42/init: msg=main command log should contain sub command steps
Should Contain ${workflow_log} sub_command/sub-test-42/executing: msg=main command log should contain sub command steps
Should Contain ${workflow_log} sub_command/sub-test-42/successful: msg=main command log should contain sub command steps
Should Contain ${workflow_log} super_command/test-42/successful:

Use scripts to create sub-operation init states
Expand Down

0 comments on commit da71c7e

Please sign in to comment.