diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 31e69b243df..5810bb76b8e 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -83,6 +83,15 @@ impl MqttSchema { MqttSchema { root } } + /// Build the schema to be used to decode a topic + pub fn from_topic(topic: impl AsRef) -> Self { + let (root, _) = topic + .as_ref() + .split_once('/') + .unwrap_or((topic.as_ref(), "")); + Self::with_root(root.to_string()) + } + /// Get the topic addressing a given entity channel /// ``` /// # use tedge_api::mqtt_topics::{MqttSchema, Channel, EntityTopicId}; @@ -188,9 +197,37 @@ impl MqttSchema { pub fn error_topic(&self) -> Topic { Topic::new_unchecked(&format!("{0}/errors", self.root)) } -} -impl MqttSchema { + /// Extract the entity identifier from a topic + /// + /// Note this function is not related to a specific topic root prefix + pub fn get_entity_id(topic: impl AsRef) -> Option { + match topic.as_ref().split('/').collect::>()[..] { + [_, t1, t2, t3, t4, ..] => Some(format!("{t1}/{t2}/{t3}/{t4}")), + _ => None, + } + } + + /// Extract the operation name from a command topic + /// + /// Note this function is not related to a specific topic root prefix + pub fn get_operation_name(topic: impl AsRef) -> Option { + match topic.as_ref().split('/').collect::>()[..] { + [_, _, _, _, _, "cmd", op, ..] => Some(op.to_string()), + _ => None, + } + } + + /// Extract the command instance identifier from a command topic + /// + /// Note this function is not related to a specific topic root prefix + pub fn get_command_id(topic: impl AsRef) -> Option { + match topic.as_ref().split('/').collect::>()[..] { + [_, _, _, _, _, "cmd", _, id] => Some(id.to_string()), + _ => None, + } + } + fn parse(&self, topic: &str) -> Result<(EntityTopicId, Channel), EntityTopicError> { let (root, topic) = topic.split_once('/').ok_or(EntityTopicError::Root { expected: self.root.to_string(), diff --git a/crates/core/tedge_api/src/workflow/state.rs b/crates/core/tedge_api/src/workflow/state.rs index 3c661afd311..f52bdb2155a 100644 --- a/crates/core/tedge_api/src/workflow/state.rs +++ b/crates/core/tedge_api/src/workflow/state.rs @@ -271,9 +271,17 @@ impl GenericCommandState { /// Infer the topic of the invoking command, given a sub command topic fn infer_invoking_command_topic(sub_command_topic: &str) -> Option { - match sub_command_topic.split('/').collect::>()[..] { - [pre, t1, t2, t3, t4, "cmd", _, sub_id] => Self::extract_invoking_command_id(sub_id) - .map(|(op, id)| format!("{pre}/{t1}/{t2}/{t3}/{t4}/cmd/{op}/{id}")), + let schema = MqttSchema::from_topic(sub_command_topic); + match schema.entity_channel_of(sub_command_topic) { + Ok((entity, Channel::Command { cmd_id, .. })) => { + Self::extract_invoking_command_id(&cmd_id).map(|(op, id)| { + let channel = Channel::Command { + operation: op.into(), + cmd_id: id.into(), + }; + schema.topic_for(&entity, &channel).as_ref().to_string() + }) + } _ => None, } } @@ -297,18 +305,15 @@ impl GenericCommandState { } fn target(&self) -> Option { - match self.topic.name.split('/').collect::>()[..] { - [_, t1, t2, t3, t4, "cmd", _, _] => Some(format!("{t1}/{t2}/{t3}/{t4}")), - _ => None, - } + MqttSchema::get_entity_id(&self.topic) } pub fn operation(&self) -> Option { - extract_command_identifier(&self.topic.name).map(|(operation, _)| operation) + MqttSchema::get_operation_name(&self.topic) } pub fn cmd_id(&self) -> Option { - extract_command_identifier(&self.topic.name).map(|(_, cmd_id)| cmd_id) + MqttSchema::get_command_id(&self.topic) } pub fn is_init(&self) -> bool { @@ -328,15 +333,6 @@ impl GenericCommandState { } } -fn extract_command_identifier(topic: &str) -> Option<(String, String)> { - match topic.split('/').collect::>()[..] { - [_, _, _, _, _, "cmd", operation, cmd_id] => { - Some((operation.to_string(), cmd_id.to_string())) - } - _ => None, - } -} - impl GenericStateUpdate { pub fn empty_payload() -> Value { json!({})