From 7e81021a2cb6c6da3129635ac2dfbf2866dbb699 Mon Sep 17 00:00:00 2001 From: Rina Fujino Date: Sat, 24 Feb 2024 02:32:04 +0900 Subject: [PATCH] c8y-mapper sends software list to advanced software management endpoint Since Cumulocity 10.14, it is recommended to send software list to the advanced software management endpoint. To trigger this, first c8y-mapper sends c8y_SupportedSoftwareTypes to inventory via JSON over MQTT. Then, post/put software list to their HTTP endpoint. Signed-off-by: Rina Fujino --- crates/core/c8y_api/src/http_proxy.rs | 18 -- crates/core/c8y_api/src/json_c8y.rs | 172 ----------------- .../src/smartrest/smartrest_serializer.rs | 175 +++++++++++++++++- crates/extensions/c8y_http_proxy/src/actor.rs | 39 ---- .../extensions/c8y_http_proxy/src/handle.rs | 19 -- .../extensions/c8y_http_proxy/src/messages.rs | 9 +- crates/extensions/c8y_http_proxy/src/tests.rs | 148 +-------------- .../c8y_mapper_ext/src/converter.rs | 42 +++-- .../c8y_mapper_ext/src/inventory.rs | 16 +- crates/extensions/c8y_mapper_ext/src/tests.rs | 50 ++++- 10 files changed, 258 insertions(+), 430 deletions(-) diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 3a87f4e4cb3..11adca54ab6 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -59,13 +59,6 @@ impl C8yEndPoint { url_get_id } - pub fn get_url_for_sw_list(&self, internal_id: String) -> String { - let mut url_update_swlist = self.get_base_url(); - url_update_swlist.push_str("/inventory/managedObjects/"); - url_update_swlist.push_str(&internal_id); - url_update_swlist - } - pub fn get_url_for_internal_id(&self, device_id: &str) -> String { let mut url_get_id = self.get_base_url(); url_get_id.push_str("/identity/externalIds/c8y_Serial/"); @@ -199,17 +192,6 @@ mod tests { ); } - #[test] - fn get_url_for_sw_list_returns_correct_address() { - let mut c8y = C8yEndPoint::new("test_host", "test_device"); - c8y.devices_internal_id - .insert("test_device".to_string(), "12345".to_string()); - let internal_id = c8y.get_internal_id("test_device".to_string()).unwrap(); - let res = c8y.get_url_for_sw_list(internal_id); - - assert_eq!(res, "https://test_host/inventory/managedObjects/12345"); - } - #[test_case("http://aaa.test.com")] #[test_case("https://aaa.test.com")] #[test_case("ftp://aaa.test.com")] diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index 09073ee506a..988e2526533 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -1,4 +1,3 @@ -use download::DownloadInfo; use serde::Deserialize; use serde::Serialize; use serde_json::Value; @@ -9,13 +8,10 @@ use tedge_api::alarm::ThinEdgeAlarmData; use tedge_api::entity_store::EntityMetadata; use tedge_api::entity_store::EntityType; use tedge_api::event::ThinEdgeEvent; -use tedge_api::messages::SoftwareListCommand; use tedge_api::EntityStore; use tedge_api::Jsonify; -use tedge_api::SoftwareModule; use time::OffsetDateTime; -const EMPTY_STRING: &str = ""; const DEFAULT_ALARM_SEVERITY: AlarmSeverity = AlarmSeverity::Minor; const DEFAULT_ALARM_TYPE: &str = "ThinEdgeAlarm"; @@ -70,60 +66,6 @@ impl InternalIdResponse { } } -#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct C8ySoftwareModuleItem { - pub name: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub version: Option, - pub software_type: String, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(flatten)] - pub url: Option, -} - -impl<'a> Jsonify<'a> for C8ySoftwareModuleItem {} - -impl From for C8ySoftwareModuleItem { - fn from(module: SoftwareModule) -> Self { - let url = if module.url.is_none() { - Some(EMPTY_STRING.into()) - } else { - module.url - }; - - Self { - name: module.name, - version: module.version, - software_type: module.module_type.unwrap_or(SoftwareModule::default_type()), - url, - } - } -} - -#[derive(Debug, Serialize, Eq, PartialEq, Default)] -#[serde(rename_all = "camelCase")] -pub struct C8yUpdateSoftwareListResponse { - #[serde(rename = "c8y_SoftwareList")] - c8y_software_list: Option>, -} - -impl<'a> Jsonify<'a> for C8yUpdateSoftwareListResponse {} - -impl From<&SoftwareListCommand> for C8yUpdateSoftwareListResponse { - fn from(list: &SoftwareListCommand) -> Self { - let mut new_list: Vec = Vec::new(); - list.modules().into_iter().for_each(|software_module| { - let c8y_software_module: C8ySoftwareModuleItem = software_module.into(); - new_list.push(c8y_software_module); - }); - - Self { - c8y_software_list: Some(new_list), - } - } -} - impl From for C8yCreateEvent { fn from(event: ThinEdgeEvent) -> Self { let mut extras = HashMap::new(); @@ -370,7 +312,6 @@ mod tests { use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::entity_store::InvalidExternalIdError; use tedge_api::event::ThinEdgeEventData; - use tedge_api::messages::SoftwareListCommandPayload; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use test_case::test_case; @@ -378,119 +319,6 @@ mod tests { use super::*; - #[test] - fn from_software_module_to_c8y_software_module_item() { - let software_module = SoftwareModule { - module_type: Some("a".into()), - name: "b".into(), - version: Some("c".into()), - url: Some("".into()), - file_path: None, - }; - - let expected_c8y_item = C8ySoftwareModuleItem { - name: "b".into(), - version: Some("c".into()), - software_type: "a".to_string(), - url: Some("".into()), - }; - - let converted: C8ySoftwareModuleItem = software_module.into(); - - assert_eq!(converted, expected_c8y_item); - } - - #[test] - fn from_thin_edge_json_to_c8y_set_software_list() { - let input_json = r#"{ - "id":"1", - "status":"successful", - "currentSoftwareList":[ - {"type":"debian", "modules":[ - {"name":"a"}, - {"name":"b","version":"1.0"}, - {"name":"c","url":"https://foobar.io/c.deb"}, - {"name":"d","version":"beta","url":"https://foobar.io/d.deb"} - ]}, - {"type":"apama","modules":[ - {"name":"m","url":"https://foobar.io/m.epl"} - ]} - ]}"#; - - let command = SoftwareListCommand { - target: EntityTopicId::default_main_device(), - cmd_id: "1".to_string(), - payload: SoftwareListCommandPayload::from_json(input_json).unwrap(), - }; - - let c8y_software_list: C8yUpdateSoftwareListResponse = (&command).into(); - - let expected_struct = C8yUpdateSoftwareListResponse { - c8y_software_list: Some(vec![ - C8ySoftwareModuleItem { - name: "a".into(), - version: None, - software_type: "debian".to_string(), - url: Some("".into()), - }, - C8ySoftwareModuleItem { - name: "b".into(), - version: Some("1.0".into()), - software_type: "debian".to_string(), - url: Some("".into()), - }, - C8ySoftwareModuleItem { - name: "c".into(), - version: None, - software_type: "debian".to_string(), - url: Some("https://foobar.io/c.deb".into()), - }, - C8ySoftwareModuleItem { - name: "d".into(), - version: Some("beta".into()), - software_type: "debian".to_string(), - url: Some("https://foobar.io/d.deb".into()), - }, - C8ySoftwareModuleItem { - name: "m".into(), - version: None, - software_type: "apama".to_string(), - url: Some("https://foobar.io/m.epl".into()), - }, - ]), - }; - - let expected_json = r#"{"c8y_SoftwareList":[{"name":"a","softwareType":"debian","url":""},{"name":"b","version":"1.0","softwareType":"debian","url":""},{"name":"c","softwareType":"debian","url":"https://foobar.io/c.deb"},{"name":"d","version":"beta","softwareType":"debian","url":"https://foobar.io/d.deb"},{"name":"m","softwareType":"apama","url":"https://foobar.io/m.epl"}]}"#; - - assert_eq!(c8y_software_list, expected_struct); - assert_eq!(c8y_software_list.to_json(), expected_json); - } - - #[test] - fn empty_to_c8y_set_software_list() { - let input_json = r#"{ - "id":"1", - "status":"successful", - "currentSoftwareList":[] - }"#; - - let command = &SoftwareListCommand { - target: EntityTopicId::default_main_device(), - cmd_id: "1".to_string(), - payload: SoftwareListCommandPayload::from_json(input_json).unwrap(), - }; - - let c8y_software_list: C8yUpdateSoftwareListResponse = command.into(); - - let expected_struct = C8yUpdateSoftwareListResponse { - c8y_software_list: Some(vec![]), - }; - let expected_json = r#"{"c8y_SoftwareList":[]}"#; - - assert_eq!(c8y_software_list, expected_struct); - assert_eq!(c8y_software_list.to_json(), expected_json); - } - #[test] fn get_id_from_c8y_response() { let managed_object = C8yManagedObject { id: "12345".into() }; diff --git a/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs b/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs index f36bda2a382..f6cdbdeed76 100644 --- a/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs +++ b/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs @@ -4,9 +4,10 @@ use crate::smartrest::topic::C8yTopic; use csv::StringRecord; use mqtt_channel::Message; use serde::ser::SerializeSeq; -use serde::Deserialize; use serde::Serialize; use serde::Serializer; +use tedge_api::SoftwareListCommand; +use tedge_api::SoftwareModule; use tracing::warn; pub type SmartRest = String; @@ -104,11 +105,99 @@ pub fn declare_supported_operations(ops: &[&str]) -> String { format!("114,{}", fields_to_csv_string(ops)) } -#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub struct SmartRestSoftwareModuleItem { - pub software: String, - pub version: Option, - pub url: Option, + pub name: String, + pub version: String, + pub software_type: String, + pub url: String, +} + +impl From for SmartRestSoftwareModuleItem { + fn from(module: SoftwareModule) -> Self { + let url = match module.url { + None => "".to_string(), + Some(download_info) => download_info.url, + }; + + Self { + name: module.name, + version: module.version.unwrap_or_default(), + software_type: module.module_type.unwrap_or(SoftwareModule::default_type()), + url, + } + } +} + +pub enum AdvancedSoftwareList { + Set(Vec), + Append(Vec), +} + +impl AdvancedSoftwareList { + pub fn to_smartrest_payload(self) -> String { + let vec = match self { + AdvancedSoftwareList::Set(items) => Self::create_software_list("140", items), + AdvancedSoftwareList::Append(items) => Self::create_software_list("141", items), + }; + let list: Vec<&str> = vec.iter().map(std::ops::Deref::deref).collect(); + + fields_to_csv_string(list.as_slice()) + } + + fn create_software_list(id: &str, items: Vec) -> Vec { + let mut vec = vec![id.to_string()]; + + if items.is_empty() { + vec.append(&mut vec![ + "".to_string(), + "".to_string(), + "".to_string(), + "".to_string(), + ]); + } else { + for item in items { + vec.push(item.name); + vec.push(item.version); + vec.push(item.software_type); + vec.push(item.url); + } + } + vec + } +} + +pub fn get_advanced_software_list_payloads( + software_list_cmd: &SoftwareListCommand, + chunk_size: usize, +) -> Vec { + let mut messages: Vec = Vec::new(); + + if software_list_cmd.modules().is_empty() { + messages.push(AdvancedSoftwareList::Set(vec![]).to_smartrest_payload()); + return messages; + } + + let mut items: Vec = Vec::new(); + software_list_cmd + .modules() + .into_iter() + .for_each(|software_module| { + let c8y_software_module: SmartRestSoftwareModuleItem = software_module.into(); + items.push(c8y_software_module); + }); + + let mut first = true; + for chunk in items.chunks(chunk_size) { + if first { + messages.push(AdvancedSoftwareList::Set(chunk.to_vec()).to_smartrest_payload()); + first = false; + } else { + messages.push(AdvancedSoftwareList::Append(chunk.to_vec()).to_smartrest_payload()); + } + } + + messages } /// A supported operation of the thin-edge device, used in status updates via SmartREST @@ -229,6 +318,9 @@ pub trait OperationStatusMessage { #[cfg(test)] mod tests { use super::*; + use tedge_api::messages::SoftwareListCommandPayload; + use tedge_api::mqtt_topics::EntityTopicId; + use tedge_api::Jsonify; #[test] fn serialize_smartrest_supported_operations() { @@ -350,4 +442,77 @@ mod tests { let smartrest = fail_operation(CumulocitySupportedOperations::C8ySoftwareUpdate, ""); assert_eq!(smartrest, "502,c8y_SoftwareUpdate,"); } + + #[test] + fn from_software_module_to_smartrest_software_module_item() { + let software_module = SoftwareModule { + module_type: Some("a".into()), + name: "b".into(), + version: Some("c".into()), + url: Some("".into()), + file_path: None, + }; + + let expected_c8y_item = SmartRestSoftwareModuleItem { + name: "b".into(), + version: "c".into(), + software_type: "a".to_string(), + url: "".into(), + }; + + let converted: SmartRestSoftwareModuleItem = software_module.into(); + assert_eq!(converted, expected_c8y_item); + } + + #[test] + fn from_thin_edge_json_to_advanced_software_list() { + let input_json = r#"{ + "id":"1", + "status":"successful", + "currentSoftwareList":[ + {"type":"debian", "modules":[ + {"name":"a"}, + {"name":"b","version":"1.0"}, + {"name":"c","url":"https://foobar.io/c.deb"}, + {"name":"d","version":"beta","url":"https://foobar.io/d.deb"} + ]}, + {"type":"apama","modules":[ + {"name":"m","url":"https://foobar.io/m.epl"} + ]} + ]}"#; + + let command = SoftwareListCommand { + target: EntityTopicId::default_main_device(), + cmd_id: "1".to_string(), + payload: SoftwareListCommandPayload::from_json(input_json).unwrap(), + }; + + let advanced_sw_list = get_advanced_software_list_payloads(&command, 2); + + assert_eq!(advanced_sw_list[0], "140,a,,debian,,b,1.0,debian,"); + assert_eq!( + advanced_sw_list[1], + "141,c,,debian,https://foobar.io/c.deb,d,beta,debian,https://foobar.io/d.deb" + ); + assert_eq!(advanced_sw_list[2], "141,m,,apama,https://foobar.io/m.epl"); + } + + #[test] + fn empty_to_advanced_list() { + let input_json = r#"{ + "id":"1", + "status":"successful", + "currentSoftwareList":[] + }"#; + + let command = &SoftwareListCommand { + target: EntityTopicId::default_main_device(), + cmd_id: "1".to_string(), + payload: SoftwareListCommandPayload::from_json(input_json).unwrap(), + }; + + let advanced_sw_list = get_advanced_software_list_payloads(command, 2); + dbg!(&advanced_sw_list); + assert_eq!(advanced_sw_list[0], "140,,,,"); + } } diff --git a/crates/extensions/c8y_http_proxy/src/actor.rs b/crates/extensions/c8y_http_proxy/src/actor.rs index e20a62e24fc..a9ddf346700 100644 --- a/crates/extensions/c8y_http_proxy/src/actor.rs +++ b/crates/extensions/c8y_http_proxy/src/actor.rs @@ -8,7 +8,6 @@ use crate::messages::C8YRestResult; use crate::messages::CreateEvent; use crate::messages::DownloadFile; use crate::messages::EventId; -use crate::messages::SoftwareListResponse; use crate::messages::Unit; use crate::messages::UploadFile; use crate::messages::UploadLogBinary; @@ -104,11 +103,6 @@ impl Actor for C8YHttpProxyActor { .await .map(|response| response.into()), - C8YRestRequest::SoftwareListResponse(request) => self - .send_software_list_http(request) - .await - .map(|response| response.into()), - C8YRestRequest::UploadLogBinary(request) => self .upload_log_binary(request) .await @@ -337,39 +331,6 @@ impl C8YHttpProxyActor { .await } - async fn send_software_list_http( - &mut self, - software_list: SoftwareListResponse, - ) -> Result { - let device_id = software_list.device_id; - - // Get and set child device internal id - if device_id.ne(&self.end_point.device_id) - && self.end_point.get_internal_id(device_id.clone()).is_err() - { - self.get_and_set_internal_id(device_id.clone()).await?; - } - - let build_request = |end_point: &C8yEndPoint| { - let internal_id = end_point - .get_internal_id(device_id.clone()) - .map_err(|e| C8YRestError::CustomError(e.to_string())); - let url = internal_id.map(|id| end_point.get_url_for_sw_list(id)); - async { - Ok::<_, C8YRestError>( - HttpRequestBuilder::put(url?) - .header("Accept", "application/json") - .header("Content-Type", "application/json") - .json(&software_list.c8y_software_list), - ) - } - }; - - let http_result = self.execute(device_id.clone(), build_request).await?; - http_result.error_for_status()?; - Ok(()) - } - async fn upload_log_binary( &mut self, request: UploadLogBinary, diff --git a/crates/extensions/c8y_http_proxy/src/handle.rs b/crates/extensions/c8y_http_proxy/src/handle.rs index e1d7c00ae34..4d69be1b642 100644 --- a/crates/extensions/c8y_http_proxy/src/handle.rs +++ b/crates/extensions/c8y_http_proxy/src/handle.rs @@ -5,10 +5,8 @@ use crate::messages::C8YRestResult; use crate::messages::CreateEvent; use crate::messages::GetFreshJwtToken; use crate::messages::GetJwtToken; -use crate::messages::SoftwareListResponse; use crate::messages::UploadFile; use crate::messages::UploadLogBinary; -use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; use std::path::Path; use std::path::PathBuf; use tedge_actors::ClientMessageBox; @@ -54,23 +52,6 @@ impl C8YHttpProxy { } } - pub async fn send_software_list_http( - &mut self, - c8y_software_list: C8yUpdateSoftwareListResponse, - device_id: String, - ) -> Result<(), C8YRestError> { - let request: C8YRestRequest = SoftwareListResponse { - c8y_software_list, - device_id, - } - .into(); - - match self.c8y.await_response(request).await? { - Ok(C8YRestResponse::Unit(_)) => Ok(()), - unexpected => Err(unexpected.into()), - } - } - pub async fn upload_log_binary( &mut self, log_type: &str, diff --git a/crates/extensions/c8y_http_proxy/src/messages.rs b/crates/extensions/c8y_http_proxy/src/messages.rs index 061674956d2..f2383f7e21d 100644 --- a/crates/extensions/c8y_http_proxy/src/messages.rs +++ b/crates/extensions/c8y_http_proxy/src/messages.rs @@ -1,4 +1,3 @@ -use c8y_api::json_c8y::*; use c8y_api::smartrest::error::SMCumulocityMapperError; use std::collections::HashMap; use std::path::PathBuf; @@ -7,7 +6,7 @@ use tedge_actors::ChannelError; use tedge_http_ext::HttpError; use tedge_utils::file::PermissionEntry; -fan_in_message_type!(C8YRestRequest[GetJwtToken, GetFreshJwtToken, CreateEvent, SoftwareListResponse, UploadLogBinary, UploadFile, DownloadFile]: Debug, PartialEq, Eq); +fan_in_message_type!(C8YRestRequest[GetJwtToken, GetFreshJwtToken, CreateEvent, UploadLogBinary, UploadFile, DownloadFile]: Debug, PartialEq, Eq); //HIPPO Rename EventId to String as there could be many other String responses as well and this macro doesn't allow another String variant fan_in_message_type!(C8YRestResponse[EventId, Url, Unit]: Debug); @@ -66,12 +65,6 @@ pub struct CreateEvent { pub device_id: String, } -#[derive(Debug, PartialEq, Eq)] -pub struct SoftwareListResponse { - pub c8y_software_list: C8yUpdateSoftwareListResponse, - pub device_id: String, -} - #[derive(Debug, PartialEq, Eq)] pub struct UploadLogBinary { pub log_type: String, diff --git a/crates/extensions/c8y_http_proxy/src/tests.rs b/crates/extensions/c8y_http_proxy/src/tests.rs index 40991ca99c3..1cec9a3e291 100644 --- a/crates/extensions/c8y_http_proxy/src/tests.rs +++ b/crates/extensions/c8y_http_proxy/src/tests.rs @@ -7,7 +7,6 @@ use crate::C8YHttpConfig; use crate::C8YHttpProxyBuilder; use async_trait::async_trait; use c8y_api::json_c8y::C8yEventResponse; -use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; use c8y_api::json_c8y::InternalIdResponse; use http::StatusCode; use mockito::Matcher; @@ -160,7 +159,7 @@ async fn retry_get_internal_id_when_not_found() { let tmp_dir = "/tmp"; let child_device_id = "child-101"; - let (mut proxy, mut c8y) = spawn_c8y_http_proxy( + let (_proxy, mut c8y) = spawn_c8y_http_proxy( c8y_host.into(), main_device_id.into(), tmp_dir.into(), @@ -224,33 +223,7 @@ async fn retry_get_internal_id_when_not_found() { .build() .unwrap(); c8y.send(Ok(c8y_response)).await.unwrap(); - - // Then let the software_list update succeed - let c8y_software_list = C8yUpdateSoftwareListResponse::default(); - assert_recv( - &mut c8y, - Some( - HttpRequestBuilder::put(format!("https://{c8y_host}/inventory/managedObjects/200")) - .header("content-type", "application/json") - .header("accept", "application/json") - .bearer_auth(token) - .json(&c8y_software_list) - .build() - .unwrap(), - ), - ) - .await; - let c8y_response = HttpResponseBuilder::new().status(200).build().unwrap(); - c8y.send(Ok(c8y_response)).await.unwrap(); }); - - let res = proxy - .send_software_list_http( - C8yUpdateSoftwareListResponse::default(), - child_device_id.into(), - ) - .await; - assert!(res.is_ok(), "Expected software list request to succeed"); } #[tokio::test] @@ -261,7 +234,7 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { let tmp_dir = "/tmp"; let child_device_id = "child-101"; - let (mut proxy, mut c8y) = spawn_c8y_http_proxy( + let (_proxy, mut c8y) = spawn_c8y_http_proxy( c8y_host.into(), main_device_id.into(), tmp_dir.into(), @@ -308,15 +281,6 @@ async fn get_internal_id_retry_fails_after_exceeding_attempts_threshold() { c8y.send(Ok(c8y_response)).await.unwrap(); } }); - - // Fetch the software list so that it internally invokes get_internal_id - let res = proxy - .send_software_list_http( - C8yUpdateSoftwareListResponse::default(), - child_device_id.into(), - ) - .await; - assert!(res.is_err(), "Expected software list request to succeed"); } #[tokio::test] @@ -441,114 +405,6 @@ async fn retry_create_event_on_expired_jwt_with_mock() { assert_eq!(event_id, result.unwrap()); } -#[tokio::test] -async fn retry_software_list_once_with_fresh_internal_id() { - let c8y_host = "c8y.tenant.io"; - let device_id = "device-001"; - let token = "JWT token"; - let external_id = "external-device-001"; - let tmp_dir = "/tmp"; - - let (mut proxy, mut c8y) = - spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; - - // Even before any request is sent to the c8y_proxy - // the proxy requests over HTTP the internal device id. - let _init_request = HttpRequestBuilder::get(format!( - "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" - )) - .bearer_auth(token) - .build() - .unwrap(); - // skip the message - c8y.recv().await; - - // Cumulocity returns the internal device id - let c8y_response = HttpResponseBuilder::new() - .status(200) - .json(&InternalIdResponse::new(device_id, external_id)) - .build() - .unwrap(); - c8y.send(Ok(c8y_response)).await.unwrap(); - - // This internal id is then used by the proxy for subsequent requests. - // Create the software list and publish - let c8y_software_list = C8yUpdateSoftwareListResponse::default(); - - tokio::spawn(async move { - // NOTE: this is done in the background because this call awaits for the response. - proxy - .send_software_list_http(c8y_software_list, device_id.into()) - .await - }); - - let c8y_software_list = C8yUpdateSoftwareListResponse::default(); - // then the upload request received by c8y is related to the internal id - assert_recv( - &mut c8y, - Some( - HttpRequestBuilder::put(format!( - "https://{c8y_host}/inventory/managedObjects/{device_id}" - )) - .header("content-type", "application/json") - .header("accept", "application/json") - .bearer_auth(token) - .json(&c8y_software_list) - .build() - .unwrap(), - ), - ) - .await; - - // The software list upload fails because the device identified with internal id not found - let c8y_response = HttpResponseBuilder::new() - .status(404) - .json(&InternalIdResponse::new(device_id, external_id)) - .build() - .unwrap(); - c8y.send(Ok(c8y_response)).await.unwrap(); - - // Now the mapper gets a new internal id for the specific device id - assert_recv( - &mut c8y, - Some( - HttpRequestBuilder::get(format!( - "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" - )) - .bearer_auth(token) - .build() - .unwrap(), - ), - ) - .await; - - // Cumulocity returns the internal device id, after retrying with the fresh jwt token - let c8y_response = HttpResponseBuilder::new() - .status(200) - .json(&InternalIdResponse::new(device_id, external_id)) - .build() - .unwrap(); - c8y.send(Ok(c8y_response)).await.unwrap(); - - let c8y_software_list = C8yUpdateSoftwareListResponse::default(); - // then the upload request received by c8y is related to the internal id - assert_recv( - &mut c8y, - Some( - HttpRequestBuilder::put(format!( - "https://{c8y_host}/inventory/managedObjects/{device_id}" - )) - .bearer_auth(token) - .header("content-type", "application/json") - .header("accept", "application/json") - .json(&c8y_software_list) - .build() - .unwrap(), - ), - ) - .await; -} - #[tokio::test] async fn auto_retry_upload_log_binary_when_internal_id_expires() { let c8y_host = "c8y.tenant.io"; diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 9282fbf365d..4d0c9c7329a 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -14,7 +14,6 @@ use anyhow::anyhow; use anyhow::Context; use c8y_api::http_proxy::C8yEndPoint; use c8y_api::json_c8y::C8yCreateEvent; -use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; use c8y_api::json_c8y_deserializer::C8yDeviceControlOperation; use c8y_api::json_c8y_deserializer::C8yDeviceControlTopic; use c8y_api::json_c8y_deserializer::C8yJsonOverMqttDeserializerError; @@ -35,6 +34,7 @@ use c8y_api::smartrest::operations::get_operations; use c8y_api::smartrest::operations::Operations; use c8y_api::smartrest::operations::ResultFormat; use c8y_api::smartrest::smartrest_serializer::fail_operation; +use c8y_api::smartrest::smartrest_serializer::get_advanced_software_list_payloads; use c8y_api::smartrest::smartrest_serializer::request_pending_operations; use c8y_api::smartrest::smartrest_serializer::set_operation_executing; use c8y_api::smartrest::smartrest_serializer::succeed_operation; @@ -52,6 +52,7 @@ use camino::Utf8Path; use logged_command::LoggedCommand; use plugin_sm::operation_logs::OperationLogs; use plugin_sm::operation_logs::OperationLogsError; +use serde_json::json; use serde_json::Map; use serde_json::Value; use service_monitor::convert_health_status_message; @@ -73,6 +74,7 @@ use tedge_api::event::error::ThinEdgeJsonDeserializerError; use tedge_api::event::ThinEdgeEvent; use tedge_api::messages::CommandStatus; use tedge_api::messages::RestartCommand; +use tedge_api::messages::SoftwareCommandMetadata; use tedge_api::messages::SoftwareListCommand; use tedge_api::messages::SoftwareUpdateCommand; use tedge_api::mqtt_topics::Channel; @@ -83,6 +85,7 @@ use tedge_api::mqtt_topics::OperationType; use tedge_api::pending_entity_store::PendingEntityData; use tedge_api::DownloadInfo; use tedge_api::EntityStore; +use tedge_api::Jsonify; use tedge_config::TEdgeConfigError; use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; @@ -107,6 +110,7 @@ const DEFAULT_EVENT_TYPE: &str = "ThinEdgeEvent"; const FORBIDDEN_ID_CHARS: [char; 3] = ['/', '+', '#']; const REQUESTER_NAME: &str = "c8y-mapper"; const EARLY_MESSAGE_BUFFER_SIZE: usize = 100; +const SOFTWARE_LIST_CHUNK_SIZE: usize = 100; #[derive(Debug)] pub struct MapperConfig { @@ -1041,7 +1045,8 @@ impl CumulocityConverter { match operation { OperationType::Restart => self.register_restart_operation(&source).await, OperationType::SoftwareList => { - self.register_software_list_operation(&source).await + self.register_software_list_operation(&source, message) + .await } OperationType::SoftwareUpdate => { self.register_software_update_operation(&source).await @@ -1443,10 +1448,15 @@ impl CumulocityConverter { async fn register_software_list_operation( &self, - _target: &EntityTopicId, + target: &EntityTopicId, + message: &Message, ) -> Result, ConversionError> { - // On c8y, "software list" is implied by "software update" - Ok(vec![]) + // Send c8y_SupportedSoftwareTypes, which is introduced in c8y >= 10.14 + let data = SoftwareCommandMetadata::from_json(message.payload_str()?)?; + let payload = json!({"c8y_SupportedSoftwareTypes": data.types}).to_string(); + let topic = self.get_inventory_update_topic(target)?; + + Ok(vec![Message::new(&topic, payload)]) } async fn register_software_update_operation( @@ -1542,16 +1552,20 @@ impl CumulocityConverter { match response.status() { CommandStatus::Successful => { - if let Some(device) = self.entity_store.get(target) { - let c8y_software_list: C8yUpdateSoftwareListResponse = (&response).into(); - self.http_proxy - .send_software_list_http( - c8y_software_list, - device.external_id.as_ref().to_string(), - ) - .await?; + let topic = self + .entity_store + .get(target) + .and_then(C8yTopic::smartrest_response_topic) + .ok_or_else(|| Error::UnknownEntity(target.to_string()))?; + let payloads = + get_advanced_software_list_payloads(&response, SOFTWARE_LIST_CHUNK_SIZE); + + let mut messages: Vec = Vec::new(); + for payload in payloads { + messages.push(Message::new(&topic, payload)) } - Ok(vec![response.clearing_message(&self.mqtt_schema)]) + messages.push(response.clearing_message(&self.mqtt_schema)); + Ok(messages) } CommandStatus::Failed { reason } => { diff --git a/crates/extensions/c8y_mapper_ext/src/inventory.rs b/crates/extensions/c8y_mapper_ext/src/inventory.rs index d59c6cee556..463a9eeb000 100644 --- a/crates/extensions/c8y_mapper_ext/src/inventory.rs +++ b/crates/extensions/c8y_mapper_ext/src/inventory.rs @@ -113,10 +113,7 @@ impl CumulocityConverter { source: &EntityTopicId, fragment_value: JsonValue, ) -> Result { - let entity_external_id = self.entity_store.try_get(source)?.external_id.as_ref(); - let inventory_update_topic = Topic::new_unchecked(&format!( - "{INVENTORY_MANAGED_OBJECTS_TOPIC}/{entity_external_id}" - )); + let inventory_update_topic = self.get_inventory_update_topic(source)?; Ok(Message::new( &inventory_update_topic, @@ -171,6 +168,17 @@ impl CumulocityConverter { info!("Read the fragments from {file_path:?} file"); Ok(json) } + + /// Returns the JSON over MQTT inventory update topic + pub fn get_inventory_update_topic( + &self, + source: &EntityTopicId, + ) -> Result { + let entity_external_id = self.entity_store.try_get(source)?.external_id.as_ref(); + Ok(Topic::new_unchecked(&format!( + "{INVENTORY_MANAGED_OBJECTS_TOPIC}/{entity_external_id}" + ))) + } } #[cfg(test)] diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index a97c14b8c25..286df9d6e1d 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -283,6 +283,51 @@ async fn service_registration_mapping() { .await; } +#[tokio::test] +async fn mapper_publishes_advanced_software_list() { + // The test assures c8y mapper correctly receives software update request from JSON over MQTT + // and converts it to thin-edge json message published on `te/device/main///cmd/software_update/+`. + let cfg_dir = TempTedgeDir::new(); + let (mqtt, http, _fs, _timer, _ul, _dl) = spawn_c8y_mapper_actor(&cfg_dir, true).await; + spawn_dummy_c8y_http_proxy(http); + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate software_list request + mqtt.send(MqttMessage::new( + &Topic::new_unchecked("te/device/main///cmd/software_list/c8y-mapper-1234"), + json!({ + "id":"1", + "status":"successful", + "currentSoftwareList":[ + {"type":"debian", "modules":[ + {"name":"a"}, + {"name":"b","version":"1.0"}, + {"name":"c","url":"https://foobar.io/c.deb"}, + {"name":"d","version":"beta","url":"https://foobar.io/d.deb"} + ]}, + {"type":"apama","modules":[ + {"name":"m","url":"https://foobar.io/m.epl"} + ]} + ]}) + .to_string(), + )) + .await + .expect("Send failed"); + + assert_received_contains_str( + &mut mqtt, + [ + ( + "c8y/s/us", + "140,a,,debian,,b,1.0,debian,,c,,debian,https://foobar.io/c.deb,d,beta,debian,https://foobar.io/d.deb,m,,apama,https://foobar.io/m.epl" + ) + ]) + .await; +} + #[tokio::test] async fn mapper_publishes_software_update_request() { // The test assures c8y mapper correctly receives software update request from JSON over MQTT @@ -2503,11 +2548,6 @@ pub(crate) fn spawn_dummy_c8y_http_proxy(mut http: FakeServerBox { - let _ = http - .send(Ok(c8y_http_proxy::messages::C8YRestResponse::Unit(()))) - .await; - } Some(C8YRestRequest::CreateEvent(_)) => { let _ = http .send(Ok(c8y_http_proxy::messages::C8YRestResponse::EventId(