diff --git a/changelog.d/20575_kinesis_batching.fix.md b/changelog.d/20575_kinesis_batching.fix.md new file mode 100644 index 0000000000000..9fdec2e1d85fc --- /dev/null +++ b/changelog.d/20575_kinesis_batching.fix.md @@ -0,0 +1,3 @@ +Batching records for AWS Kinesis Data Streams and AWS Firehose is now independent of the partition key, which significantly improves efficiency. + +authors: steven-aerts diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index ea28d905fd8e2..f29f246a7ffc7 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -65,21 +65,12 @@ where Ok(req) => Some(req), } }) - .batched_partitioned( - KinesisPartitioner { - _phantom: PhantomData, - }, - || batch_settings.as_byte_size_config(), - ) - .map(|(key, events)| { + .batched(batch_settings.as_byte_size_config()) + .map(|events| { let metadata = RequestMetadata::from_batch( events.iter().map(|req| req.get_metadata().clone()), ); - BatchKinesisRequest { - key, - events, - metadata, - } + BatchKinesisRequest { events, metadata } }) .into_driver(self.service) .run() @@ -148,7 +139,6 @@ pub struct BatchKinesisRequest where R: Record + Clone, { - pub key: KinesisKey, pub events: Vec>, metadata: RequestMetadata, } @@ -159,9 +149,6 @@ where { fn clone(&self) -> Self { Self { - key: KinesisKey { - partition_key: self.key.partition_key.clone(), - }, events: self.events.to_vec(), metadata: self.metadata.clone(), } @@ -189,22 +176,3 @@ where &mut self.metadata } } - -struct KinesisPartitioner -where - R: Record, -{ - _phantom: PhantomData, -} - -impl Partitioner for KinesisPartitioner -where - R: Record, -{ - type Item = KinesisRequest; - type Key = KinesisKey; - - fn partition(&self, item: &Self::Item) -> Self::Key { - item.key.clone() - } -}