Skip to content

Commit

Permalink
enhancement(amqp sink): add expiration option to AMQP messages (#20215)
Browse files Browse the repository at this point in the history
* add expiration to AMQP messages

* Update src/sinks/amqp/config.rs

Co-authored-by: Stephen Wakely <[email protected]>

* PR changes requested

---------

Co-authored-by: Stephen Wakely <[email protected]>
  • Loading branch information
sonnens and StephenWakely authored Apr 11, 2024
1 parent 665ab39 commit 7d7b1a2
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20214_amqp_expiration.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
added support for `expiration_ms` on AMQP sink, to set an expiration on messages sent

authors: sonnens
8 changes: 6 additions & 2 deletions src/sinks/amqp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ use super::sink::AmqpSink;
#[derive(Clone, Debug, Default)]
pub struct AmqpPropertiesConfig {
/// Content-Type for the AMQP messages.
#[configurable(derived)]
pub(crate) content_type: Option<String>,

/// Content-Encoding for the AMQP messages.
#[configurable(derived)]
pub(crate) content_encoding: Option<String>,

/// Expiration for AMQP messages (in milliseconds)
pub(crate) expiration_ms: Option<u64>,
}

impl AmqpPropertiesConfig {
Expand All @@ -29,6 +30,9 @@ impl AmqpPropertiesConfig {
if let Some(content_encoding) = &self.content_encoding {
prop = prop.with_content_encoding(ShortString::from(content_encoding.clone()));
}
if let Some(expiration_ms) = &self.expiration_ms {
prop = prop.with_expiration(ShortString::from(expiration_ms.to_string()));
}
prop
}
}
Expand Down
5 changes: 5 additions & 0 deletions website/cue/reference/components/sinks/base/amqp.cue
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ base: components: sinks: amqp: configuration: {
required: false
type: string: {}
}
expiration_ms: {
description: "Expiration for AMQP messages (in milliseconds)"
required: false
type: uint: {}
}
}
}
routing_key: {
Expand Down

0 comments on commit 7d7b1a2

Please sign in to comment.