Skip to content

Commit

Permalink
Move topic helper functions under MqttSchema
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Apr 23, 2024
1 parent 9230264 commit f261151
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 20 deletions.
41 changes: 39 additions & 2 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ impl MqttSchema {
MqttSchema { root }
}

/// Build the schema to be used to decode a topic
pub fn from_topic(topic: impl AsRef<str>) -> Self {
let (root, _) = topic
.as_ref()
.split_once('/')
.unwrap_or((topic.as_ref(), ""));
Self::with_root(root.to_string())
}

/// Get the topic addressing a given entity channel
/// ```
/// # use tedge_api::mqtt_topics::{MqttSchema, Channel, EntityTopicId};
Expand Down Expand Up @@ -188,9 +197,37 @@ impl MqttSchema {
pub fn error_topic(&self) -> Topic {
Topic::new_unchecked(&format!("{0}/errors", self.root))
}
}

impl MqttSchema {
/// Extract the entity identifier from a topic
///
/// Note this function is not related to a specific topic root prefix
pub fn get_entity_id(topic: impl AsRef<str>) -> Option<String> {
match topic.as_ref().split('/').collect::<Vec<&str>>()[..] {
[_, t1, t2, t3, t4, ..] => Some(format!("{t1}/{t2}/{t3}/{t4}")),
_ => None,
}
}

/// Extract the operation name from a command topic
///
/// Note this function is not related to a specific topic root prefix
pub fn get_operation_name(topic: impl AsRef<str>) -> Option<String> {
match topic.as_ref().split('/').collect::<Vec<&str>>()[..] {
[_, _, _, _, _, "cmd", op, ..] => Some(op.to_string()),
_ => None,
}
}

/// Extract the command instance identifier from a command topic
///
/// Note this function is not related to a specific topic root prefix
pub fn get_command_id(topic: impl AsRef<str>) -> Option<String> {
match topic.as_ref().split('/').collect::<Vec<&str>>()[..] {
[_, _, _, _, _, "cmd", _, id] => Some(id.to_string()),
_ => None,
}
}

fn parse(&self, topic: &str) -> Result<(EntityTopicId, Channel), EntityTopicError> {
let (root, topic) = topic.split_once('/').ok_or(EntityTopicError::Root {
expected: self.root.to_string(),
Expand Down
32 changes: 14 additions & 18 deletions crates/core/tedge_api/src/workflow/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,17 @@ impl GenericCommandState {

/// Infer the topic of the invoking command, given a sub command topic
fn infer_invoking_command_topic(sub_command_topic: &str) -> Option<String> {
match sub_command_topic.split('/').collect::<Vec<&str>>()[..] {
[pre, t1, t2, t3, t4, "cmd", _, sub_id] => Self::extract_invoking_command_id(sub_id)
.map(|(op, id)| format!("{pre}/{t1}/{t2}/{t3}/{t4}/cmd/{op}/{id}")),
let schema = MqttSchema::from_topic(sub_command_topic);
match schema.entity_channel_of(sub_command_topic) {
Ok((entity, Channel::Command { cmd_id, .. })) => {
Self::extract_invoking_command_id(&cmd_id).map(|(op, id)| {
let channel = Channel::Command {
operation: op.into(),
cmd_id: id.into(),
};
schema.topic_for(&entity, &channel).as_ref().to_string()
})
}
_ => None,
}
}
Expand All @@ -297,18 +305,15 @@ impl GenericCommandState {
}

fn target(&self) -> Option<String> {
match self.topic.name.split('/').collect::<Vec<&str>>()[..] {
[_, t1, t2, t3, t4, "cmd", _, _] => Some(format!("{t1}/{t2}/{t3}/{t4}")),
_ => None,
}
MqttSchema::get_entity_id(&self.topic)
}

pub fn operation(&self) -> Option<String> {
extract_command_identifier(&self.topic.name).map(|(operation, _)| operation)
MqttSchema::get_operation_name(&self.topic)
}

pub fn cmd_id(&self) -> Option<String> {
extract_command_identifier(&self.topic.name).map(|(_, cmd_id)| cmd_id)
MqttSchema::get_command_id(&self.topic)
}

pub fn is_init(&self) -> bool {
Expand All @@ -328,15 +333,6 @@ impl GenericCommandState {
}
}

fn extract_command_identifier(topic: &str) -> Option<(String, String)> {
match topic.split('/').collect::<Vec<&str>>()[..] {
[_, _, _, _, _, "cmd", operation, cmd_id] => {
Some((operation.to_string(), cmd_id.to_string()))
}
_ => None,
}
}

impl GenericStateUpdate {
pub fn empty_payload() -> Value {
json!({})
Expand Down

0 comments on commit f261151

Please sign in to comment.