diff --git a/crates/core/c8y_api/src/smartrest/inventory.rs b/crates/core/c8y_api/src/smartrest/inventory.rs index 42d3e473eb1..f15c5a82246 100644 --- a/crates/core/c8y_api/src/smartrest/inventory.rs +++ b/crates/core/c8y_api/src/smartrest/inventory.rs @@ -68,6 +68,21 @@ pub fn service_creation_message( ancestors: &[String], prefix: &TopicPrefix, ) -> Result { + Ok(MqttMessage::new( + &publish_topic_from_ancestors(ancestors, prefix), + service_creation_message_payload(service_id, service_name, service_type, service_status)?, + )) +} + +/// Create a SmartREST message for creating a service on device. +/// The provided ancestors list must contain all the parents of the given service +/// starting from its immediate parent device. +pub fn service_creation_message_payload( + service_id: &str, + service_name: &str, + service_type: &str, + service_status: &str, +) -> Result { // TODO: most of this noise can be eliminated by implementing `Serialize`/`Deserialize` for smartrest format if service_id.is_empty() { return Err(InvalidValueError { @@ -94,16 +109,13 @@ pub fn service_creation_message( }); } - Ok(MqttMessage::new( - &publish_topic_from_ancestors(ancestors, prefix), - fields_to_csv_string(&[ - "102", - service_id, - service_type, - service_name, - service_status, - ]), - )) + Ok(fields_to_csv_string(&[ + "102", + service_id, + service_type, + service_name, + service_status, + ])) } /// Create a SmartREST message for updating service status. diff --git a/crates/core/tedge/src/cli/common.rs b/crates/core/tedge/src/cli/common.rs index 0f19edd2180..72388a60c9e 100644 --- a/crates/core/tedge/src/cli/common.rs +++ b/crates/core/tedge/src/cli/common.rs @@ -1,6 +1,6 @@ use tedge_config::system_services::SystemService; -#[derive(Copy, Clone, Debug, strum_macros::Display, strum_macros::IntoStaticStr)] +#[derive(Copy, Clone, Debug, strum_macros::Display, strum_macros::IntoStaticStr, PartialEq, Eq)] pub enum Cloud { #[strum(serialize = "Cumulocity")] C8y, diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs index d900c290cf0..98c10bfb9f2 100644 --- a/crates/core/tedge/src/cli/connect/command.rs +++ b/crates/core/tedge/src/cli/connect/command.rs @@ -502,19 +502,25 @@ fn new_bridge( } } + if let Err(err) = + write_generic_mosquitto_config_to_file(config_location, common_mosquitto_config) + { + // We want to preserve previous errors and therefore discard result of this function. + let _ = clean_up(config_location, bridge_config); + return Err(err); + } + if bridge_config.bridge_location == BridgeLocation::Mosquitto { println!("Saving configuration for requested bridge.\n"); - if let Err(err) = - write_bridge_config_to_file(config_location, bridge_config, common_mosquitto_config) - { + if let Err(err) = write_bridge_config_to_file(config_location, bridge_config) { // We want to preserve previous errors and therefore discard result of this function. let _ = clean_up(config_location, bridge_config); return Err(err); } } else { println!("Deleting mosquitto bridge configuration in favour of built-in bridge\n"); - clean_up(config_location, bridge_config)?; + use_built_in_bridge(config_location, bridge_config)?; } if let Err(err) = service_manager_result { @@ -543,18 +549,7 @@ fn new_bridge( Ok(()) } -fn restart_mosquitto( - bridge_config: &BridgeConfig, - service_manager: &dyn SystemServiceManager, - config_location: &TEdgeConfigLocation, -) -> Result<(), ConnectError> { - println!("Restarting mosquitto service.\n"); - - if let Err(err) = service_manager.stop_service(SystemService::Mosquitto) { - clean_up(config_location, bridge_config)?; - return Err(err.into()); - } - +pub fn chown_certificate_and_key(bridge_config: &BridgeConfig) { let (user, group) = match bridge_config.bridge_location { BridgeLocation::BuiltIn => ("tedge", "tedge"), BridgeLocation::Mosquitto => (crate::BROKER_USER, crate::BROKER_GROUP), @@ -570,6 +565,21 @@ fn restart_mosquitto( warn!("Failed to change ownership of {path} to {user}:{group}: {err}"); } } +} + +fn restart_mosquitto( + bridge_config: &BridgeConfig, + service_manager: &dyn SystemServiceManager, + config_location: &TEdgeConfigLocation, +) -> Result<(), ConnectError> { + println!("Restarting mosquitto service.\n"); + + if let Err(err) = service_manager.stop_service(SystemService::Mosquitto) { + clean_up(config_location, bridge_config)?; + return Err(err.into()); + } + + chown_certificate_and_key(bridge_config); if let Err(err) = service_manager.restart_service(SystemService::Mosquitto) { clean_up(config_location, bridge_config)?; @@ -597,7 +607,7 @@ fn enable_software_management( // To preserve error chain and not discard other errors we need to ignore error here // (don't use '?' with the call to this function to preserve original error). -fn clean_up( +pub fn clean_up( config_location: &TEdgeConfigLocation, bridge_config: &BridgeConfig, ) -> Result<(), ConnectError> { @@ -606,6 +616,19 @@ fn clean_up( Ok(()) } +pub fn use_built_in_bridge( + config_location: &TEdgeConfigLocation, + bridge_config: &BridgeConfig, +) -> Result<(), ConnectError> { + let path = get_bridge_config_file_path(config_location, bridge_config); + std::fs::write( + path, + "# This file is left empty as the built-in bridge is enabled", + ) + .or_else(ok_if_not_found)?; + Ok(()) +} + fn bridge_config_exists( config_location: &TEdgeConfigLocation, bridge_config: &BridgeConfig, @@ -619,9 +642,8 @@ fn bridge_config_exists( Ok(()) } -fn write_bridge_config_to_file( +fn write_generic_mosquitto_config_to_file( config_location: &TEdgeConfigLocation, - bridge_config: &BridgeConfig, common_mosquitto_config: &CommonMosquittoConfig, ) -> Result<(), ConnectError> { let dir_path = config_location @@ -637,6 +659,20 @@ fn write_bridge_config_to_file( common_mosquitto_config.serialize(&mut common_draft)?; common_draft.persist()?; + Ok(()) +} + +fn write_bridge_config_to_file( + config_location: &TEdgeConfigLocation, + bridge_config: &BridgeConfig, +) -> Result<(), ConnectError> { + let dir_path = config_location + .tedge_config_root_path + .join(TEDGE_BRIDGE_CONF_DIR_PATH); + + // This will forcefully create directory structure if it doesn't exist, we should find better way to do it, maybe config should deal with it? + create_directories(dir_path)?; + let config_path = get_bridge_config_file_path(config_location, bridge_config); let mut config_draft = DraftFile::new(config_path)?.with_mode(0o644); bridge_config.serialize(&mut config_draft)?; diff --git a/crates/core/tedge/src/cli/disconnect/disconnect_bridge.rs b/crates/core/tedge/src/cli/disconnect/disconnect_bridge.rs index bf09e65c0bc..4975faaaae8 100644 --- a/crates/core/tedge/src/cli/disconnect/disconnect_bridge.rs +++ b/crates/core/tedge/src/cli/disconnect/disconnect_bridge.rs @@ -85,12 +85,8 @@ impl DisconnectBridgeCommand { // If bridge config file was not found we assume that the bridge doesn't exist, // We finish early returning exit code 0. Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - if self.built_in_bridge { - Ok(()) - } else { - println!("Bridge doesn't exist. Operation finished!"); - Err(DisconnectBridgeError::BridgeFileDoesNotExist) - } + println!("Bridge doesn't exist. Operation finished!"); + Err(DisconnectBridgeError::BridgeFileDoesNotExist) } Err(e) => Err(DisconnectBridgeError::FileOperationFailed( diff --git a/crates/core/tedge/src/cli/refresh_bridges.rs b/crates/core/tedge/src/cli/refresh_bridges.rs index 067b90cf2a0..d45d227ad15 100644 --- a/crates/core/tedge/src/cli/refresh_bridges.rs +++ b/crates/core/tedge/src/cli/refresh_bridges.rs @@ -9,6 +9,7 @@ use tedge_config::TEdgeConfigLocation; use super::common::Cloud; use super::connect::ConnectError; use crate::bridge::BridgeConfig; +use crate::bridge::BridgeLocation; use crate::bridge::CommonMosquittoConfig; use crate::bridge::TEDGE_BRIDGE_CONF_DIR_PATH; use crate::command::BuildContext; @@ -28,7 +29,7 @@ impl Command for RefreshBridgesCmd { fn execute(&self) -> anyhow::Result<()> { let clouds = established_bridges(&self.config_location); - if clouds.is_empty() { + if clouds.is_empty() && !self.config.mqtt.bridge.built_in { println!("No bridges to refresh."); return Ok(()); } @@ -36,11 +37,30 @@ impl Command for RefreshBridgesCmd { let common_mosquitto_config = CommonMosquittoConfig::from_tedge_config(&self.config); common_mosquitto_config.save(&self.config_location)?; - for cloud in clouds { - println!("Refreshing bridge {cloud}"); + if !self.config.mqtt.bridge.built_in { + for cloud in &clouds { + println!("Refreshing bridge {cloud}"); - let bridge_config = super::connect::bridge_config(&self.config, cloud)?; - refresh_bridge(&bridge_config, &self.config_location)?; + let bridge_config = super::connect::bridge_config(&self.config, *cloud)?; + refresh_bridge(&bridge_config, &self.config_location)?; + } + } + + for cloud in [Cloud::Aws, Cloud::Azure, Cloud::C8y] { + // (attempt to) reassert ownership of the certificate and key + // This is necessary when upgrading from the mosquitto bridge to the built-in bridge + if let Ok(bridge_config) = super::connect::bridge_config(&self.config, cloud) { + super::connect::chown_certificate_and_key(&bridge_config); + + if bridge_config.bridge_location == BridgeLocation::BuiltIn + && clouds.contains(&cloud) + { + println!( + "Deleting mosquitto bridge configuration for {cloud} in favour of built-in bridge" + ); + super::connect::use_built_in_bridge(&self.config_location, &bridge_config)?; + } + } } println!("Restarting mosquitto service.\n"); diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index fdac59ef3a5..47952038b99 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -17,9 +17,11 @@ use tedge_config::TEdgeConfig; use tedge_downloader_ext::DownloaderActor; use tedge_file_system_ext::FsWatchActorBuilder; use tedge_http_ext::HttpActor; +use tedge_mqtt_bridge::rumqttc::LastWill; use tedge_mqtt_bridge::use_key_and_cert; use tedge_mqtt_bridge::BridgeConfig; use tedge_mqtt_bridge::MqttBridgeActorBuilder; +use tedge_mqtt_bridge::QoS; use tedge_mqtt_ext::MqttActorBuilder; use tedge_timer_ext::TimerActor; use tedge_uploader_ext::UploaderActor; @@ -72,7 +74,36 @@ impl TEdgeComponent for CumulocityMapper { tc.forward_from_remote(topic, local_prefix.clone(), "")?; } - tc.forward_from_local("#", local_prefix, "")?; + // Templates + tc.forward_from_local("s/ut/#", local_prefix.clone(), "")?; + + // Static templates + tc.forward_from_local("s/us", local_prefix.clone(), "")?; + tc.forward_from_local("s/us/#", local_prefix.clone(), "")?; + tc.forward_from_local("t/us/#", local_prefix.clone(), "")?; + tc.forward_from_local("q/us/#", local_prefix.clone(), "")?; + tc.forward_from_local("c/us/#", local_prefix.clone(), "")?; + + // SmartREST2 + tc.forward_from_local("s/uc/#", local_prefix.clone(), "")?; + tc.forward_from_local("t/uc/#", local_prefix.clone(), "")?; + tc.forward_from_local("q/uc/#", local_prefix.clone(), "")?; + tc.forward_from_local("c/uc/#", local_prefix.clone(), "")?; + + // c8y JSON + tc.forward_from_local( + "inventory/managedObjects/update/#", + local_prefix.clone(), + "", + )?; + tc.forward_from_local( + "measurement/measurements/create/#", + local_prefix.clone(), + "", + )?; + tc.forward_from_local("event/events/create/#", local_prefix.clone(), "")?; + tc.forward_from_local("alarm/alarms/create/#", local_prefix.clone(), "")?; + tc.forward_from_local("s/uat", local_prefix.clone(), "")?; let c8y = tedge_config.c8y.mqtt.or_config_not_set()?; let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new( @@ -89,14 +120,68 @@ impl TEdgeComponent for CumulocityMapper { &tedge_config, )?; - let bridge_actor = MqttBridgeActorBuilder::new( - &tedge_config, - c8y_mapper_config.bridge_service_name(), - tc, - cloud_config, - ) - .await; - runtime.spawn(bridge_actor).await?; + let main_device_xid: EntityExternalId = + tedge_config.device.id.try_read(&tedge_config)?.into(); + let service_type = &tedge_config.service.ty; + let service_type = if service_type.is_empty() { + "service".to_string() + } else { + service_type.to_string() + }; + + // FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme + + // there is one mapper instance per cloud per thin-edge instance, perhaps we should use some + // predefined topic id instead of trying to derive it from current device? + let entity_topic_id: EntityTopicId = tedge_config + .mqtt + .device_topic_id + .clone() + .parse() + .context("Invalid device_topic_id")?; + + let mapper_service_topic_id = entity_topic_id + .default_service_for_device(CUMULOCITY_MAPPER_NAME) + .context("Can't derive service name if device topic id not in default scheme")?; + + let mapper_service_external_id = CumulocityConverter::map_to_c8y_external_id( + &mapper_service_topic_id, + &main_device_xid, + ); + + let last_will_message_mapper = + c8y_api::smartrest::inventory::service_creation_message_payload( + mapper_service_external_id.as_ref(), + CUMULOCITY_MAPPER_NAME, + service_type.as_str(), + "down", + )?; + let last_will_message_bridge = + c8y_api::smartrest::inventory::service_creation_message_payload( + mapper_service_external_id.as_ref(), + &c8y_mapper_config.bridge_service_name(), + service_type.as_str(), + "down", + )?; + + cloud_config.set_last_will(LastWill { + topic: "s/us".into(), + qos: QoS::AtLeastOnce, + message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(), + retain: false, + }); + + runtime + .spawn( + MqttBridgeActorBuilder::new( + &tedge_config, + c8y_mapper_config.bridge_service_name(), + tc, + cloud_config, + ) + .await, + ) + .await?; } let mut jwt_actor = C8YJwtRetriever::builder( mqtt_config.clone(), diff --git a/crates/extensions/tedge_mqtt_bridge/src/config.rs b/crates/extensions/tedge_mqtt_bridge/src/config.rs index 034f29d382f..54370660ba2 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/config.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/config.rs @@ -404,7 +404,7 @@ mod tests { "b/".into(), ) .unwrap()]); - assert_eq!(converter.convert_topic("a/topic"), "b/topic") + assert_eq!(converter.convert_topic("a/topic"), Some("b/topic".into())) } #[test] @@ -413,7 +413,7 @@ mod tests { BridgeRule::try_new("topic".into(), "a/".into(), "b/".into()).unwrap(), BridgeRule::try_new("#".into(), "a/".into(), "c/".into()).unwrap(), ]); - assert_eq!(converter.convert_topic("a/topic"), "b/topic"); + assert_eq!(converter.convert_topic("a/topic"), Some("b/topic".into())); } #[test] @@ -422,7 +422,7 @@ mod tests { BridgeRule::try_new("topic".into(), "x/".into(), "b/".into()).unwrap(), BridgeRule::try_new("#".into(), "a/".into(), "c/".into()).unwrap(), ]); - assert_eq!(converter.convert_topic("a/topic"), "c/topic"); + assert_eq!(converter.convert_topic("a/topic"), Some("c/topic".into())); } } diff --git a/crates/extensions/tedge_mqtt_bridge/src/health.rs b/crates/extensions/tedge_mqtt_bridge/src/health.rs index 4f3bc6de8d9..e24284c0d8e 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/health.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/health.rs @@ -52,6 +52,8 @@ impl BridgeHealthMonitor { let status = statuses.values().fold(Some(Status::Up), overall_status); if last_status != status { last_status = status; + + // TODO could this deadlock? self.target .publish(&self.topic, QoS::AtLeastOnce, true, status.unwrap().json()) .await diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index e0e38afcd53..f9256936f0b 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use certificate::parse_root_certificate::create_tls_config; use futures::SinkExt; use futures::StreamExt; +pub use rumqttc; use rumqttc::AsyncClient; use rumqttc::ClientError; use rumqttc::Event; @@ -319,20 +320,21 @@ async fn half_bridge( // Forward messages from event loop to target Event::Incoming(Incoming::Publish(publish)) => { if let Some(publish) = loop_breaker.ensure_not_looped(publish).await { - let topic = transformer.convert_topic(&publish.topic); - target - .publish( - topic.clone(), - publish.qos, - publish.retain, - publish.payload.clone(), - ) - .await - .unwrap(); - companion_bridge_half - .send(Some((topic.into_owned(), publish))) - .await - .unwrap(); + if let Some(topic) = transformer.convert_topic(&publish.topic) { + target + .publish( + topic.clone(), + publish.qos, + publish.retain, + publish.payload.clone(), + ) + .await + .unwrap(); + companion_bridge_half + .send(Some((topic.into_owned(), publish))) + .await + .unwrap(); + } } } @@ -375,6 +377,7 @@ enum Status { impl Status { fn json(self) -> &'static str { match self { + // TODO Robot test that I make it to Cumulocity Status::Up => r#"{"status":"up"}"#, Status::Down => r#"{"status":"down"}"#, } @@ -629,8 +632,11 @@ mod tests { tc.forward_from_local("s/us", "c8y/", "").unwrap(); tc.forward_from_local("#", "c8y/", "secondary/").unwrap(); let [(rules, _), _] = tc.converters_and_bidirectional_topic_filters(); - assert_eq!(rules.convert_topic("c8y/s/us"), "s/us"); - assert_eq!(rules.convert_topic("c8y/other"), "secondary/other"); + assert_eq!(rules.convert_topic("c8y/s/us"), Some("s/us".into())); + assert_eq!( + rules.convert_topic("c8y/other"), + Some("secondary/other".into()) + ); } #[test] @@ -639,8 +645,11 @@ mod tests { tc.forward_from_remote("s/ds", "c8y/", "").unwrap(); tc.forward_from_remote("#", "c8y/", "secondary/").unwrap(); let [_, (rules, _)] = tc.converters_and_bidirectional_topic_filters(); - assert_eq!(rules.convert_topic("s/ds"), "c8y/s/ds"); - assert_eq!(rules.convert_topic("secondary/other"), "c8y/other"); + assert_eq!(rules.convert_topic("s/ds"), Some("c8y/s/ds".into())); + assert_eq!( + rules.convert_topic("secondary/other"), + Some("c8y/other".into()) + ); } #[test] diff --git a/crates/extensions/tedge_mqtt_bridge/src/topics.rs b/crates/extensions/tedge_mqtt_bridge/src/topics.rs index 240c05bdd61..07cb9eed831 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/topics.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/topics.rs @@ -1,6 +1,7 @@ use crate::BridgeRule; use rumqttc::matches; use std::borrow::Cow; +use tracing::log::warn; pub fn matches_ignore_dollar_prefix(topic: &str, filter: &str) -> bool { match (&topic[..1], &filter[..1]) { @@ -12,11 +13,13 @@ pub fn matches_ignore_dollar_prefix(topic: &str, filter: &str) -> bool { pub struct TopicConverter(pub Vec); impl TopicConverter { - pub fn convert_topic<'a>(&'a self, topic: &'a str) -> Cow<'a, str> { + pub fn convert_topic<'a>(&'a self, topic: &'a str) -> Option> { self.0 .iter() .find_map(|rule| rule.apply(topic)) - // TODO should this be an error - .unwrap_or_else(|| panic!("Failed to convert {topic:?} with {:?}", self.0)) + .or_else(|| { + warn!("Failed to convert {topic:?}"); + None + }) } } diff --git a/tests/RobotFramework/libraries/ThinEdgeIO/ThinEdgeIO.py b/tests/RobotFramework/libraries/ThinEdgeIO/ThinEdgeIO.py index 31c43431667..96c013c9e6a 100644 --- a/tests/RobotFramework/libraries/ThinEdgeIO/ThinEdgeIO.py +++ b/tests/RobotFramework/libraries/ThinEdgeIO/ThinEdgeIO.py @@ -18,14 +18,11 @@ from DeviceLibrary import DeviceLibrary, DeviceAdapter from Cumulocity import Cumulocity, retry - relativetime_ = Union[datetime, str] - devices_lib = DeviceLibrary() c8y_lib = Cumulocity() - logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(module)s -%(levelname)s- %(message)s" ) @@ -56,11 +53,11 @@ class ThinEdgeIO(DeviceLibrary): """ThinEdgeIO Library""" def __init__( - self, - image: str = DeviceLibrary.DEFAULT_IMAGE, - adapter: str = None, - bootstrap_script: str = DeviceLibrary.DEFAULT_BOOTSTRAP_SCRIPT, - **kwargs, + self, + image: str = DeviceLibrary.DEFAULT_IMAGE, + adapter: str = None, + bootstrap_script: str = DeviceLibrary.DEFAULT_BOOTSTRAP_SCRIPT, + **kwargs, ): super().__init__( image=image, adapter=adapter, bootstrap_script=bootstrap_script, **kwargs @@ -147,7 +144,7 @@ def get_debian_architecture(self): @keyword("Get Logs") def get_logs( - self, name: str = None, date_from: Union[datetime, float] = None, show=True + self, name: str = None, date_from: Union[datetime, float] = None, show=True ): """Get device logs (override base class method to add additional debug info) @@ -224,11 +221,14 @@ def _hide_sensitive_factory(self): # This is fragile and should be improved upon once a more suitable/robust method of logging and querying # the mqtt messages is found. token_replace_pattern = re.compile(r"\{.+$") + def _hide(line: str) -> str: if C8Y_TOKEN_TOPIC in line and "71," in line: - line_sensitive = token_replace_pattern.sub(f"(redacted log entry): Received token: topic={C8Y_TOKEN_TOPIC}, message=71,", line) + line_sensitive = token_replace_pattern.sub( + f"(redacted log entry): Received token: topic={C8Y_TOKEN_TOPIC}, message=71,", line) return line_sensitive return line + return _hide def log_operations(self, mo_id: str, status: str = None): @@ -250,7 +250,7 @@ def log_operations(self, mo_id: str, status: str = None): log_method = ( log.info if operation.status - in (operation.Status.SUCCESSFUL, operation.Status.FAILED) + in (operation.Status.SUCCESSFUL, operation.Status.FAILED) else log.warning ) log_method( @@ -429,12 +429,12 @@ def tedge_disconnect_connect(self, mapper: str = "c8y", sleep: float = 0.0): # Assert presence of a topic (with timeout) # def mqtt_match_messages( - self, - topic: str, - message_pattern: str = None, - date_from: relativetime_ = None, - date_to: relativetime_ = None, - **kwargs, + self, + topic: str, + message_pattern: str = None, + date_from: relativetime_ = None, + date_to: relativetime_ = None, + **kwargs, ) -> List[Dict[str, Any]]: """Match mqtt messages using different types of filters @@ -464,7 +464,7 @@ def mqtt_match_messages( message = json.loads(line) if "message" in message: if message_pattern_re is None or message_pattern_re.match( - message["message"]["payload"] + message["message"]["payload"] ): messages.append(message) except Exception as ex: @@ -477,7 +477,7 @@ def mqtt_match_messages( item for item in messages if not topic - or (topic and mqtt_topic_match(mqtt_matcher, item["message"]["topic"])) + or (topic and mqtt_topic_match(mqtt_matcher, item["message"]["topic"])) ] return matching @@ -501,7 +501,7 @@ def assert_service_health_status_down(self, service: str, device: str = "main") @keyword("Service Health Status Should Be Equal") def assert_service_health_status_equal( - self, service: str, status: str, device: str = "main" + self, service: str, status: str, device: str = "main" ) -> Dict[str, Any]: return self._assert_health_status(service, status=status, device=device) @@ -541,11 +541,11 @@ def _assert_health_status(self, service: str, status: str, device: str = "main") @keyword("Setup") def setup( - self, - skip_bootstrap: bool = None, - cleanup: bool = None, - adapter: str = None, - wait_for_healthy: bool = True, + self, + skip_bootstrap: bool = None, + cleanup: bool = None, + adapter: str = None, + wait_for_healthy: bool = True, ) -> str: serial_sn = super().setup(skip_bootstrap, cleanup, adapter) @@ -554,15 +554,15 @@ def setup( return serial_sn def _assert_mqtt_topic_messages( - self, - topic: str, - date_from: relativetime_ = None, - date_to: relativetime_ = None, - minimum: int = 1, - maximum: int = None, - message_pattern: str = None, - message_contains: str = None, - **kwargs, + self, + topic: str, + date_from: relativetime_ = None, + date_to: relativetime_ = None, + minimum: int = 1, + maximum: int = None, + message_pattern: str = None, + message_contains: str = None, + **kwargs, ) -> List[Dict[str, Any]]: # log.info("Checking mqtt messages for topic: %s", topic) if message_contains: @@ -583,13 +583,13 @@ def _assert_mqtt_topic_messages( if minimum is not None: assert ( - len(messages) >= minimum - ), f"Matching messages is less than minimum.\nwanted: {minimum}\ngot: {len(messages)}\n\nmessages:\n{messages}" + len(messages) >= minimum + ), f"Matching messages on topic '{topic}' is less than minimum.\nwanted: {minimum}\ngot: {len(messages)}\n\nmessages:\n{messages}" if maximum is not None: assert ( - len(messages) <= maximum - ), f"Matching messages is greater than maximum.\nwanted: {maximum}\ngot: {len(messages)}\n\nmessages:\n{messages}" + len(messages) <= maximum + ), f"Matching messages on topic '{topic}' is greater than maximum.\nwanted: {maximum}\ngot: {len(messages)}\n\nmessages:\n{messages}" return messages @@ -613,15 +613,15 @@ def log_mqtt_messages(self, topic: str = "#", date_from: Union[datetime, float] @keyword("Should Have MQTT Messages") def mqtt_should_have_topic( - self, - topic: str, - date_from: relativetime_ = None, - date_to: relativetime_ = None, - message_pattern: str = None, - message_contains: str = None, - minimum: int = 1, - maximum: int = None, - **kwargs, + self, + topic: str, + date_from: relativetime_ = None, + date_to: relativetime_ = None, + message_pattern: str = None, + message_contains: str = None, + minimum: int = 1, + maximum: int = None, + **kwargs, ) -> List[Dict[str, Any]]: """ Check for the presence of a topic @@ -660,11 +660,11 @@ def mqtt_should_have_topic( @keyword("Register Child Device") def register_child( - self, - parent_name: str, - child_name: str, - supported_operations: Union[List[str], str] = None, - name: str = None, + self, + parent_name: str, + child_name: str, + supported_operations: Union[List[str], str] = None, + name: str = None, ): """ Register a child device to a parent along with a given list of supported operations @@ -700,7 +700,7 @@ def set_restart_command(self, command: str, **kwargs): ) @keyword("Set Restart Timeout") - def set_restart_timeout(self, value: Union[str,int], **kwargs): + def set_restart_timeout(self, value: Union[str, int], **kwargs): """Set the restart timeout interval in seconds for how long thin-edge.io should wait to for a device restart to happen. @@ -714,7 +714,7 @@ def set_restart_timeout(self, value: Union[str,int], **kwargs): command = "sed -i -e '/reboot_timeout_seconds/d' /etc/tedge/system.toml" else: command = f"sed -i -e '/reboot_timeout_seconds/d' -e '/reboot =/a reboot_timeout_seconds = {value}' /etc/tedge/system.toml", - self.execute_command(command, **kwargs,) + self.execute_command(command, **kwargs, ) @keyword("Escape Pattern") def regexp_escape(self, pattern: str, is_json: bool = False): @@ -733,6 +733,7 @@ def regexp_escape(self, pattern: str, is_json: bool = False): return value + def to_date(value: relativetime_) -> datetime: if isinstance(value, datetime): return value diff --git a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-aws.robot b/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-aws.robot index 610763904b4..ee2af2706ab 100644 --- a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-aws.robot +++ b/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-aws.robot @@ -14,6 +14,7 @@ Watchdog does not kill mapper if it responds Execute Command sudo systemctl stop tedge-mapper-aws.service Execute Command sudo systemctl stop tedge-watchdog.service Execute Command cmd=sudo sed -i '10iWatchdogSec=5' /lib/systemd/system/tedge-mapper-aws.service + Execute Command cmd=sudo sed -i "s/\\\\[Service\\\\]/\\\\0\\\\nEnvironment=\"TEDGE_MQTT_BRIDGE_BUILT_IN=false\"/" /lib/systemd/system/tedge-mapper-aws.service Execute Command sudo systemctl daemon-reload Execute Command sudo systemctl start tedge-mapper-aws.service Execute Command sudo systemctl start tedge-watchdog.service diff --git a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-az.robot b/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-az.robot index 64746a7b84c..bd88bad473b 100644 --- a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-az.robot +++ b/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-az.robot @@ -16,6 +16,7 @@ Stop tedge-mapper-az Update the service file Execute Command cmd=sudo sed -i '10iWatchdogSec=30' /lib/systemd/system/tedge-mapper-az.service + Execute Command cmd=sudo sed -i "s/\\\\[Service\\\\]/\\\\0\\\\nEnvironment=\"TEDGE_MQTT_BRIDGE_BUILT_IN=false\"/" /lib/systemd/system/tedge-mapper-az.service Reload systemd files Execute Command sudo systemctl daemon-reload diff --git a/tests/RobotFramework/tests/aws/aws_telemetry.robot b/tests/RobotFramework/tests/aws/aws_telemetry.robot index d0f1b57a142..2bd048db8f2 100644 --- a/tests/RobotFramework/tests/aws/aws_telemetry.robot +++ b/tests/RobotFramework/tests/aws/aws_telemetry.robot @@ -62,6 +62,7 @@ Publish health status message for main device service *** Keywords *** Custom Setup Setup + Execute Command tedge config set mqtt.bridge.built_in false Execute Command sudo systemctl start tedge-mapper-aws.service ThinEdgeIO.Service Health Status Should Be Up tedge-mapper-aws diff --git a/tests/RobotFramework/tests/azure/azure_telemetry.robot b/tests/RobotFramework/tests/azure/azure_telemetry.robot index 14d3bfac449..5560933b11f 100644 --- a/tests/RobotFramework/tests/azure/azure_telemetry.robot +++ b/tests/RobotFramework/tests/azure/azure_telemetry.robot @@ -74,6 +74,7 @@ Publish health status message for main device service *** Keywords *** Custom Setup Setup + Execute Command tedge config set mqtt.bridge.built_in false Execute Command sudo systemctl restart tedge-mapper-az.service ThinEdgeIO.Service Health Status Should Be Up tedge-mapper-az diff --git a/tests/RobotFramework/tests/cumulocity/bridge_config/local_cleansession.robot b/tests/RobotFramework/tests/cumulocity/bridge_config/local_cleansession.robot index 471f25bdfa2..4c485d0b012 100644 --- a/tests/RobotFramework/tests/cumulocity/bridge_config/local_cleansession.robot +++ b/tests/RobotFramework/tests/cumulocity/bridge_config/local_cleansession.robot @@ -51,5 +51,7 @@ Suite Setup ${DEVICE_SN}= Setup Set Suite Variable ${DEVICE_SN} + Execute Command tedge config set mqtt.bridge.built_in false + # Print which mosquitto version is being used Execute Command mosquitto --help | head -1 diff --git a/tests/RobotFramework/tests/cumulocity/custom_operation/custom_operation_timeout/custom_operation.robot b/tests/RobotFramework/tests/cumulocity/custom_operation/custom_operation_timeout/custom_operation.robot index 43f6648c029..127eed84ba0 100644 --- a/tests/RobotFramework/tests/cumulocity/custom_operation/custom_operation_timeout/custom_operation.robot +++ b/tests/RobotFramework/tests/cumulocity/custom_operation/custom_operation_timeout/custom_operation.robot @@ -30,6 +30,4 @@ Custom Setup Set Suite Variable $DEVICE_SN Device Should Exist ${DEVICE_SN} ThinEdgeIO.Transfer To Device ${CURDIR}/customop_handler.* /etc/tedge/operations/ - ThinEdgeIO.Restart Service tedge-mapper-c8y - ThinEdgeIO.Disconnect Then Connect Mapper c8y - \ No newline at end of file + diff --git a/tests/RobotFramework/tests/cumulocity/jwt/jwt_request.robot b/tests/RobotFramework/tests/cumulocity/jwt/jwt_request.robot index 98aa2c37c2b..6a03404a21a 100644 --- a/tests/RobotFramework/tests/cumulocity/jwt/jwt_request.robot +++ b/tests/RobotFramework/tests/cumulocity/jwt/jwt_request.robot @@ -19,7 +19,7 @@ Custom Setup ${DEVICE_SN}= Setup Set Suite Variable $DEVICE_SN Device Should Exist ${DEVICE_SN} - Stop Service tedge-mapper-c8y - Stop Service tedge-agent - Should Have MQTT Messages te/device/main/service/mosquitto-c8y-bridge/status/health + Execute Command tedge config set mqtt.bridge.built_in true + Execute Command tedge reconnect c8y + Should Have MQTT Messages te/device/main/service/tedge-mapper-bridge-c8y/status/health Sleep 1s wait just in case that the server responds to already sent messages diff --git a/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry_built-in_bridge.robot b/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry_built-in_bridge.robot index 4b9016be536..cd08c8142c5 100644 --- a/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry_built-in_bridge.robot +++ b/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry_built-in_bridge.robot @@ -16,11 +16,15 @@ Thin-edge devices support sending simple measurements ${measurements}= Device Should Have Measurements minimum=1 maximum=1 type=ThinEdgeMeasurement value=temperature series=temperature Log ${measurements} +Built-in bridge reports health status + Service Health Status Should Be Up tedge-mapper-bridge-custom-c8y-prefix + Bridge stops if mapper stops running Execute Command tedge mqtt pub ${C8Y_TOPIC_PREFIX}/s/us '200,CustomMeasurement,temperature,25' ${measurements}= Device Should Have Measurements minimum=1 maximum=1 type=CustomMeasurement series=temperature Log ${measurements} Execute Command systemctl stop tedge-mapper-c8y + Service Health Status Should Be Down tedge-mapper-bridge-custom-c8y-prefix Execute Command tedge mqtt pub ${C8Y_TOPIC_PREFIX}/s/us '200,CustomMeasurement,temperature,25' ${measurements}= Device Should Have Measurements minimum=1 maximum=1 type=CustomMeasurement series=temperature Log ${measurements} @@ -272,9 +276,7 @@ Custom Setup Device Should Exist ${DEVICE_SN} ThinEdgeIO.Execute Command tedge config set mqtt.bridge.built_in true ThinEdgeIO.Execute Command tedge config set c8y.bridge.topic_prefix custom-c8y-prefix - File Should Exist /etc/tedge/mosquitto-conf/c8y-bridge.conf ThinEdgeIO.Execute Command tedge reconnect c8y - File Should Not Exist /etc/tedge/mosquitto-conf/c8y-bridge.conf Service Health Status Should Be Up tedge-mapper-c8y ${output}= Execute Command sudo tedge connect c8y --test Should Contain ${output} Connection check to c8y cloud is successful. diff --git a/tests/RobotFramework/tests/mqtt/custom_sub_topics_tedge-mapper-aws.robot b/tests/RobotFramework/tests/mqtt/custom_sub_topics_tedge-mapper-aws.robot index 29ccf760582..a31c4e8533f 100644 --- a/tests/RobotFramework/tests/mqtt/custom_sub_topics_tedge-mapper-aws.robot +++ b/tests/RobotFramework/tests/mqtt/custom_sub_topics_tedge-mapper-aws.robot @@ -28,6 +28,7 @@ Publish measurements to unsubscribed topic Custom Setup Setup Execute Command sudo tedge config set aws.topics "te/+/+/+/+/e/+" + Execute Command sudo tedge config set mqtt.bridge.built_in false Execute Command sudo systemctl start tedge-mapper-aws.service ThinEdgeIO.Service Health Status Should Be Up tedge-mapper-aws diff --git a/tests/RobotFramework/tests/mqtt/custom_sub_topics_tedge-mapper-az.robot b/tests/RobotFramework/tests/mqtt/custom_sub_topics_tedge-mapper-az.robot index bdf3323b8cd..73e8ec83a58 100644 --- a/tests/RobotFramework/tests/mqtt/custom_sub_topics_tedge-mapper-az.robot +++ b/tests/RobotFramework/tests/mqtt/custom_sub_topics_tedge-mapper-az.robot @@ -25,6 +25,7 @@ Publish measurements to unsubscribed topic Custom Setup Setup Execute Command sudo tedge config set az.topics te/+/+/+/+/e/+ + Execute Command sudo tedge config set mqtt.bridge.built_in false Execute Command sudo systemctl restart tedge-mapper-az.service ThinEdgeIO.Service Health Status Should Be Up tedge-mapper-az diff --git a/tests/RobotFramework/tests/tedge_connect/tedge_connect_test.robot b/tests/RobotFramework/tests/tedge_connect/tedge_connect_test.robot index 33d01f1d8af..81ac891a9ee 100644 --- a/tests/RobotFramework/tests/tedge_connect/tedge_connect_test.robot +++ b/tests/RobotFramework/tests/tedge_connect/tedge_connect_test.robot @@ -20,7 +20,11 @@ Non-root users should be able to read the mosquitto configuration files #2154 [Tags] \#2154 Execute Command sudo tedge connect c8y || true Should Have File Permissions /etc/tedge/mosquitto-conf/tedge-mosquitto.conf 644 root:root + Execute Command sudo tedge config set mqtt.bridge.built_in false + Execute Command sudo tedge reconnect c8y + Should Have File Permissions /etc/tedge/mosquitto-conf/tedge-mosquitto.conf 644 root:root Should Have File Permissions /etc/tedge/mosquitto-conf/c8y-bridge.conf 644 root:root + Execute Command sudo tedge config set mqtt.bridge.built_in true # Reset things after running the test tedge_connect_test_negative