From 21be0686bea733903998532c8c2991214a14402f Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Wed, 16 Aug 2023 15:58:41 +0000 Subject: [PATCH] Add entity store Signed-off-by: Marcel Guzik --- crates/core/tedge_api/src/entity.rs | 284 +++++++++ crates/core/tedge_api/src/entity_store.rs | 565 ++++++++++++++++++ crates/core/tedge_api/src/lib.rs | 3 + crates/extensions/c8y_mapper_ext/src/actor.rs | 117 ++-- crates/extensions/c8y_mapper_ext/src/tests.rs | 49 +- 5 files changed, 937 insertions(+), 81 deletions(-) create mode 100644 crates/core/tedge_api/src/entity.rs create mode 100644 crates/core/tedge_api/src/entity_store.rs diff --git a/crates/core/tedge_api/src/entity.rs b/crates/core/tedge_api/src/entity.rs new file mode 100644 index 00000000000..0c756a4a174 --- /dev/null +++ b/crates/core/tedge_api/src/entity.rs @@ -0,0 +1,284 @@ +//! A module defining entities, their types, and utilities for parsing MQTT +//! topics following the default thin-edge MQTT scheme. + +use std::str::FromStr; + +// TODO: read from config +const MQTT_ROOT: &str = "te"; + +/// A thin-edge entity MQTT topic. +/// +/// An entity topic consists of 3 groups: root, entity identifier, and +/// optionally a channel. To be a valid entity topic, a topic must start with a +/// root, and then have its entity identifier and channel (if present) groups +/// successfully parsed. +/// +/// ``` +/// # use tedge_api::entity::{EntityTopic, Channel, ChannelCategory}; +/// let entity_topic: EntityTopic = +/// format!("te/device/child001/service/service001/m/measurement_type") +/// .parse() +/// .unwrap(); +/// assert_eq!(entity_topic.entity_id(), "device/child001/service/service001"); +/// assert_eq!(entity_topic.channel(), Some(&Channel { +/// category: ChannelCategory::Measurement, +/// r#type: "measurement_type".to_string(), +/// suffix: "".to_string() +/// })); +/// ``` +/// +/// https://thin-edge.github.io/thin-edge.io/next/references/mqtt-api/#topic-scheme +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EntityTopic { + entity_id: EntityId, + channel: Option, +} + +impl EntityTopic { + pub fn entity_id(&self) -> &str { + self.entity_id.0.as_str() + } + + pub fn channel(&self) -> Option<&Channel> { + self.channel.as_ref() + } + + /// Returns a device name if entity topic identifier is not using a custom + /// schema. + pub fn device_name(&self) -> Option<&str> { + match self.entity_id.0.split('/').collect::>()[..] { + ["device", device_id, "service", _] => Some(device_id), + ["device", device_id, "", ""] => Some(device_id), + _ => None, + } + } + + /// Returns a service name if entity topic identifier is not using a custom + /// schema and the entity identifier refers to the service. + pub fn service_name(&self) -> Option<&str> { + match self.entity_id.0.split('/').collect::>()[..] { + ["device", _, "service", service_id] => Some(service_id), + _ => None, + } + } +} + +impl FromStr for EntityTopic { + type Err = EntityTopicError; + + fn from_str(topic: &str) -> Result { + const ENTITY_ID_SEGMENTS: usize = 4; + + let (root, topic) = topic.split_once('/').ok_or(EntityTopicError::Root { + expected: MQTT_ROOT.to_string(), + got: topic.to_string(), + })?; + + if root != MQTT_ROOT { + return Err(EntityTopicError::Root { + expected: MQTT_ROOT.to_string(), + got: root.to_string(), + }); + } + + let mut topic_separator_indices = topic.match_indices('/').map(|(i, _)| i); + let id_channel_separator_index = topic_separator_indices.nth(3).unwrap_or(topic.len()); + + let (entity_id, channel) = topic.split_at(id_channel_separator_index); + + let entity_id_segments = entity_id.matches('/').count(); + let missing_slashes = ENTITY_ID_SEGMENTS - entity_id_segments - 1; + let entity_id = format!("{entity_id}{:/<1$}", "", missing_slashes); + + let channel = channel.trim_start_matches('/'); + let channel = if !channel.is_empty() { + Some(Channel::new(channel)?) + } else { + None + }; + + Ok(EntityTopic { + entity_id: EntityId(entity_id.to_string()), + channel, + }) + } +} + +impl TryFrom for EntityTopic { + type Error = EntityTopicError; + + fn try_from(topic: mqtt_channel::Topic) -> Result { + topic.name.parse() + } +} + +#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] +pub enum EntityTopicError { + #[error("Fist topic segment expected to be {expected:?}, got {got:?}")] + Root { expected: String, got: String }, + + #[error("Channel group invalid")] + Channel(#[from] ChannelError), +} + +/// Represents an entity identifier group in thin-edge MQTT scheme. +/// +/// An entity identifier is a fixed 4-segment group, as such any 4 topic +/// segments that come after the root are considered a part of an identifier, +/// even if they contain values usually present in the channel group, e.g. +/// `/m/`. +/// +/// If the topic ends before the expected 4 segments, the remaining segments are +/// filled by empty segments (`//`). +/// +/// # Example +/// +/// +/// https://thin-edge.github.io/thin-edge.io/next/references/mqtt-api/#group-identifier +#[derive(Debug, Clone, PartialEq, Eq)] +struct EntityId(String); + +/// Represents a channel group in thin-edge MQTT scheme. +/// +/// A valid channel needs to be at least 2 segments long, with the first segment +/// containing a valid category. +/// +/// +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Channel { + pub category: ChannelCategory, + pub r#type: String, + pub suffix: String, +} + +impl Channel { + pub fn new(channel: &str) -> Result { + let (category, channel) = channel.split_once('/').ok_or(ChannelError::TooShort)?; + let kind = match category { + "m" => ChannelCategory::Measurement, + "e" => ChannelCategory::Event, + "a" => ChannelCategory::Alarm, + "cmd" => ChannelCategory::Command, + _ => return Err(ChannelError::InvalidCategory(category.to_string())), + }; + + let (r#type, suffix) = channel.split_once('/').unwrap_or((channel, "")); + + if r#type.is_empty() { + return Err(ChannelError::TooShort); + } + + Ok(Channel { + category: kind, + r#type: r#type.to_string(), + suffix: suffix.to_string(), + }) + } +} + +impl FromStr for Channel { + type Err = ChannelError; + + fn from_str(s: &str) -> Result { + Self::new(s) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ChannelCategory { + Measurement, + Event, + Alarm, + Command, +} + +#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] +pub enum ChannelError { + #[error("Channel needs to have at least 2 segments")] + TooShort, + + #[error("Invalid category: {0:?}")] + InvalidCategory(String), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_full_correct_topic() { + let entity_topic: EntityTopic = + format!("{MQTT_ROOT}/device/child001/service/service001/m/measurement_type") + .parse() + .unwrap(); + + assert_eq!( + entity_topic, + EntityTopic { + entity_id: EntityId("device/child001/service/service001".to_string()), + channel: Some(Channel { + category: ChannelCategory::Measurement, + r#type: "measurement_type".to_string(), + suffix: "".to_string() + }) + } + ); + } + + #[test] + fn parses_nochannel_correct_topic() { + let topic1: EntityTopic = format!("{MQTT_ROOT}/device/child001/service/service001/") + .parse() + .unwrap(); + let topic2: EntityTopic = format!("{MQTT_ROOT}/device/child001/service/service001") + .parse() + .unwrap(); + + let topic = EntityTopic { + entity_id: EntityId("device/child001/service/service001".to_string()), + channel: None, + }; + + assert_eq!(topic1, topic); + assert_eq!(topic2, topic); + } + + #[test] + fn parses_noservice_entity_correct_topic() { + let topic1: EntityTopic = format!("{MQTT_ROOT}/device/child001//").parse().unwrap(); + let topic2: EntityTopic = format!("{MQTT_ROOT}/device/child001").parse().unwrap(); + + let topic = EntityTopic { + entity_id: EntityId("device/child001//".to_string()), + channel: None, + }; + + assert_eq!(topic1, topic); + assert_eq!(topic2, topic); + } + + #[test] + fn no_root() { + let topic = "device/child001/service/service001/m/measurement_type".parse::(); + + assert!(topic.is_err()); + } + + #[test] + fn incorrect_channel() { + let topic1 = format!( + "{MQTT_ROOT}/device/child001/service/service001/incorrect_category/measurement_type" + ) + .parse::(); + + let topic2 = + format!("{MQTT_ROOT}/device/child001/service/service001/m/").parse::(); + + let topic3 = + format!("{MQTT_ROOT}/device/child001/service/service001/m").parse::(); + + assert!(topic1.is_err()); + assert!(topic2.is_err()); + assert!(topic3.is_err()); + } +} diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs new file mode 100644 index 00000000000..5b418a27a09 --- /dev/null +++ b/crates/core/tedge_api/src/entity_store.rs @@ -0,0 +1,565 @@ +//! A store containing registered MQTT entities. +//! +//! References: +//! +//! - +//! - + +// TODO: move entity business logic to its own module + +use std::collections::HashMap; + +use mqtt_channel::Message; +use mqtt_channel::Topic; + +/// Represents an "MQTT entity identifier" portion of the MQTT topic +/// +/// Example: +/// - topic: `te/device/dev1/service/myservice/m//my_measurement +/// - entity id: `device/dev1/service/myservice` +/// +/// +type EntityId = String; + +// type checker is not able to infer `&EntityId` as &str, so alias is needed +// +// TODO: try using Rc, Cow, or string interning to get rid of duplicate strings +type EntityIdRef<'a> = &'a str; + +// In the future, root will be read from config +const MQTT_ROOT: &str = "te"; + +/// A store for topic-based entity metadata lookup. +/// +/// This object is a hashmap from MQTT identifiers to entities (devices or +/// services) that publish on those topics. It keeps track of type of entities, +/// their relationships (parent and child entities), and other metadata. +/// +/// The entity store takes as input registration messages published by entities +/// (devices and services) and stores information about entities and their +/// hierarchy, allowing to efficiently query it. It's possible to: +/// +/// - enumerate all registered devices +/// - check if a given entity is already registered +/// - query services and child devices of a given device +/// - query parent of an entity +/// +/// # Examples +/// +/// ``` +/// # use mqtt_channel::{Message, Topic}; +/// # use tedge_api::entity_store::{EntityStore, EntityRegistrationMessage}; +/// let mqtt_message = Message::new( +/// &Topic::new("te/device/main//").unwrap(), +/// r#"{"@type": "device"}"#.to_string(), +/// ); +/// let registration_message = EntityRegistrationMessage::try_from(mqtt_message).unwrap(); +/// +/// let mut entity_store = EntityStore::with_main_device(registration_message); +/// ``` +#[derive(Debug, Clone)] +pub struct EntityStore { + main_device: EntityId, + entities: HashMap, + external_id_index: HashMap, +} + +impl EntityStore { + /// Creates a new entity store with a given main device. + pub fn with_main_device(main_device: EntityRegistrationMessage) -> Option { + if main_device.r#type != EntityType::MainDevice { + return None; + } + + let external_id = main_device.external_id?; + let metadata = EntityMetadata { + external_id: external_id.clone(), + r#type: main_device.r#type, + parent: None, + other: main_device.payload, + }; + + Some(EntityStore { + main_device: main_device.mqtt_id.clone(), + entities: HashMap::from([(main_device.mqtt_id.clone(), metadata)]), + external_id_index: HashMap::from([(external_id, main_device.mqtt_id)]), + }) + } + + /// Returns information about an entity under a given MQTT entity topic. + pub fn get(&self, entity_id: &str) -> Option<&EntityMetadata> { + self.entities.get(entity_id) + } + + /// Returns information for an entity under a given external id. + pub fn get_by_external_id(&self, external_id: &str) -> Option<&EntityMetadata> { + let mqtt_id = self.external_id_index.get(external_id)?; + self.get(mqtt_id) + } + + /// Returns the MQTT identifier of the main device. + /// + /// The main device is an entity with `@type: "device"`. + pub fn main_device(&self) -> EntityIdRef { + self.main_device.as_str() + } + + /// Returns MQTT identifiers of child devices of a given device. + pub fn child_devices(&self, entity_topic: EntityIdRef) -> Vec { + self.entities + .iter() + .filter(|(_, e)| { + // can be replaced by `is_some_and` after MSRV upgrade to 1.70 + e.parent.as_ref().map_or(false, |p| p == entity_topic) + && e.r#type == EntityType::ChildDevice + }) + .map(|(k, _)| k.as_str()) + .collect() + } + + /// Returns MQTT identifiers of services running on a given device. + pub fn services(&self, entity_topic: EntityIdRef) -> Vec { + self.entities + .iter() + .filter(|(_, e)| { + // can be replaced by `is_some_and` after MSRV upgrade to 1.70 + e.parent.as_ref().map_or(false, |p| p == entity_topic) + && e.r#type == EntityType::Service + }) + .map(|(k, _)| k.as_str()) + .collect() + } + + /// Updates entity store state based on the content of the entity + /// registration message. + /// + /// It can register a new entity in the store or update already registered + /// entity, returning a list of all entities affected by the update, e.g.: + /// + /// - when adding/removing a child device or service, the parent is affected + pub fn update(&mut self, message: EntityRegistrationMessage) -> Result, Error> { + if message.r#type == EntityType::MainDevice && message.mqtt_id != self.main_device { + return Err(Error::MainDeviceAlreadyRegistered( + self.main_device.as_str().into(), + )); + } + + let mut affected_entities = vec![]; + + let parent = match message.r#type { + EntityType::ChildDevice => message.parent.or(Some(self.main_device.clone())), + EntityType::Service => message.parent.or(Some(self.main_device.clone())), + EntityType::MainDevice => None, + }; + + // parent device is affected if new device is its child + if let Some(parent) = &parent { + if !self.entities.contains_key(parent) { + return Err(Error::NoParent(parent.clone().into_boxed_str())); + } + + affected_entities.push(parent.clone()); + } + + let external_id = message + .external_id + .unwrap_or_else(|| self.derive_external_id(&message.mqtt_id)); + let entity_metadata = EntityMetadata { + r#type: message.r#type, + external_id: external_id.clone(), + parent, + other: message.payload, + }; + + // device is affected if it was previously registered and was updated + let previous = self + .entities + .insert(message.mqtt_id.clone(), entity_metadata); + + if previous.is_some() { + affected_entities.push(message.mqtt_id); + } else { + self.external_id_index.insert(external_id, message.mqtt_id); + } + + Ok(affected_entities) + } + + /// An iterator over all registered entities. + pub fn iter(&self) -> impl Iterator { + self.entities.iter() + } + + /// Generate child device external ID. + /// + /// The external id is generated by prefixing the id with main device name + /// (device_common_name) and then appending the MQTT entity topic with `/` + /// characters replaced by `:`. + /// + /// # Examples + /// - `device/main//` => `DEVICE_COMMON_NAME` + /// - `device/child001//` => `DEVICE_COMMON_NAME:device:child001` + /// - `device/child001/service/service001` => `DEVICE_COMMON_NAME:device:child001:service:service001` + /// - `factory01/hallA/packaging/belt001` => `DEVICE_COMMON_NAME:factory01:hallA:packaging:belt001` + fn derive_external_id(&self, entity_topic: EntityIdRef) -> String { + if entity_topic == self.main_device { + self.get(&self.main_device).unwrap().external_id.to_string() + } else { + let main_device_external_id = &self.get(&self.main_device).unwrap().external_id; + let external_id_suffix = entity_topic.replace('/', ":"); + let external_id_suffix = external_id_suffix.trim_matches(':'); + + format!("{main_device_external_id}:{external_id_suffix}") + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EntityMetadata { + parent: Option, + r#type: EntityType, + external_id: String, + other: serde_json::Value, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum EntityType { + MainDevice, + ChildDevice, + Service, +} + +/// Represents an error encountered while updating the store. +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +pub enum Error { + #[error("Specified parent {0:?} does not exist in the store")] + NoParent(Box), + + #[error("Main device was not registered. Before registering child entities, register the main device")] + NoMainDevice, + + #[error("The main device was already registered at topic {0}")] + MainDeviceAlreadyRegistered(Box), +} + +/// An object representing a valid entity registration message. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EntityRegistrationMessage { + mqtt_id: EntityId, + external_id: Option, + r#type: EntityType, + parent: Option, + payload: serde_json::Value, +} + +impl EntityRegistrationMessage { + /// Parses a MQTT message as an entity registration message. + /// + /// MQTT message is an entity registration message if + /// - published on a prefix of `te/+/+/+/+` + /// - its payload contains a registration message. + pub fn new(message: Message) -> Option { + let payload = parse_entity_register_payload(message.payload_bytes())?; + + let r#type = payload + .get("@type") + .and_then(|t| t.as_str()) + .map(|t| t.to_owned())?; + let r#type = match r#type.as_str() { + "device" => EntityType::MainDevice, + "child-device" => EntityType::ChildDevice, + "service" => EntityType::Service, + _ => return None, + }; + + let parent = if r#type == EntityType::ChildDevice || r#type == EntityType::Service { + payload + .get("@parent") + .and_then(|p| p.as_str()) + .map(|p| p.to_owned()) + } else { + None + }; + + let external_id = payload + .get("@id") + .and_then(|id| id.as_str()) + .map(|id| id.to_string()); + + let mqtt_id = message + .topic + .name + .strip_prefix(MQTT_ROOT) + .and_then(|s| s.strip_prefix('/'))?; + + Some(Self { + mqtt_id: mqtt_id.to_string(), + external_id, + r#type, + parent, + payload, + }) + } + + /// Creates a entity registration message for a main device. + pub fn main_device(external_id: String) -> Self { + Self { + mqtt_id: "device/main//".to_string(), + external_id: Some(external_id), + r#type: EntityType::MainDevice, + parent: None, + payload: serde_json::json!({}), + } + } +} + +impl TryFrom for EntityRegistrationMessage { + type Error = (); + + fn try_from(value: Message) -> Result { + EntityRegistrationMessage::new(value).ok_or(()) + } +} + +/// Parse a MQTT message payload as an entity registration payload. +/// +/// Returns `Some(register_payload)` if a payload is valid JSON and is a +/// registration payload, or `None` otherwise. +fn parse_entity_register_payload(payload: &[u8]) -> Option { + let payload = serde_json::from_slice::(payload).ok()?; + + if payload.get("@type").is_some() { + Some(payload) + } else { + None + } +} + +/// Extracts an MQTT entity identifier from an MQTT topic. +/// +/// This function is usually used for obtaining an entity MQTT identifier from a +/// command or telemetry topic of this entity. +/// +/// The MQTT topic has to contain `root` and `identifier` groups described in +/// [thin-edge documentation on MQTT topic scheme](https://thin-edge.github.io/thin-edge.io/next/references/mqtt-api/#topic-scheme). +/// If these groups are not present, the function returns `None`. +/// +/// ``` +/// # use mqtt_channel::Topic; +/// # use tedge_api::entity_store::entity_mqtt_id; +/// let entity_measurement_topic = Topic::new("te/device/main/service/my_service/m/my_measurement").unwrap(); +/// assert_eq!(entity_mqtt_id(&entity_measurement_topic), Some("device/main/service/my_service")); + +/// let custom_topic = Topic::new("te/device/1/2/3/m/my_measurement").unwrap(); +/// assert_eq!(entity_mqtt_id(&custom_topic), Some("device/1/2/3")); +/// +/// let custom_topic = Topic::new("custom_root/device/1/2/3/m/my_measurement").unwrap(); +/// assert_eq!(entity_mqtt_id(&custom_topic), None); +/// ``` +// TODO: this should be moved to MQTT parsing module when it's created +// https://github.com/thin-edge/thin-edge.io/pull/2118#issuecomment-1668110422 +pub fn entity_mqtt_id(topic: &Topic) -> Option<&str> { + let topic = topic + .name + .strip_prefix(MQTT_ROOT) + .and_then(|s| s.strip_prefix('/'))?; + + let identifier_len = topic + .match_indices('/') + .nth(3) + .map(|(i, _)| i) + .unwrap_or(topic.len()); + + Some(&topic[..identifier_len]) +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn registers_main_device() { + let store = EntityStore::with_main_device(EntityRegistrationMessage { + mqtt_id: "device/main//".to_string(), + external_id: Some("test-device".to_string()), + r#type: EntityType::MainDevice, + parent: None, + payload: json!({"@type": "device"}), + }) + .unwrap(); + + assert_eq!(store.main_device(), "device/main//"); + assert!(store.get("device/main//").is_some()); + } + + #[test] + fn lists_child_devices() { + let mut store = EntityStore::with_main_device(EntityRegistrationMessage { + mqtt_id: "device/main//".to_string(), + external_id: Some("test-device".to_string()), + r#type: EntityType::MainDevice, + parent: None, + payload: json!({"@type": "device"}), + }) + .unwrap(); + + // If the @parent info is not provided, it is assumed to be an immediate + // child of the main device. + let updated_entities = store + .update( + EntityRegistrationMessage::new(Message::new( + &Topic::new("te/device/child1//").unwrap(), + json!({"@type": "child-device"}).to_string(), + )) + .unwrap(), + ) + .unwrap(); + + assert_eq!(updated_entities, ["device/main//"]); + assert_eq!(store.child_devices("device/main//"), ["device/child1//"]); + + let updated_entities = store + .update( + EntityRegistrationMessage::new(Message::new( + &Topic::new("te/device/child2//").unwrap(), + json!({"@type": "child-device", "@parent": "device/main//"}).to_string(), + )) + .unwrap(), + ) + .unwrap(); + assert_eq!(updated_entities, ["device/main//"]); + let children = store.child_devices("device/main//"); + assert!(children.iter().any(|&e| e == "device/child1//")); + assert!(children.iter().any(|&e| e == "device/child2//")); + } + + #[test] + fn lists_services() { + let mut store = EntityStore::with_main_device(EntityRegistrationMessage { + r#type: EntityType::MainDevice, + external_id: Some("test-device".to_string()), + mqtt_id: "device/main//".to_string(), + parent: None, + payload: json!({}), + }) + .unwrap(); + + // Services are namespaced under devices, so `parent` is not necessary + let updated_entities = store + .update(EntityRegistrationMessage { + r#type: EntityType::Service, + external_id: None, + mqtt_id: "device/main/service/service1".to_string(), + parent: None, + payload: json!({}), + }) + .unwrap(); + + assert_eq!(updated_entities, ["device/main//"]); + assert_eq!( + store.services("device/main//"), + ["device/main/service/service1"] + ); + + let updated_entities = store + .update(EntityRegistrationMessage { + r#type: EntityType::Service, + external_id: None, + mqtt_id: "device/main/service/service2".to_string(), + parent: None, + payload: json!({}), + }) + .unwrap(); + + assert_eq!(updated_entities, ["device/main//"]); + let services = store.services("device/main//"); + assert!(services + .iter() + .any(|&e| e == "device/main/service/service1")); + assert!(services + .iter() + .any(|&e| e == "device/main/service/service2")); + } + + /// Forbids creating multiple main devices. + /// + /// Publishing new registration message on a topic where main device is + /// registered updates the main device and is allowed. Creating a new main + /// device on another topic is not allowed. + #[test] + fn forbids_multiple_main_devices() { + let mut store = EntityStore::with_main_device(EntityRegistrationMessage { + mqtt_id: "device/main//".try_into().unwrap(), + r#type: EntityType::MainDevice, + external_id: Some("test-device".to_string()), + parent: None, + payload: json!({}), + }) + .unwrap(); + + let res = store.update(EntityRegistrationMessage { + mqtt_id: "device/another_main//".try_into().unwrap(), + external_id: Some("test-device".to_string()), + r#type: EntityType::MainDevice, + parent: None, + payload: json!({}), + }); + + assert_eq!( + res, + Err(Error::MainDeviceAlreadyRegistered("device/main//".into())) + ); + } + + #[test] + fn forbids_nonexistent_parents() { + let mut store = EntityStore::with_main_device(EntityRegistrationMessage { + mqtt_id: "device/main//".try_into().unwrap(), + external_id: Some("test-device".to_string()), + r#type: EntityType::MainDevice, + parent: None, + payload: json!({}), + }) + .unwrap(); + + let res = store.update(EntityRegistrationMessage { + mqtt_id: "device/main//".try_into().unwrap(), + external_id: None, + r#type: EntityType::ChildDevice, + parent: Some("device/myawesomeparent//".to_string()), + payload: json!({}), + }); + + assert!(matches!(res, Err(Error::NoParent(_)))); + } + + #[test] + fn generates_external_ids() { + let mut store = EntityStore::with_main_device(EntityRegistrationMessage { + mqtt_id: "device/main//".try_into().unwrap(), + external_id: Some("test-device".to_string()), + r#type: EntityType::MainDevice, + parent: None, + payload: json!({}), + }) + .unwrap(); + + store + .update(EntityRegistrationMessage { + mqtt_id: "device/child001/service/service001".to_string(), + external_id: None, + r#type: EntityType::ChildDevice, + parent: None, + payload: serde_json::json!({}), + }) + .unwrap(); + + let entity1 = store.get_by_external_id("test-device:device:child001:service:service001"); + assert_eq!( + entity1.unwrap().external_id, + "test-device:device:child001:service:service001" + ); + } +} diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index 73bdf39c067..c9c3cfc0539 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -6,6 +6,8 @@ pub mod topic; pub mod alarm; pub mod builder; pub mod data; +pub mod entity; +pub mod entity_store; pub mod event; pub mod group; pub mod health; @@ -15,6 +17,7 @@ pub mod serialize; pub mod utils; pub use download::*; +pub use entity_store::EntityStore; pub use error::*; pub use messages::control_filter_topic; pub use messages::software_filter_topic; diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index 2c3ee1e1238..d5dd1cd6807 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -7,7 +7,6 @@ use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC; use c8y_http_proxy::handle::C8YHttpProxy; use c8y_http_proxy::messages::C8YRestRequest; use c8y_http_proxy::messages::C8YRestResult; -use std::collections::HashMap; use std::path::PathBuf; use std::time::Duration; use tedge_actors::adapt; @@ -27,6 +26,8 @@ use tedge_actors::Sender; use tedge_actors::ServiceProvider; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::entity_store; +use tedge_api::EntityStore; use tedge_file_system_ext::FsWatchEvent; use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; @@ -37,6 +38,7 @@ use tedge_timer_ext::Timeout; use tedge_utils::file::create_directory_with_defaults; use tedge_utils::file::FileError; +const MQTT_ROOT: &str = "te"; const SYNC_WINDOW: Duration = Duration::from_secs(3); pub type SyncStart = SetTimeout<()>; @@ -50,7 +52,7 @@ pub struct C8yMapperActor { messages: SimpleMessageBox, mqtt_publisher: LoggingSender, timer_sender: LoggingSender, - registered_entities: HashMap, + entity_store: EntityStore, } #[async_trait] @@ -93,23 +95,35 @@ impl C8yMapperActor { messages: SimpleMessageBox, mqtt_publisher: LoggingSender, timer_sender: LoggingSender, + device_id: String, ) -> Self { + let main_device = entity_store::EntityRegistrationMessage::main_device(device_id); Self { converter, messages, mqtt_publisher, timer_sender, - registered_entities: HashMap::new(), + entity_store: EntityStore::with_main_device(main_device).unwrap(), } } async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> { - // register device if not registered - if let Some(entity_id) = entity_mqtt_id(&message.topic) { - if self.registered_entities.get(entity_id).is_none() { - let register_messages = self.auto_register_entity(entity_id); - for msg in register_messages { - let _ = self.mqtt_publisher.send(msg).await; + // to which entity is the message related + if let Some(entity_id) = entity_store::entity_mqtt_id(&message.topic) { + // if message is registration message: + if is_entity_register_message(&message) { + // update entity store with registration message + // TODO: emit error message on Err + self.entity_store + .update(message.clone().try_into().unwrap()) + .unwrap(); + } else { + // if device is unregistered register using auto-registration + if self.entity_store.get(entity_id).is_none() { + let register_messages = self.auto_register_entity(entity_id); + for msg in register_messages { + let _ = self.mqtt_publisher.send(msg).await; + } } } } @@ -134,32 +148,42 @@ impl C8yMapperActor { /// being registered. fn auto_register_entity(&mut self, entity_id: &str) -> Vec { let mut register_messages = vec![]; - if let Some(("te", topic)) = entity_id.split_once('/') { - let (device_id, service_id) = match topic.split('/').collect::>()[..] { - ["device", device_id, "service", service_id, ..] => (device_id, Some(service_id)), - ["device", device_id, ..] => (device_id, None), - _ => return register_messages, - }; - - let device_register_topic = format!("te/device/{device_id}"); - let device_register_payload = r#"{ "@type": "device", "type": "Gateway" }"#.to_string(); - register_messages.push(Message::new( - &Topic::new(&device_register_topic).unwrap(), + let (device_id, service_id) = match entity_id.split('/').collect::>()[..] { + ["device", device_id, "service", service_id, ..] => (device_id, Some(service_id)), + ["device", device_id, "", ""] => (device_id, None), + _ => return register_messages, + }; + + // register device if not registered + let device_topic = format!("device/{device_id}//"); + if self.entity_store.get(&device_topic).is_none() { + let device_register_payload = r#"{ "@type": "child-device" }"#.to_string(); + let device_register_message = Message::new( + &Topic::new(&device_topic).unwrap(), device_register_payload.clone(), - )); - self.register_entity(device_register_topic, device_register_payload); - - if let Some(service_id) = service_id { - let service_register_topic = format!("te/device/{device_id}/service/{service_id}"); - let service_register_payload = - r#"{"@type": "service", "type": "systemd"}"#.to_string(); - register_messages.push(Message::new( - &Topic::new(&service_register_topic).unwrap(), - service_register_payload.clone(), - )); - self.register_entity(service_register_topic, service_register_payload); - } + ) + .with_retain(); + register_messages.push(device_register_message.clone()); + self.entity_store + .update(device_register_message.try_into().unwrap()) + .unwrap(); + } + + // register service itself + if let Some(service_id) = service_id { + let service_topic = format!("{MQTT_ROOT}/device/{device_id}/service/{service_id}"); + let service_register_payload = r#"{"@type": "service", "type": "systemd"}"#.to_string(); + let service_register_message = Message::new( + &Topic::new(&service_topic).unwrap(), + service_register_payload.clone(), + ) + .with_retain(); + register_messages.push(service_register_message.clone()); + self.entity_store + .update(service_register_message.try_into().unwrap()) + .unwrap(); } + register_messages } @@ -168,9 +192,9 @@ impl C8yMapperActor { /// If a given entity was registered previously, the function will do /// nothing. Otherwise it will save registration data to memory, free to be /// queried by other components. - fn register_entity(&mut self, topic: String, payload: String) { - self.registered_entities.entry(topic).or_insert(payload); - } + // fn register_entity(&mut self, topic: String, payload: String) { + // self.entity_store.entry(&topic).or_insert(payload); + // } async fn process_file_watch_event( &mut self, @@ -222,17 +246,15 @@ impl C8yMapperActor { } } -fn entity_mqtt_id(topic: &Topic) -> Option<&str> { - match topic.name.split('/').collect::>()[..] { - ["te", "device", _device_id, "service", service_id, ..] => { - Some(&topic.name[..topic.name.find(service_id).unwrap() + service_id.len()]) - } - ["te", "device", device_id, ..] => { - Some(&topic.name[..topic.name.find(device_id).unwrap() + device_id.len()]) - } - _ => None, - } +/// Check if a message is an entity registration message. +fn is_entity_register_message(message: &Message) -> bool { + let Ok(payload) = serde_json::from_slice::(message.payload_bytes()) else { + return false; + }; + + message.retain && payload.get("@type").is_some() && payload.get("type").is_some() } + pub struct C8yMapperBuilder { config: C8yMapperConfig, box_builder: SimpleMessageBoxBuilder, @@ -290,6 +312,8 @@ impl Builder for C8yMapperBuilder { let mqtt_publisher = LoggingSender::new("C8yMapper => Mqtt".into(), self.mqtt_publisher); let timer_sender = LoggingSender::new("C8yMapper => Timer".into(), self.timer_sender); + let device_id = self.config.device_id.clone(); + let converter = CumulocityConverter::new(self.config, mqtt_publisher.clone(), self.http_proxy) .map_err(|err| RuntimeError::ActorError(Box::new(err)))?; @@ -301,6 +325,7 @@ impl Builder for C8yMapperBuilder { message_box, mqtt_publisher, timer_sender, + device_id, )) } } diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 879c99fe9fc..3fa7e9c3f36 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -15,7 +15,6 @@ use std::time::Duration; use tedge_actors::test_helpers::MessageReceiverExt; use tedge_actors::Actor; use tedge_actors::Builder; -use tedge_actors::Message; use tedge_actors::MessageReceiver; use tedge_actors::NoMessage; use tedge_actors::Sender; @@ -1180,8 +1179,9 @@ async fn inventory_registers_unknown_entity_once() { "", ); - mqtt.send(measurement_message.clone()).await.unwrap(); - mqtt.send(measurement_message).await.unwrap(); + for _ in 0..5 { + mqtt.send(measurement_message.clone()).await.unwrap(); + } mqtt.close_sender(); @@ -1190,35 +1190,13 @@ async fn inventory_registers_unknown_entity_once() { messages.push(msg); } - // Assert device register message was published once - let mut device_register_messages = messages - .iter() - .filter(|message| message.topic == "te/device/main".try_into().unwrap()); - let device_register_message = device_register_messages - .next() - .expect("Device register message must be present"); - - let device_register_payload = - serde_json::from_slice::(device_register_message.payload_bytes()) - .expect("Device register message payload must be JSON"); - assert_json_include!( - actual: device_register_payload, - expected: json!({ "@type": "device", "type": "Gateway" }) - ); - - assert_eq!( - device_register_messages.next(), - None, - "There can't be more than 1 device register message" - ); - - // Assert service register message was published once - let mut service_register_messages = messages + // we should not emit a registration message for the main device, only the + // service + let mut dut_register_messages: Vec<_> = messages .iter() - .filter(|message| message.topic == "te/device/main/service/my_service".try_into().unwrap()); - let service_register_message = service_register_messages - .next() - .expect("Service register message must be present"); + .filter(|message| message.topic.name.starts_with("te/device/main")) + .collect(); + let service_register_message = dut_register_messages.remove(0); let service_register_payload = serde_json::from_slice::(service_register_message.payload_bytes()) @@ -1228,10 +1206,11 @@ async fn inventory_registers_unknown_entity_once() { expected: json!({"@type": "service", "type": "systemd"}) ); - assert_eq!( - service_register_messages.next(), - None, - "There can't be more than 1 service register message" + assert!( + !dut_register_messages + .into_iter() + .any(|message| message == service_register_message), + "duplicate registration message" ); }