From e67a58af8ed411e5e730839e16fd3a0184b46d32 Mon Sep 17 00:00:00 2001 From: Steven Aerts Date: Wed, 3 Jul 2024 23:58:14 +0200 Subject: [PATCH] fix(aws_kinesis sink): fix batching of requests #20575 #1407 (#20653) Send batches to AWS Kinesis Data Streams and AWS Firehose independent of ther parition keys. In both API's batches of events do not need to share the same partition key. This makes the protocol more efficient, as by default the partition key is a random key being different for every event. --- changelog.d/20575_kinesis_batching.fix.md | 3 ++ src/sinks/aws_kinesis/sink.rs | 38 ++--------------------- 2 files changed, 6 insertions(+), 35 deletions(-) create mode 100644 changelog.d/20575_kinesis_batching.fix.md diff --git a/changelog.d/20575_kinesis_batching.fix.md b/changelog.d/20575_kinesis_batching.fix.md new file mode 100644 index 0000000000000..9fdec2e1d85fc --- /dev/null +++ b/changelog.d/20575_kinesis_batching.fix.md @@ -0,0 +1,3 @@ +Batching records for AWS Kinesis Data Streams and AWS Firehose became independent of the partition key, improving efficiency significantly. + +authors: steven-aerts diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index ea28d905fd8e2..f29f246a7ffc7 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -65,21 +65,12 @@ where Ok(req) => Some(req), } }) - .batched_partitioned( - KinesisPartitioner { - _phantom: PhantomData, - }, - || batch_settings.as_byte_size_config(), - ) - .map(|(key, events)| { + .batched(batch_settings.as_byte_size_config()) + .map(|events| { let metadata = RequestMetadata::from_batch( events.iter().map(|req| req.get_metadata().clone()), ); - BatchKinesisRequest { - key, - events, - metadata, - } + BatchKinesisRequest { events, metadata } }) .into_driver(self.service) .run() @@ -148,7 +139,6 @@ pub struct BatchKinesisRequest where R: Record + Clone, { - pub key: KinesisKey, pub events: Vec>, metadata: RequestMetadata, } @@ -159,9 +149,6 @@ where { fn clone(&self) -> Self { Self { - key: KinesisKey { - partition_key: self.key.partition_key.clone(), - }, events: self.events.to_vec(), metadata: self.metadata.clone(), } @@ -189,22 +176,3 @@ where &mut self.metadata } } - -struct KinesisPartitioner -where - R: Record, -{ - _phantom: PhantomData, -} - -impl Partitioner for KinesisPartitioner -where - R: Record, -{ - type Item = KinesisRequest; - type Key = KinesisKey; - - fn partition(&self, item: &Self::Item) -> Self::Key { - item.key.clone() - } -}