fix(aws_kinesis sink): fix batching of requests vectordotdev#20575 vectordotdev#1407 (vectordotdev#20653)

Send batches to AWS Kinesis Data Streams and AWS Firehose independent of
their partition keys. In both APIs, the events in a batch do not need to
share the same partition key.
This makes the protocol more efficient, because by default the partition
key is a random key that differs for every event.
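
The effect described above can be illustrated with a minimal sketch. It is not Vector's implementation: `Record`, `batch_by_key`, `batch_plain`, and `sample_records` are hypothetical names, and batches are capped by record count here, whereas the sink uses byte-size batch settings. It only shows why grouping by a key that differs for every event tends to produce single-record batches.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one event bound for Kinesis
/// (an assumption for illustration, not Vector's `KinesisRequest`).
#[derive(Clone, Debug, PartialEq)]
pub struct Record {
    pub partition_key: String,
    pub payload: String,
}

/// Simplified "before": group records by partition key, then cut each
/// group into batches of at most `max_batch` records.
/// `max_batch` must be non-zero, because `chunks(0)` panics.
pub fn batch_by_key(records: &[Record], max_batch: usize) -> Vec<Vec<Record>> {
    let mut groups: HashMap<&str, Vec<Record>> = HashMap::new();
    for record in records {
        groups
            .entry(record.partition_key.as_str())
            .or_default()
            .push(record.clone());
    }
    groups
        .into_values()
        .flat_map(|group| {
            group
                .chunks(max_batch)
                .map(|chunk| chunk.to_vec())
                .collect::<Vec<_>>()
        })
        .collect()
}

/// Simplified "after": cut the stream into batches of at most `max_batch`
/// records without looking at the partition key.
pub fn batch_plain(records: &[Record], max_batch: usize) -> Vec<Vec<Record>> {
    records.chunks(max_batch).map(|chunk| chunk.to_vec()).collect()
}

/// Build `n` records whose keys all differ, mimicking a random default key.
pub fn sample_records(n: usize) -> Vec<Record> {
    (0..n)
        .map(|i| Record {
            partition_key: format!("key-{}", i),
            payload: format!("event {}", i),
        })
        .collect()
}
```

With ten records from `sample_records(10)` and `max_batch` set to 5, `batch_by_key` yields ten batches of one record each, while `batch_plain` yields two batches of five.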
steven-aerts authored and ym committed Aug 14, 2024
1 parent b1ebaea commit e67a58a
Showing 2 changed files with 6 additions and 35 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20575_kinesis_batching.fix.md
@@ -0,0 +1,3 @@
+Batching records for AWS Kinesis Data Streams and AWS Firehose became independent of the partition key, improving efficiency significantly.
+
+authors: steven-aerts
38 changes: 3 additions & 35 deletions src/sinks/aws_kinesis/sink.rs
@@ -65,21 +65,12 @@ where
                     Ok(req) => Some(req),
                 }
             })
-            .batched_partitioned(
-                KinesisPartitioner {
-                    _phantom: PhantomData,
-                },
-                || batch_settings.as_byte_size_config(),
-            )
-            .map(|(key, events)| {
+            .batched(batch_settings.as_byte_size_config())
+            .map(|events| {
                 let metadata = RequestMetadata::from_batch(
                     events.iter().map(|req| req.get_metadata().clone()),
                 );
-                BatchKinesisRequest {
-                    key,
-                    events,
-                    metadata,
-                }
+                BatchKinesisRequest { events, metadata }
             })
             .into_driver(self.service)
             .run()
@@ -148,7 +139,6 @@ pub struct BatchKinesisRequest<R>
 where
     R: Record + Clone,
 {
-    pub key: KinesisKey,
     pub events: Vec<KinesisRequest<R>>,
     metadata: RequestMetadata,
 }
@@ -159,9 +149,6 @@ where
 {
     fn clone(&self) -> Self {
         Self {
-            key: KinesisKey {
-                partition_key: self.key.partition_key.clone(),
-            },
             events: self.events.to_vec(),
             metadata: self.metadata.clone(),
         }
@@ -189,22 +176,3 @@ where
         &mut self.metadata
     }
 }
-
-struct KinesisPartitioner<R>
-where
-    R: Record,
-{
-    _phantom: PhantomData<R>,
-}
-
-impl<R> Partitioner for KinesisPartitioner<R>
-where
-    R: Record,
-{
-    type Item = KinesisRequest<R>;
-    type Key = KinesisKey;
-
-    fn partition(&self, item: &Self::Item) -> Self::Key {
-        item.key.clone()
-    }
-}
