Skip to content

Commit

Permalink
Merge pull request #3217 from Bravo555/improve/c8y-converter-error-re…
Browse files Browse the repository at this point in the history
…porting

Attach topic to c8y message conversion error
  • Loading branch information
Bravo555 authored Oct 31, 2024
2 parents 75d772b + 443ec28 commit a5ee225
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
38 changes: 24 additions & 14 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
use crate::dynamic_discovery::DiscoverOp;
use crate::error::ConversionError;
use crate::error::MessageConversionError;
use crate::json;
use crate::operations;
use crate::operations::OperationHandler;
Expand Down Expand Up @@ -121,7 +122,7 @@ pub struct MapperConfig {
impl CumulocityConverter {
pub async fn convert(&mut self, input: &MqttMessage) -> Vec<MqttMessage> {
let messages_or_err = self.try_convert(input).await;
self.wrap_errors(messages_or_err)
self.wrap_errors_with_input(messages_or_err, input)
}

pub fn wrap_errors(
Expand All @@ -131,11 +132,20 @@ impl CumulocityConverter {
messages_or_err.unwrap_or_else(|error| vec![self.new_error_message(error)])
}

pub fn wrap_error(&self, message_or_err: Result<MqttMessage, ConversionError>) -> MqttMessage {
message_or_err.unwrap_or_else(|error| self.new_error_message(error))
pub fn wrap_errors_with_input(
&self,
messages_or_err: Result<Vec<MqttMessage>, ConversionError>,
input: &MqttMessage,
) -> Vec<MqttMessage> {
messages_or_err
.map_err(|error| MessageConversionError {
error,
topic: input.topic.name.clone(),
})
.unwrap_or_else(|error| vec![self.new_error_message(error)])
}

pub fn new_error_message(&self, error: ConversionError) -> MqttMessage {
pub fn new_error_message(&self, error: impl std::error::Error) -> MqttMessage {
error!("Mapping error: {}", error);
MqttMessage::new(&self.get_mapper_config().errors_topic, error.to_string())
}
Expand Down Expand Up @@ -1183,11 +1193,8 @@ impl CumulocityConverter {
Ok(vec![])
}
_ => {
let result = self
.try_convert_data_message(source, channel, message)
.await;
let messages = self.wrap_errors(result);
Ok(messages)
self.try_convert_data_message(source, channel, message)
.await
}
}
}
Expand Down Expand Up @@ -2469,9 +2476,12 @@ pub(crate) mod tests {
let alarm_payload = json!({ "text": big_alarm_text }).to_string();
let alarm_message = MqttMessage::new(&Topic::new_unchecked(alarm_topic), alarm_payload);

let messages = converter.try_convert(&alarm_message).await.unwrap();
let payload = messages[0].payload_str().unwrap();
assert!(payload.ends_with("greater than the threshold size of 16184."));
let error = converter.try_convert(&alarm_message).await.unwrap_err();
assert!(matches!(
error,
crate::error::ConversionError::SizeThresholdExceeded(_)
));

Ok(())
}

Expand Down Expand Up @@ -2670,7 +2680,7 @@ pub(crate) mod tests {
let result = converter.convert(&big_measurement_message).await;

let payload = result[0].payload_str().unwrap();
assert!(payload.starts_with(
assert!(payload.contains(
r#"The payload {"temperature0":0,"temperature1":1,"temperature10" received on te/device/main///m/ after translation is"#
));
assert!(payload.ends_with("greater than the threshold size of 16184."));
Expand Down Expand Up @@ -2721,7 +2731,7 @@ pub(crate) mod tests {

// Skipping the first two auto-registration messages and validating the third mapped message
let payload = result[0].payload_str().unwrap();
assert!(payload.starts_with(
assert!(payload.contains(
r#"The payload {"temperature0":0,"temperature1":1,"temperature10" received on te/device/child1///m/ after translation is"#
));
assert!(payload.ends_with("greater than the threshold size of 16184."));
Expand Down
7 changes: 7 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ pub enum MapperError {
FromStdIo(#[from] std::io::Error),
}

#[derive(Debug, thiserror::Error)]
#[error("Failed to convert a message on topic '{topic}': {error:#}")]
pub struct MessageConversionError {
pub error: ConversionError,
pub topic: String,
}

#[derive(Debug, thiserror::Error)]
pub enum ConversionError {
#[error(transparent)]
Expand Down

0 comments on commit a5ee225

Please sign in to comment.