diff --git a/changelog.d/20214_amqp_expiration.enhancement.md b/changelog.d/20214_amqp_expiration.enhancement.md new file mode 100644 index 0000000000000..7f9f285d2491c --- /dev/null +++ b/changelog.d/20214_amqp_expiration.enhancement.md @@ -0,0 +1,3 @@ +added support for `expiration_ms` on AMQP sink, to set an expiration on messages sent + +authors: sonnens diff --git a/src/sinks/amqp/config.rs b/src/sinks/amqp/config.rs index c24fcdc110ff3..1c5794b0022dd 100644 --- a/src/sinks/amqp/config.rs +++ b/src/sinks/amqp/config.rs @@ -12,12 +12,13 @@ use super::sink::AmqpSink; #[derive(Clone, Debug, Default)] pub struct AmqpPropertiesConfig { /// Content-Type for the AMQP messages. - #[configurable(derived)] pub(crate) content_type: Option, /// Content-Encoding for the AMQP messages. - #[configurable(derived)] pub(crate) content_encoding: Option, + + /// Expiration for AMQP messages (in milliseconds) + pub(crate) expiration_ms: Option, } impl AmqpPropertiesConfig { @@ -29,6 +30,9 @@ impl AmqpPropertiesConfig { if let Some(content_encoding) = &self.content_encoding { prop = prop.with_content_encoding(ShortString::from(content_encoding.clone())); } + if let Some(expiration_ms) = &self.expiration_ms { + prop = prop.with_expiration(ShortString::from(expiration_ms.to_string())); + } prop } } diff --git a/website/cue/reference/components/sinks/base/amqp.cue b/website/cue/reference/components/sinks/base/amqp.cue index 333293b890827..4e8756abec522 100644 --- a/website/cue/reference/components/sinks/base/amqp.cue +++ b/website/cue/reference/components/sinks/base/amqp.cue @@ -312,6 +312,11 @@ base: components: sinks: amqp: configuration: { required: false type: string: {} } + expiration_ms: { + description: "Expiration for AMQP messages (in milliseconds)" + required: false + type: uint: {} + } } } routing_key: {