Skip to content

Commit

Permalink
fix(aws_kinesis sink): fix batching of requests #20575 #1407
Browse files Browse the repository at this point in the history
Send batches to AWS Kinesis Data Streams and AWS Firehose independent of
their partition keys.  In both APIs, batches of events do not need to
share the same partition key.
This makes the protocol more efficient, as by default the partition key
is a random key that is different for every event.
  • Loading branch information
steven-aerts committed Jun 12, 2024
1 parent 662d1d0 commit 02a6141
Showing 1 changed file with 3 additions and 35 deletions.
38 changes: 3 additions & 35 deletions src/sinks/aws_kinesis/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,12 @@ where
Ok(req) => Some(req),
}
})
.batched_partitioned(
KinesisPartitioner {
_phantom: PhantomData,
},
|| batch_settings.as_byte_size_config(),
)
.map(|(key, events)| {
.batched(batch_settings.as_byte_size_config())
.map(|events| {
let metadata = RequestMetadata::from_batch(
events.iter().map(|req| req.get_metadata().clone()),
);
BatchKinesisRequest {
key,
events,
metadata,
}
BatchKinesisRequest { events, metadata }
})
.into_driver(self.service)
.run()
Expand Down Expand Up @@ -148,7 +139,6 @@ pub struct BatchKinesisRequest<R>
where
R: Record + Clone,
{
pub key: KinesisKey,
pub events: Vec<KinesisRequest<R>>,
metadata: RequestMetadata,
}
Expand All @@ -159,9 +149,6 @@ where
{
fn clone(&self) -> Self {
Self {
key: KinesisKey {
partition_key: self.key.partition_key.clone(),
},
events: self.events.to_vec(),
metadata: self.metadata.clone(),
}
Expand Down Expand Up @@ -189,22 +176,3 @@ where
&mut self.metadata
}
}

/// Zero-sized partitioner that is generic over the record type `R`.
///
/// It stores no data; `PhantomData<R>` only ties the type parameter to the
/// struct so that trait impls can be written per record type.
struct KinesisPartitioner<R: Record> {
    _phantom: PhantomData<R>,
}

/// Partitions `KinesisRequest<R>` items by their `KinesisKey`.
impl<R: Record> Partitioner for KinesisPartitioner<R> {
    type Item = KinesisRequest<R>;
    type Key = KinesisKey;

    /// Returns a clone of the key carried by `item`.
    fn partition(&self, item: &Self::Item) -> Self::Key {
        Clone::clone(&item.key)
    }
}

0 comments on commit 02a6141

Please sign in to comment.