From 106b84fd21aad714da8f660925d12f39f5ef8256 Mon Sep 17 00:00:00 2001 From: PradeepKiruvale Date: Wed, 26 Jul 2023 12:51:14 +0530 Subject: [PATCH] Retry the http requests with fresh jwt token, if the requests fail due to expired JWT token or internal id. (#2065) Signed-off-by: Pradeep Kumar K J Co-authored-by: Pradeep Kumar K J --- Cargo.lock | 17 +- crates/core/c8y_api/src/http_proxy.rs | 66 +++- crates/core/c8y_api/src/json_c8y.rs | 6 +- .../c8y_config_manager/src/tests.rs | 6 +- .../c8y_config_manager/src/upload.rs | 8 +- .../c8y_firmware_manager/src/config.rs | 2 +- crates/extensions/c8y_http_proxy/Cargo.toml | 1 + crates/extensions/c8y_http_proxy/src/actor.rs | 343 +++++++++++------- .../extensions/c8y_http_proxy/src/handle.rs | 17 +- .../extensions/c8y_http_proxy/src/messages.rs | 12 +- crates/extensions/c8y_http_proxy/src/tests.rs | 251 ++++++++++++- .../extensions/c8y_log_manager/src/actor.rs | 8 +- .../c8y_mapper_ext/src/converter.rs | 19 +- crates/extensions/c8y_mapper_ext/src/tests.rs | 2 +- 14 files changed, 571 insertions(+), 187 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8b4acc503a..6b4f879becb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -647,6 +647,7 @@ dependencies = [ "async-trait", "c8y_api", "download", + "http", "log", "mqtt_channel", "tedge_actors", @@ -1335,9 +1336,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "form_urlencoded" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" +checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" dependencies = [ "percent-encoding", ] @@ -1691,9 +1692,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -2402,9 +2403,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pharos" @@ -4301,9 +4302,9 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "url" -version = "2.3.1" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" +checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb" dependencies = [ "form_urlencoded", "idna", diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 167e1a9910b..24bb4fdba7d 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -6,35 +6,47 @@ use mqtt_channel::StreamExt; use mqtt_channel::Topic; use mqtt_channel::TopicFilter; use reqwest::Url; +use std::collections::HashMap; use std::time::Duration; use tedge_config::mqtt_config::MqttConfigBuildError; use tedge_config::TEdgeConfig; use tracing::error; use tracing::info; +#[derive(thiserror::Error, Debug)] +pub enum C8yEndPointError { + #[error("Cumulocity internal id not found for the device: {0}")] + InternalIdNotFound(String), +} + /// Define a C8y endpoint #[derive(Debug)] pub struct C8yEndPoint { - pub c8y_host: String, + c8y_host: String, pub device_id: String, - pub c8y_internal_id: String, + pub token: Option, + devices_internal_id: HashMap, } impl C8yEndPoint { - pub fn new(c8y_host: &str, device_id: &str, c8y_internal_id: &str) -> C8yEndPoint { + pub fn new(c8y_host: &str, device_id: &str) -> C8yEndPoint { C8yEndPoint { c8y_host: c8y_host.into(), device_id: device_id.into(), - c8y_internal_id: c8y_internal_id.into(), + token: None, + devices_internal_id: HashMap::new(), } } - pub fn get_c8y_internal_id(&self) -> &str { - &self.c8y_internal_id + pub fn get_internal_id(&self, device_id: String) -> Result { + match self.devices_internal_id.get(&device_id) { + Some(internal_id) => Ok(internal_id.to_string()), + None => Err(C8yEndPointError::InternalIdNotFound(device_id.clone())), + } } - pub fn set_c8y_internal_id(&mut self, id: String) { - self.c8y_internal_id = id; + pub fn set_internal_id(&mut self, device_id: String, internal_id: String) { + self.devices_internal_id.insert(device_id, internal_id); } fn get_base_url(&self) -> String { @@ -47,18 +59,17 @@ impl C8yEndPoint { url_get_id } - pub fn get_url_for_sw_list(&self) -> String { + pub fn get_url_for_sw_list(&self, internal_id: String) -> String { let mut url_update_swlist = self.get_base_url(); url_update_swlist.push_str("/inventory/managedObjects/"); - url_update_swlist.push_str(&self.c8y_internal_id); - + url_update_swlist.push_str(&internal_id); url_update_swlist } - pub fn get_url_for_get_id(&self, device_id: Option<&str>) -> String { + pub fn get_url_for_internal_id(&self, device_id: String) -> String { let mut url_get_id = self.get_base_url(); url_get_id.push_str("/identity/externalIds/c8y_Serial/"); - url_get_id.push_str(device_id.unwrap_or(&self.device_id)); + url_get_id.push_str(device_id.as_str()); url_get_id } @@ -178,13 +189,14 @@ pub enum JwtError { #[cfg(test)] mod tests { + use super::*; use test_case::test_case; #[test] fn get_url_for_get_id_returns_correct_address() { - let c8y = C8yEndPoint::new("test_host", "test_device", "internal-id"); - let res = c8y.get_url_for_get_id(None); + let c8y = C8yEndPoint::new("test_host", "test_device"); + let res = c8y.get_url_for_internal_id("test_device".into()); assert_eq!( res, @@ -194,8 +206,11 @@ mod tests { #[test] fn get_url_for_sw_list_returns_correct_address() { - let c8y = C8yEndPoint::new("test_host", "test_device", "12345"); - let res = c8y.get_url_for_sw_list(); + let mut c8y = C8yEndPoint::new("test_host", "test_device"); + c8y.devices_internal_id + .insert("test_device".to_string(), "12345".to_string()); + let internal_id = c8y.get_internal_id("test_device".to_string()).unwrap(); + let res = c8y.get_url_for_sw_list(internal_id); assert_eq!(res, "https://test_host/inventory/managedObjects/12345"); } @@ -210,7 +225,7 @@ mod tests { #[test_case("https://t1124124.test.com/path/to/file.test")] #[test_case("https://t1124124.test.com/path/to/file")] fn url_is_my_tenant_correct_urls(url: &str) { - let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id"); + let c8y = C8yEndPoint::new("test.test.com", "test_device"); assert!(c8y.url_is_in_my_tenant_domain(url)); } @@ -220,7 +235,20 @@ mod tests { #[test_case("http://test.com:123456")] #[test_case("http://test.com::12345")] fn url_is_my_tenant_incorrect_urls(url: &str) { - let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id"); + let c8y = C8yEndPoint::new("test.test.com", "test_device"); assert!(!c8y.url_is_in_my_tenant_domain(url)); } + + #[test] + fn check_non_cached_internal_id_for_a_device() { + let mut c8y = C8yEndPoint::new("test_host", "test_device"); + c8y.devices_internal_id + .insert("test_device".to_string(), "12345".to_string()); + let end_pt_err = c8y.get_internal_id("test_child".into()).unwrap_err(); + + assert_eq!( + end_pt_err.to_string(), + "Cumulocity internal id not found for the device: test_child".to_string() + ); + } } diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index 47b492a9243..78b1a2ec45d 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -17,7 +17,7 @@ use time::OffsetDateTime; const EMPTY_STRING: &str = ""; -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "camelCase")] pub struct C8yCreateEvent { #[serde(skip_serializing_if = "Option::is_none")] @@ -42,7 +42,7 @@ pub struct C8yEventResponse { pub id: String, } -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)] #[serde(rename_all = "camelCase")] pub struct C8yManagedObject { pub id: String, @@ -98,7 +98,7 @@ impl From for C8ySoftwareModuleItem { } } -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Default)] #[serde(rename_all = "camelCase")] pub struct C8yUpdateSoftwareListResponse { #[serde(rename = "c8y_SoftwareList")] diff --git a/crates/extensions/c8y_config_manager/src/tests.rs b/crates/extensions/c8y_config_manager/src/tests.rs index 14bea97aec3..c855ca33828 100644 --- a/crates/extensions/c8y_config_manager/src/tests.rs +++ b/crates/extensions/c8y_config_manager/src/tests.rs @@ -105,7 +105,7 @@ async fn test_config_upload_tedge_device() -> Result<(), DynError> { .assert_received([UploadConfigFile { config_path: test_config_path.into(), config_type: test_config_type.to_string(), - child_device_id: None, + device_id, }]) .await; @@ -511,7 +511,7 @@ async fn test_child_device_successful_config_snapshot_response_mapping() -> Resu .join("config_snapshot") .join(test_config_type), config_type: test_config_type.into(), - child_device_id: Some(child_device_id.into()), + device_id: child_device_id.into(), }]) .await; @@ -585,7 +585,7 @@ async fn test_child_config_snapshot_successful_response_without_uploaded_file_ma .join("config_snapshot") .join(test_config_type), config_type: test_config_type.into(), - child_device_id: Some(child_device_id.into()), + device_id: child_device_id.into(), }]) .await; diff --git a/crates/extensions/c8y_config_manager/src/upload.rs b/crates/extensions/c8y_config_manager/src/upload.rs index c4d084b730a..e275459394b 100644 --- a/crates/extensions/c8y_config_manager/src/upload.rs +++ b/crates/extensions/c8y_config_manager/src/upload.rs @@ -90,7 +90,7 @@ impl ConfigUploadManager { self.upload_config_file( Path::new(config_file_path.as_str()), &config_upload_request.config_type, - None, + config_upload_request.device, message_box, ) .await @@ -334,7 +334,7 @@ impl ConfigUploadManager { .upload_config_file( Path::new(&uploaded_config_file_path), &config_response.get_config_type(), - Some(config_response.get_child_id()), + config_response.get_child_id(), message_box, ) .await?; @@ -357,12 +357,12 @@ impl ConfigUploadManager { &mut self, config_file_path: &Path, config_type: &str, - child_device_id: Option, + device_id: String, message_box: &mut ConfigManagerMessageBox, ) -> Result { let url = message_box .c8y_http_proxy - .upload_config_file(config_file_path, config_type, child_device_id) + .upload_config_file(config_file_path, config_type, device_id) .await?; Ok(url) } diff --git a/crates/extensions/c8y_firmware_manager/src/config.rs b/crates/extensions/c8y_firmware_manager/src/config.rs index d8ccf4ea055..8184b75e617 100644 --- a/crates/extensions/c8y_firmware_manager/src/config.rs +++ b/crates/extensions/c8y_firmware_manager/src/config.rs @@ -52,7 +52,7 @@ impl FirmwareManagerConfig { let firmware_update_response_topics = TopicFilter::new_unchecked(FIRMWARE_UPDATE_RESPONSE_TOPICS); - let c8y_end_point = C8yEndPoint::new(&c8y_url, &tedge_device_id, "not used"); + let c8y_end_point = C8yEndPoint::new(&c8y_url, &tedge_device_id); Self { tedge_device_id, diff --git a/crates/extensions/c8y_http_proxy/Cargo.toml b/crates/extensions/c8y_http_proxy/Cargo.toml index fbe663a5d31..63dd60a54c9 100644 --- a/crates/extensions/c8y_http_proxy/Cargo.toml +++ b/crates/extensions/c8y_http_proxy/Cargo.toml @@ -13,6 +13,7 @@ repository = { workspace = true } async-trait = "0.1" c8y_api = { path = "../../core/c8y_api" } download = { path = "../../common/download" } +http = "0.2" log = "0.4" mqtt_channel = { path = "../../common/mqtt_channel" } tedge_actors = { path = "../../core/tedge_actors" } diff --git a/crates/extensions/c8y_http_proxy/src/actor.rs b/crates/extensions/c8y_http_proxy/src/actor.rs index 08fad967c6a..22869b6ac05 100644 --- a/crates/extensions/c8y_http_proxy/src/actor.rs +++ b/crates/extensions/c8y_http_proxy/src/actor.rs @@ -7,6 +7,7 @@ use crate::messages::C8YRestRequest; use crate::messages::C8YRestResult; use crate::messages::DownloadFile; use crate::messages::EventId; +use crate::messages::SoftwareListResponse; use crate::messages::Unit; use crate::messages::UploadConfigFile; use crate::messages::UploadLogBinary; @@ -16,13 +17,13 @@ use c8y_api::http_proxy::C8yEndPoint; use c8y_api::json_c8y::C8yCreateEvent; use c8y_api::json_c8y::C8yEventResponse; use c8y_api::json_c8y::C8yManagedObject; -use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; use c8y_api::json_c8y::InternalIdResponse; use c8y_api::smartrest::error::SMCumulocityMapperError; use c8y_api::OffsetDateTime; use download::Auth; use download::DownloadInfo; use download::Downloader; +use http::status::StatusCode; use log::debug; use log::error; use log::info; @@ -45,7 +46,6 @@ const RETRY_TIMEOUT_SECS: u64 = 20; pub struct C8YHttpProxyActor { end_point: C8yEndPoint, - child_devices: HashMap, peers: C8YHttpProxyMessageBox, } @@ -80,16 +80,17 @@ impl Actor for C8YHttpProxyActor { while let Some((client_id, request)) = self.peers.clients.recv().await { let result = match request { - C8YRestRequest::GetJwtToken(_) => { - self.get_jwt_token().await.map(|response| response.into()) - } + C8YRestRequest::GetJwtToken(_) => self + .get_and_set_jwt_token() + .await + .map(|response| response.into()), C8YRestRequest::C8yCreateEvent(request) => self .create_event(request) .await .map(|response| response.into()), - C8YRestRequest::C8yUpdateSoftwareListResponse(request) => self + C8YRestRequest::SoftwareListResponse(request) => self .send_software_list_http(request) .await .map(|response| response.into()), @@ -98,7 +99,6 @@ impl Actor for C8YHttpProxyActor { .upload_log_binary(request) .await .map(|response| response.into()), - C8YRestRequest::UploadConfigFile(request) => self .upload_config_file(request) .await @@ -117,20 +117,25 @@ impl Actor for C8YHttpProxyActor { impl C8YHttpProxyActor { pub fn new(config: C8YHttpConfig, message_box: C8YHttpProxyMessageBox) -> Self { - let unknown_internal_id = ""; - let end_point = C8yEndPoint::new(&config.c8y_host, &config.device_id, unknown_internal_id); - let child_devices = HashMap::default(); + let end_point = C8yEndPoint::new(&config.c8y_host, &config.device_id); C8YHttpProxyActor { end_point, - child_devices, peers: message_box, } } async fn init(&mut self) -> Result<(), C8YConnectionError> { info!(target: self.name(), "start initialisation"); - while self.end_point.get_c8y_internal_id().is_empty() { - if let Err(error) = self.try_get_and_set_internal_id().await { + + while self + .end_point + .get_internal_id(self.end_point.device_id.clone()) + .is_err() + { + if let Err(error) = self + .get_and_set_internal_id(self.end_point.device_id.clone()) + .await + { error!( "An error occurred while retrieving internal Id, operation will retry in {} seconds\n Error: {:?}", RETRY_TIMEOUT_SECS, error @@ -162,80 +167,142 @@ impl C8YHttpProxyActor { Ok(()) } - async fn try_get_and_set_internal_id(&mut self) -> Result<(), C8YRestError> { - let internal_id = self.try_get_internal_id(None).await?; - self.end_point.set_c8y_internal_id(internal_id); + async fn get_and_set_internal_id(&mut self, device_id: String) -> Result<(), C8YRestError> { + let internal_id = self.try_get_internal_id(device_id.clone()).await?; + self.end_point.set_internal_id(device_id, internal_id); + Ok(()) } - async fn get_c8y_internal_child_id( - &mut self, - child_device_id: String, - ) -> Result { - if let Some(c8y_internal_id) = self.child_devices.get(&child_device_id) { - Ok(c8y_internal_id.clone()) - } else { - let c8y_internal_id = self.try_get_internal_id(Some(&child_device_id)).await?; - self.child_devices - .insert(child_device_id, c8y_internal_id.clone()); - Ok(c8y_internal_id) + async fn try_get_internal_id(&mut self, device_id: String) -> Result { + let url_get_id: String = self.end_point.get_url_for_internal_id(device_id); + if self.end_point.token.is_none() { + self.get_fresh_token().await?; } - } - - async fn try_get_internal_id( - &mut self, - device_id: Option<&str>, - ) -> Result { - let url_get_id = self.end_point.get_url_for_get_id(device_id); + let request = HttpRequestBuilder::get(&url_get_id) + .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .build()?; + let res = match self.peers.http.await_response(request).await? { + Ok(response) => match response.status() { + StatusCode::OK => Ok(Ok(response)), + StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { + self.get_fresh_token().await?; + let request = HttpRequestBuilder::get(&url_get_id) + .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .build()?; + Ok(self.peers.http.await_response(request).await?) + } + code => Err(C8YRestError::FromHttpError( + tedge_http_ext::HttpError::HttpStatusError(code), + )), + }, - let request_internal_id = HttpRequestBuilder::get(url_get_id); - let res = self.execute(request_internal_id).await?; - let res = res.error_for_status()?; + Err(e) => Err(C8YRestError::FromHttpError(e)), + }; + let res = res?.error_for_status()?; let internal_id_response: InternalIdResponse = res.json().await?; let internal_id = internal_id_response.id(); + Ok(internal_id) } async fn execute( &mut self, - request_builder: HttpRequestBuilder, + device_id: String, + build_request: impl Fn(&C8yEndPoint) -> Result, ) -> Result { - // Get a JWT token to authenticate the device - let request_builder = if let Ok(token) = self.peers.jwt.await_response(()).await? { - request_builder.bearer_auth(token) - } else { - return Err(C8YRestError::CustomError("JWT token not available".into())); - }; + let request_builder = build_request(&self.end_point); + let request = request_builder? + .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .build()?; + let resp = self.peers.http.await_response(request).await?; + match resp { + Ok(response) => match response.status() { + StatusCode::OK | StatusCode::CREATED => Ok(Ok(response)), + StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { + self.try_request_with_fresh_token(build_request).await + } + StatusCode::NOT_FOUND => { + self.try_request_with_fresh_internal_id(device_id, build_request) + .await + } + code => Err(C8YRestError::FromHttpError( + tedge_http_ext::HttpError::HttpStatusError(code), + )), + }, - // TODO Add timeout - // TODO Manage 403 errors - let request = request_builder.build()?; + Err(e) => Err(C8YRestError::FromHttpError(e)), + } + } + + async fn get_fresh_token(&mut self) -> Result { + self.end_point.token = None; + self.get_and_set_jwt_token().await + } + + async fn try_request_with_fresh_token( + &mut self, + build_request: impl Fn(&C8yEndPoint) -> Result, + ) -> Result { + // get new token not the cached one + self.get_fresh_token().await?; + // build the request + let request_builder = build_request(&self.end_point); + let request = request_builder?.build()?; + // retry the request Ok(self.peers.http.await_response(request).await?) } - async fn create_event( + async fn try_request_with_fresh_internal_id( &mut self, - mut c8y_event: C8yCreateEvent, - ) -> Result { - if c8y_event.source.is_none() { - c8y_event.source = Some(C8yManagedObject { - id: self.end_point.get_c8y_internal_id().to_string(), - }); - } - self.send_event_internal(c8y_event).await + device_id: String, + build_request: impl Fn(&C8yEndPoint) -> Result, + ) -> Result { + // get new internal id not the cached one + self.get_and_set_internal_id(device_id).await?; + + let request_builder = build_request(&self.end_point); + let request = request_builder? + .bearer_auth(self.end_point.token.clone().unwrap_or_default()) + .build()?; + Ok(self.peers.http.await_response(request).await?) + } + + async fn create_event(&mut self, c8y_event: C8yCreateEvent) -> Result { + let create_event = |internal_id: String| -> C8yCreateEvent { + C8yCreateEvent { + source: Some(C8yManagedObject { id: internal_id }), + event_type: c8y_event.event_type.clone(), + time: c8y_event.time, + text: c8y_event.text.clone(), + extras: c8y_event.extras.clone(), + } + }; + self.send_event_internal( + c8y_event.source.clone().unwrap_or_default().id, + create_event, + ) + .await } async fn send_software_list_http( &mut self, - software_list: C8yUpdateSoftwareListResponse, + software_list: SoftwareListResponse, ) -> Result { - let url = self.end_point.get_url_for_sw_list(); - let req_builder = HttpRequestBuilder::put(url) - .header("Accept", "application/json") - .header("Content-Type", "application/json") - .json(&software_list); - let http_result = self.execute(req_builder).await?; + let device_id = software_list.device_id; + let build_request = |end_point: &C8yEndPoint| -> Result { + let internal_id = end_point + .get_internal_id(device_id.clone()) + .map_err(|e| C8YRestError::CustomError(e.to_string()))?; + let url = end_point.get_url_for_sw_list(internal_id); + Ok(HttpRequestBuilder::put(url) + .header("Accept", "application/json") + .header("Content-Type", "application/json") + .json(&software_list.c8y_software_list)) + }; + + let http_result = self.execute(device_id.clone(), build_request).await?; let _ = http_result.error_for_status()?; Ok(()) } @@ -244,26 +311,37 @@ impl C8YHttpProxyActor { &mut self, request: UploadLogBinary, ) -> Result { - let log_file_event = self - .create_event_request(request.log_type, None, None, request.child_device_id) + let device_id = request.device_id; + let create_event = |internal_id: String| -> C8yCreateEvent { + C8yCreateEvent { + source: Some(C8yManagedObject { id: internal_id }), + event_type: request.log_type.clone(), + time: OffsetDateTime::now_utc(), + text: request.log_type.clone(), + extras: HashMap::new(), + } + }; + let event_response_id = self + .send_event_internal(device_id.clone(), create_event) .await?; - let event_response_id = self.send_event_internal(log_file_event).await?; - - let binary_upload_event_url = self - .end_point - .get_url_for_event_binary_upload(&event_response_id); + let build_request = |end_point: &C8yEndPoint| -> Result { + let binary_upload_event_url = + end_point.get_url_for_event_binary_upload(&event_response_id); + Ok(HttpRequestBuilder::post(&binary_upload_event_url) + .header("Accept", "application/json") + .header("Content-Type", "text/plain") + .body(request.log_content.clone())) + }; - let req_builder = HttpRequestBuilder::post(binary_upload_event_url.clone()) - .header("Accept", "application/json") - .header("Content-Type", "text/plain") - .body(request.log_content); - let http_result = self.execute(req_builder).await?.unwrap(); + let http_result = self.execute(device_id.clone(), build_request).await??; if !http_result.status().is_success() { Err(C8YRestError::CustomError("Upload failed".into())) } else { - Ok(binary_upload_event_url) + Ok(self + .end_point + .get_url_for_event_binary_upload(&event_response_id)) } } @@ -271,40 +349,58 @@ impl C8YHttpProxyActor { &mut self, request: UploadConfigFile, ) -> Result { + let device_id = request.device_id; // read the config file contents let config_content = std::fs::read_to_string(request.config_path) .map_err(>::into)?; - let config_file_event = self - .create_event_request(request.config_type, None, None, request.child_device_id) - .await?; + let create_event = |internal_id: String| -> C8yCreateEvent { + C8yCreateEvent { + source: Some(C8yManagedObject { id: internal_id }), + event_type: request.config_type.clone(), + time: OffsetDateTime::now_utc(), + text: request.config_type.clone(), + extras: HashMap::new(), + } + }; - debug!(target: self.name(), "Creating config event: {:?}", config_file_event); - let event_response_id = self.send_event_internal(config_file_event).await?; + let event_response_id = self + .send_event_internal(device_id.clone(), create_event) + .await?; debug!(target: self.name(), "Config event created with id: {:?}", event_response_id); - let binary_upload_event_url = self - .end_point - .get_url_for_event_binary_upload(&event_response_id); - let req_builder = HttpRequestBuilder::post(binary_upload_event_url.clone()) - .header("Accept", "application/json") - .header("Content-Type", "text/plain") - .body(config_content.to_string()); - debug!(target: self.name(), "Uploading config file to URL: {}", binary_upload_event_url); - let http_result = self.execute(req_builder).await?.unwrap(); + let build_request = |end_point: &C8yEndPoint| -> Result { + let binary_upload_event_url = + end_point.get_url_for_event_binary_upload(&event_response_id); + Ok(HttpRequestBuilder::post(&binary_upload_event_url) + .header("Accept", "application/json") + .header("Content-Type", "text/plain") + .body(config_content.to_string())) + }; + debug!(target: self.name(), "Uploading config file to URL: {}", self.end_point + .get_url_for_event_binary_upload(&event_response_id)); + let http_result = self.execute(device_id.clone(), build_request).await??; if !http_result.status().is_success() { Err(C8YRestError::CustomError("Upload failed".into())) } else { - Ok(binary_upload_event_url) + Ok(self + .end_point + .get_url_for_event_binary_upload(&event_response_id)) } } - async fn get_jwt_token(&mut self) -> Result { - if let Ok(token) = self.peers.jwt.await_response(()).await? { - Ok(token) - } else { - Err(C8YRestError::CustomError("JWT token not available".into())) + async fn get_and_set_jwt_token(&mut self) -> Result { + match self.end_point.token.clone() { + Some(token) => Ok(token), + None => { + if let Ok(token) = self.peers.jwt.await_response(()).await? { + self.end_point.token = Some(token.clone()); + Ok(token) + } else { + Err(C8YRestError::CustomError("JWT token not available".into())) + } + } } } @@ -315,7 +411,7 @@ impl C8YHttpProxyActor { .end_point .url_is_in_my_tenant_domain(download_info.url()) { - let token = self.get_jwt_token().await?; + let token = self.get_and_set_jwt_token().await?; download_info.auth = Some(Auth::new_bearer(token.as_str())); } @@ -327,43 +423,30 @@ impl C8YHttpProxyActor { Ok(()) } - async fn create_event_request( - &mut self, - event_type: String, - event_text: Option, - event_time: Option, - child_device_id: Option, - ) -> Result { - let device_internal_id = if let Some(device_id) = child_device_id { - self.get_c8y_internal_child_id(device_id).await? - } else { - self.end_point.get_c8y_internal_id().to_string() - }; - - let c8y_managed_object = C8yManagedObject { - id: device_internal_id, - }; - - Ok(C8yCreateEvent::new( - Some(c8y_managed_object), - event_type.clone(), - event_time.unwrap_or_else(OffsetDateTime::now_utc), - event_text.unwrap_or(event_type), - HashMap::new(), - )) - } - async fn send_event_internal( &mut self, - c8y_event: C8yCreateEvent, + device_id: String, + create_event: impl Fn(String) -> C8yCreateEvent, ) -> Result { - let create_event_url = self.end_point.get_url_for_create_event(); + // Get and set child device internal id + if device_id.ne(&self.end_point.device_id) { + self.get_and_set_internal_id(device_id.clone()).await?; + } + + let build_request = |end_point: &C8yEndPoint| -> Result { + let create_event_url = end_point.get_url_for_create_event(); + let internal_id = end_point + .get_internal_id(device_id.clone()) + .map_err(|e| C8YRestError::CustomError(e.to_string()))?; + let updated_c8y_event = create_event(internal_id); + + Ok(HttpRequestBuilder::post(&create_event_url) + .header("Accept", "application/json") + .header("Content-Type", "application/json") + .json(&updated_c8y_event)) + }; - let req_builder = HttpRequestBuilder::post(create_event_url) - .header("Accept", "application/json") - .header("Content-Type", "application/json") - .json(&c8y_event); - let http_result = self.execute(req_builder).await?; + let http_result = self.execute(device_id.clone(), build_request).await?; let http_response = http_result.error_for_status()?; let event_response: C8yEventResponse = http_response.json().await?; Ok(event_response.id) diff --git a/crates/extensions/c8y_http_proxy/src/handle.rs b/crates/extensions/c8y_http_proxy/src/handle.rs index b8f61010ab4..5f37d86090b 100644 --- a/crates/extensions/c8y_http_proxy/src/handle.rs +++ b/crates/extensions/c8y_http_proxy/src/handle.rs @@ -3,6 +3,7 @@ use crate::messages::C8YRestRequest; use crate::messages::C8YRestResponse; use crate::messages::C8YRestResult; use crate::messages::GetJwtToken; +use crate::messages::SoftwareListResponse; use crate::messages::UploadConfigFile; use crate::messages::UploadLogBinary; use c8y_api::json_c8y::C8yCreateEvent; @@ -49,8 +50,14 @@ impl C8YHttpProxy { pub async fn send_software_list_http( &mut self, c8y_software_list: C8yUpdateSoftwareListResponse, + device_id: String, ) -> Result<(), C8YRestError> { - let request: C8YRestRequest = c8y_software_list.into(); + let request: C8YRestRequest = SoftwareListResponse { + c8y_software_list, + device_id, + } + .into(); + match self.c8y.await_response(request).await? { Ok(C8YRestResponse::Unit(_)) => Ok(()), unexpected => Err(unexpected.into()), @@ -61,12 +68,12 @@ impl C8YHttpProxy { &mut self, log_type: &str, log_content: &str, - child_device_id: Option, + device_id: String, ) -> Result { let request: C8YRestRequest = UploadLogBinary { log_type: log_type.to_string(), log_content: log_content.to_string(), - child_device_id, + device_id, } .into(); match self.c8y.await_response(request).await? { @@ -79,12 +86,12 @@ impl C8YHttpProxy { &mut self, config_path: &Path, config_type: &str, - child_device_id: Option, + device_id: String, ) -> Result { let request: C8YRestRequest = UploadConfigFile { config_path: config_path.to_owned(), config_type: config_type.to_string(), - child_device_id, + device_id, } .into(); match self.c8y.await_response(request).await? { diff --git a/crates/extensions/c8y_http_proxy/src/messages.rs b/crates/extensions/c8y_http_proxy/src/messages.rs index d6ce2c52122..e97b02646ec 100644 --- a/crates/extensions/c8y_http_proxy/src/messages.rs +++ b/crates/extensions/c8y_http_proxy/src/messages.rs @@ -6,7 +6,7 @@ use tedge_actors::ChannelError; use tedge_http_ext::HttpError; use tedge_utils::file::PermissionEntry; -fan_in_message_type!(C8YRestRequest[GetJwtToken, C8yCreateEvent, C8yUpdateSoftwareListResponse, UploadLogBinary, UploadConfigFile, DownloadFile]: Debug, PartialEq, Eq); +fan_in_message_type!(C8YRestRequest[GetJwtToken, C8yCreateEvent, SoftwareListResponse, UploadLogBinary, UploadConfigFile, DownloadFile]: Debug, PartialEq, Eq); //HIPPO Rename EventId to String as there could be many other String responses as well and this macro doesn't allow another String variant fan_in_message_type!(C8YRestResponse[EventId, Unit]: Debug); @@ -43,18 +43,24 @@ pub type C8YRestResult = Result; #[derive(Debug, PartialEq, Eq)] pub struct GetJwtToken; +#[derive(Debug, PartialEq, Eq)] +pub struct SoftwareListResponse { + pub c8y_software_list: C8yUpdateSoftwareListResponse, + pub device_id: String, +} + #[derive(Debug, PartialEq, Eq)] pub struct UploadLogBinary { pub log_type: String, pub log_content: String, - pub child_device_id: Option, + pub device_id: String, } #[derive(Debug, PartialEq, Eq)] pub struct UploadConfigFile { pub config_path: PathBuf, pub config_type: String, - pub child_device_id: Option, + pub device_id: String, } #[derive(Debug, Clone, Eq, PartialEq)] diff --git a/crates/extensions/c8y_http_proxy/src/tests.rs b/crates/extensions/c8y_http_proxy/src/tests.rs index 1092a8abf1d..c8cb09e412e 100644 --- a/crates/extensions/c8y_http_proxy/src/tests.rs +++ b/crates/extensions/c8y_http_proxy/src/tests.rs @@ -2,6 +2,7 @@ use crate::credentials::ConstJwtRetriever; use crate::handle::C8YHttpProxy; use crate::C8YHttpConfig; use crate::C8YHttpProxyBuilder; +use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; use c8y_api::json_c8y::InternalIdResponse; use std::path::PathBuf; use tedge_actors::Actor; @@ -46,7 +47,7 @@ async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { tokio::spawn(async move { // NOTE: this is done in the background because this call awaits for the response. proxy - .upload_log_binary("test.log", "some log content", None) + .upload_log_binary("test.log", "some log content", "device-001".into()) .await .unwrap(); }); @@ -63,6 +64,254 @@ async fn c8y_http_proxy_requests_the_device_internal_id_on_start() { .await; } +#[tokio::test] +async fn retry_internal_id_on_expired_jwt() { + let c8y_host = "c8y.tenant.io"; + let device_id = "device-001"; + let token = "JWT token"; + let external_id = "external-device-001"; + let tmp_dir = "/tmp"; + + let (mut proxy, mut c8y) = + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + + // Even before any request is sent to the c8y_proxy + // the proxy requests over HTTP the internal device id. + let init_request = HttpRequestBuilder::get(format!( + "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" + )) + .bearer_auth(token) + .build() + .unwrap(); + c8y.assert_recv(Some(init_request)).await; + + // Cumulocity returns unauthorized error (401), because the jwt token has expired + let c8y_response = HttpResponseBuilder::new().status(401).build().unwrap(); + c8y.send(Ok(c8y_response)).await.unwrap(); + c8y.assert_recv(Some( + HttpRequestBuilder::get(format!( + "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" + )) + .bearer_auth(token) + .build() + .unwrap(), + )) + .await; + // Mapper retries to get the internal device id, after getting a fresh jwt token + let c8y_response = HttpResponseBuilder::new() + .status(200) + .json(&InternalIdResponse::new(device_id, external_id)) + .build() + .unwrap(); + c8y.send(Ok(c8y_response)).await.unwrap(); + + // This internal id is then used by the proxy for subsequent requests. + // For instance, if the proxy upload a log file + tokio::spawn(async move { + // NOTE: this is done in the background because this call awaits for the response. + proxy + .upload_log_binary("test.log", "some log content", "device-001".into()) + .await + .unwrap(); + }); + + // then the upload request received by c8y is related to the internal id + c8y.assert_recv(Some( + HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) + .bearer_auth(token) + .header("content-type", "application/json") + .header("accept", "application/json") + .build() + .unwrap(), + )) + .await; +} + +#[tokio::test] +async fn retry_software_list_once_with_fresh_internal_id() { + let c8y_host = "c8y.tenant.io"; + let device_id = "device-001"; + let token = "JWT token"; + let external_id = "external-device-001"; + let tmp_dir = "/tmp"; + + let (mut proxy, mut c8y) = + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + + // Even before any request is sent to the c8y_proxy + // the proxy requests over HTTP the internal device id. + let _init_request = HttpRequestBuilder::get(format!( + "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" + )) + .bearer_auth(token) + .build() + .unwrap(); + // skip the message + c8y.recv().await; + + // Cumulocity returns the internal device id + let c8y_response = HttpResponseBuilder::new() + .status(200) + .json(&InternalIdResponse::new(device_id, external_id)) + .build() + .unwrap(); + c8y.send(Ok(c8y_response)).await.unwrap(); + + // This internal id is then used by the proxy for subsequent requests. + // Create the software list and publish + let c8y_software_list = C8yUpdateSoftwareListResponse::default(); + + tokio::spawn(async move { + // NOTE: this is done in the background because this call awaits for the response. + proxy + .send_software_list_http(c8y_software_list, device_id.into()) + .await + }); + + let c8y_software_list = C8yUpdateSoftwareListResponse::default(); + // then the upload request received by c8y is related to the internal id + c8y.assert_recv(Some( + HttpRequestBuilder::put(format!( + "https://{c8y_host}/inventory/managedObjects/{device_id}" + )) + .header("content-type", "application/json") + .header("accept", "application/json") + .bearer_auth(token) + .json(&c8y_software_list) + .build() + .unwrap(), + )) + .await; + + // The software list upload fails because the device identified with internal id not found + let c8y_response = HttpResponseBuilder::new() + .status(404) + .json(&InternalIdResponse::new(device_id, external_id)) + .build() + .unwrap(); + c8y.send(Ok(c8y_response)).await.unwrap(); + + // Now the mapper gets a new internal id for the specific device id + c8y.assert_recv(Some( + HttpRequestBuilder::get(format!( + "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" + )) + .bearer_auth(token) + .build() + .unwrap(), + )) + .await; + + // Cumulocity returns the internal device id, after retrying with the fresh jwt token + let c8y_response = HttpResponseBuilder::new() + .status(200) + .json(&InternalIdResponse::new(device_id, external_id)) + .build() + .unwrap(); + c8y.send(Ok(c8y_response)).await.unwrap(); + + let c8y_software_list = C8yUpdateSoftwareListResponse::default(); + // then the upload request received by c8y is related to the internal id + c8y.assert_recv(Some( + HttpRequestBuilder::put(format!( + "https://{c8y_host}/inventory/managedObjects/{device_id}" + )) + .bearer_auth(token) + .header("content-type", "application/json") + .header("accept", "application/json") + .json(&c8y_software_list) + .build() + .unwrap(), + )) + .await; +} + +#[tokio::test] +async fn auto_retry_upload_log_binary_when_internal_id_expires() { + let c8y_host = "c8y.tenant.io"; + let device_id = "device-001"; + let token = "JWT token"; + let external_id = "external-device-001"; + let tmp_dir = "/tmp"; + + let (mut proxy, mut c8y) = + spawn_c8y_http_proxy(c8y_host.into(), device_id.into(), tmp_dir.into(), token).await; + + // Even before any request is sent to the c8y_proxy + // the proxy requests over HTTP the internal device id. + let init_request = HttpRequestBuilder::get(format!( + "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" + )) + .bearer_auth(token) + .build() + .unwrap(); + c8y.assert_recv(Some(init_request)).await; + // Cumulocity returns the internal device id + let c8y_response = HttpResponseBuilder::new() + .status(200) + .json(&InternalIdResponse::new(device_id, external_id)) + .build() + .unwrap(); + c8y.send(Ok(c8y_response)).await.unwrap(); + // This internal id is then used by the proxy for subsequent requests. + // For instance, if the proxy upload a log file + tokio::spawn(async move { + // NOTE: this is done in the background because this call awaits for the response. + proxy + .upload_log_binary("test.log", "some log content", "device-001".into()) + .await + .unwrap(); + }); + // then the upload request received by c8y is related to the internal id + c8y.assert_recv(Some( + HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) + .bearer_auth(token) + .header("content-type", "application/json") + .header("accept", "application/json") + .build() + .unwrap(), + )) + .await; + + // Creating the event over http failed due to the device is NOT_FOUND + let c8y_response = HttpResponseBuilder::new() + .status(404) + .json(&InternalIdResponse::new(device_id, external_id)) + .build() + .unwrap(); + c8y.send(Ok(c8y_response)).await.unwrap(); + + // Mapper retries the call with internal id + let c8y_response = HttpResponseBuilder::new() + .status(200) + .json(&InternalIdResponse::new(device_id, external_id)) + .build() + .unwrap(); + c8y.send(Ok(c8y_response)).await.unwrap(); + + // Now there is a request to get the internal id + c8y.assert_recv(Some( + HttpRequestBuilder::get(format!( + "https://{c8y_host}/identity/externalIds/c8y_Serial/{device_id}" + )) + .bearer_auth(token) + .build() + .unwrap(), + )) + .await; + + // then the upload request received by c8y is related to the internal id + c8y.assert_recv(Some( + HttpRequestBuilder::post(format!("https://{c8y_host}/event/events/")) + .bearer_auth(token) + .header("content-type", "application/json") + .header("accept", "application/json") + .build() + .unwrap(), + )) + .await; +} + /// Spawn an `C8YHttpProxyActor` instance /// Return two handles: /// - one `C8YHttpProxy` to send requests to the actor diff --git a/crates/extensions/c8y_log_manager/src/actor.rs b/crates/extensions/c8y_log_manager/src/actor.rs index 96cf64ba977..5922f24708c 100644 --- a/crates/extensions/c8y_log_manager/src/actor.rs +++ b/crates/extensions/c8y_log_manager/src/actor.rs @@ -271,7 +271,11 @@ impl LogManagerActor { let upload_event_url = self .http_proxy - .upload_log_binary(&smartrest_request.log_type, &log_content, None) + .upload_log_binary( + &smartrest_request.log_type, + &log_content, + self.config.device_id.clone(), + ) .await?; let successful = LogfileRequest::successful(Some(upload_event_url))?; @@ -786,7 +790,7 @@ mod tests { Some(C8YRestRequest::UploadLogBinary(UploadLogBinary { log_type: "type_two".to_string(), log_content: "filename: file_c\nSome content\n".to_string(), - child_device_id: None + device_id: "SUT".into() })) ); diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 081ddda2408..7d59a6d1df5 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -210,7 +210,7 @@ impl CumulocityConverter { let log_dir = config.logs_path.join(TEDGE_AGENT_LOG_DIR); let operation_logs = OperationLogs::try_new(log_dir.into())?; - let c8y_endpoint = C8yEndPoint::new(&c8y_host, &device_name, ""); + let c8y_endpoint = C8yEndPoint::new(&c8y_host, &device_name); let mapper_config = MapperConfig { out_topic: Topic::new_unchecked("c8y/measurement/measurements/create"), @@ -706,15 +706,18 @@ impl Converter for CumulocityConverter { Ok(validate_and_publish_software_list( message.payload_str()?, &mut self.http_proxy, + self.device_name.clone(), //derive from topic, when supported for child device also. ) .await?) } Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareUpdateResponse)) => { debug!("Software update"); - Ok( - publish_operation_status(message.payload_str()?, &mut self.http_proxy) - .await?, + Ok(publish_operation_status( + message.payload_str()?, + &mut self.http_proxy, + self.device_name.clone(), ) + .await?) } Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::RestartResponse)) => { Ok(publish_restart_operation_status(message.payload_str()?).await?) @@ -929,6 +932,7 @@ async fn publish_restart_operation_status( async fn publish_operation_status( json_response: &str, http_proxy: &mut C8YHttpProxy, + device_id: String, ) -> Result, CumulocityMapperError> { let response = SoftwareUpdateResponse::from_json(json_response)?; let topic = C8yTopic::SmartRestResponse.to_topic()?; @@ -942,13 +946,13 @@ async fn publish_operation_status( let smartrest_set_operation = SmartRestSetOperationToSuccessful::from_thin_edge_json(response)?.to_smartrest()?; - validate_and_publish_software_list(json_response, http_proxy).await?; + validate_and_publish_software_list(json_response, http_proxy, device_id).await?; Ok(vec![Message::new(&topic, smartrest_set_operation)]) } OperationStatus::Failed => { let smartrest_set_operation = SmartRestSetOperationToFailed::from_thin_edge_json(response)?.to_smartrest()?; - validate_and_publish_software_list(json_response, http_proxy).await?; + validate_and_publish_software_list(json_response, http_proxy, device_id).await?; Ok(vec![Message::new(&topic, smartrest_set_operation)]) } } @@ -957,6 +961,7 @@ async fn publish_operation_status( async fn validate_and_publish_software_list( payload: &str, http_proxy: &mut C8YHttpProxy, + device_id: String, ) -> Result, CumulocityMapperError> { let response = &SoftwareListResponse::from_json(payload)?; @@ -964,7 +969,7 @@ async fn validate_and_publish_software_list( OperationStatus::Successful => { let c8y_software_list: C8yUpdateSoftwareListResponse = response.into(); http_proxy - .send_software_list_http(c8y_software_list) + .send_software_list_http(c8y_software_list, device_id) .await?; } diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 411239592f6..b12c3d3adb7 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -1302,7 +1302,7 @@ fn spawn_dummy_c8y_http_proxy(mut http: SimpleMessageBox { + Some(C8YRestRequest::SoftwareListResponse(_)) => { let _ = http .send(Ok(c8y_http_proxy::messages::C8YRestResponse::Unit(()))) .await;