diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 314253ec7f..d54e19ea9b 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -7,6 +7,7 @@ use crate::actor::IdDownloadRequest; use crate::actor::IdDownloadResult; use crate::dynamic_discovery::DiscoverOp; use crate::error::ConversionError; +use crate::error::MessageConversionError; use crate::json; use crate::operations; use crate::operations::OperationHandler; @@ -121,7 +122,7 @@ pub struct MapperConfig { impl CumulocityConverter { pub async fn convert(&mut self, input: &MqttMessage) -> Vec { let messages_or_err = self.try_convert(input).await; - self.wrap_errors(messages_or_err) + self.wrap_errors_with_input(messages_or_err, input) } pub fn wrap_errors( @@ -131,11 +132,20 @@ impl CumulocityConverter { messages_or_err.unwrap_or_else(|error| vec![self.new_error_message(error)]) } - pub fn wrap_error(&self, message_or_err: Result) -> MqttMessage { - message_or_err.unwrap_or_else(|error| self.new_error_message(error)) + pub fn wrap_errors_with_input( + &self, + messages_or_err: Result, ConversionError>, + input: &MqttMessage, + ) -> Vec { + messages_or_err + .map_err(|error| MessageConversionError { + error, + topic: input.topic.name.clone(), + }) + .unwrap_or_else(|error| vec![self.new_error_message(error)]) } - pub fn new_error_message(&self, error: ConversionError) -> MqttMessage { + pub fn new_error_message(&self, error: impl std::error::Error) -> MqttMessage { error!("Mapping error: {}", error); MqttMessage::new(&self.get_mapper_config().errors_topic, error.to_string()) } @@ -1183,11 +1193,8 @@ impl CumulocityConverter { Ok(vec![]) } _ => { - let result = self - .try_convert_data_message(source, channel, message) - .await; - let messages = self.wrap_errors(result); - Ok(messages) + self.try_convert_data_message(source, channel, message) + .await } } } @@ -2469,9 +2476,12 @@ pub(crate) mod tests { let alarm_payload = json!({ "text": big_alarm_text }).to_string(); let alarm_message = MqttMessage::new(&Topic::new_unchecked(alarm_topic), alarm_payload); - let messages = converter.try_convert(&alarm_message).await.unwrap(); - let payload = messages[0].payload_str().unwrap(); - assert!(payload.ends_with("greater than the threshold size of 16184.")); + let error = converter.try_convert(&alarm_message).await.unwrap_err(); + assert!(matches!( + error, + crate::error::ConversionError::SizeThresholdExceeded(_) + )); + Ok(()) } @@ -2670,7 +2680,7 @@ pub(crate) mod tests { let result = converter.convert(&big_measurement_message).await; let payload = result[0].payload_str().unwrap(); - assert!(payload.starts_with( + assert!(payload.contains( r#"The payload {"temperature0":0,"temperature1":1,"temperature10" received on te/device/main///m/ after translation is"# )); assert!(payload.ends_with("greater than the threshold size of 16184.")); @@ -2721,7 +2731,7 @@ pub(crate) mod tests { // Skipping the first two auto-registration messages and validating the third mapped message let payload = result[0].payload_str().unwrap(); - assert!(payload.starts_with( + assert!(payload.contains( r#"The payload {"temperature0":0,"temperature1":1,"temperature10" received on te/device/child1///m/ after translation is"# )); assert!(payload.ends_with("greater than the threshold size of 16184.")); diff --git a/crates/extensions/c8y_mapper_ext/src/error.rs b/crates/extensions/c8y_mapper_ext/src/error.rs index b20588debc..c56da6eac0 100644 --- a/crates/extensions/c8y_mapper_ext/src/error.rs +++ b/crates/extensions/c8y_mapper_ext/src/error.rs @@ -33,6 +33,13 @@ pub enum MapperError { FromStdIo(#[from] std::io::Error), } +#[derive(Debug, thiserror::Error)] +#[error("Failed to convert a message on topic '{topic}': {error:#}")] +pub struct MessageConversionError { + pub error: ConversionError, + pub topic: String, +} + #[derive(Debug, thiserror::Error)] pub enum ConversionError { #[error(transparent)]