From 869f60bbb8261762bb4319049db594cede661ab7 Mon Sep 17 00:00:00 2001
From: Luke Steensen
Date: Wed, 19 Jul 2023 16:08:25 -0500
Subject: [PATCH] fix(sinks): spike out encoding events before batching

Signed-off-by: Luke Steensen
---
 src/sinks/aws_s3/sink.rs           | 176 ++++++++++++++++++++++++++++-
 src/sinks/s3_common/partitioner.rs |  16 ++-
 2 files changed, 180 insertions(+), 12 deletions(-)

diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs
index c9849cf834e13..d578b10acd77b 100644
--- a/src/sinks/aws_s3/sink.rs
+++ b/src/sinks/aws_s3/sink.rs
@@ -1,11 +1,20 @@
-use std::io;
+use std::{fmt, io, num::NonZeroUsize, sync::Arc};
 
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use chrono::Utc;
 use codecs::encoding::Framer;
+use futures::StreamExt;
+use futures_util::stream::BoxStream;
+use tokio_util::codec::Encoder as _;
+use tower::Service;
 use uuid::Uuid;
-use vector_common::request_metadata::RequestMetadata;
-use vector_core::event::Finalizable;
+use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata};
+use vector_core::{
+    event::Finalizable,
+    partition::Partitioner,
+    stream::{BatcherSettings, DriverResponse},
+    ByteSizeOf,
+};
 
 use crate::{
     codecs::{Encoder, Transformer},
@@ -13,16 +22,171 @@ use crate::{
     sinks::{
         s3_common::{
             config::S3Options,
-            partitioner::S3PartitionKey,
+            partitioner::{S3KeyPartitioner, S3PartitionKey},
             service::{S3Metadata, S3Request},
         },
         util::{
             metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
-            RequestBuilder,
+            RequestBuilder, SinkBuilderExt,
         },
     },
 };
 
+struct NewS3Sink<Svc> {
+    service: Svc,
+    partitioner: S3KeyPartitioner,
+    transformer: Transformer,
+    framer: Framer,
+    serializer: codecs::encoding::Serializer,
+    batcher_settings: BatcherSettings,
+    options: S3RequestOptions,
+}
+
+struct EncodedEvent {
+    inner: Event,
+    encoded: BytesMut,
+}
+
+// hack to reuse this trait for encoded size
+impl ByteSizeOf for EncodedEvent {
+    fn size_of(&self) -> usize {
+        self.allocated_bytes()
+    }
+
+    fn allocated_bytes(&self) -> usize {
+        self.encoded.len()
+    }
+}
+
+struct WrappedPartitioner(S3KeyPartitioner);
+
+impl Partitioner for WrappedPartitioner {
+    type Item = EncodedEvent;
+    type Key = Option<S3PartitionKey>;
+
+    fn partition(&self, item: &Self::Item) -> Self::Key {
+        self.0.partition(&item.inner)
+    }
+}
+
+impl<Svc> NewS3Sink<Svc>
+where
+    Svc: Service<S3Request> + Send + 'static,
+    Svc::Future: Send + 'static,
+    Svc::Response: DriverResponse + Send + 'static,
+    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
+{
+    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        let transformer = self.transformer;
+        let mut serializer = self.serializer;
+        let partitioner = WrappedPartitioner(self.partitioner);
+        let service = self.service;
+        let framer = Arc::new(self.framer);
+        let batcher_settings = self.batcher_settings;
+        let options = Arc::new(self.options);
+
+        let combined_encoder = Arc::new(Encoder::<Framer>::new(
+            framer.as_ref().clone(),
+            serializer.clone(),
+        ));
+
+        let builder_limit = NonZeroUsize::new(64);
+
+        input
+            .map(|event| {
+                let mut to_encode = event.clone();
+                transformer.transform(&mut to_encode);
+
+                let mut encoded = BytesMut::new();
+                serializer.encode(to_encode, &mut encoded).unwrap();
+
+                EncodedEvent {
+                    inner: event,
+                    encoded,
+                }
+            })
+            .batched_partitioned(partitioner, batcher_settings)
+            .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
+            .concurrent_map(builder_limit, move |(partition_key, encoded_events)| {
+                let framer = Arc::clone(&framer);
+                let combined_encoder = Arc::clone(&combined_encoder);
+                let options = Arc::clone(&options);
+
+                Box::pin(async move {
+                    // This is silly because we really just need the prefix, delimiter, and suffix. Oh well.
+                    let mut framer = framer.as_ref().clone();
+
+                    let mut grouped_sizes = GroupedCountByteSize::new_tagged();
+                    let mut events = Vec::with_capacity(encoded_events.len());
+                    let mut encoded = Vec::with_capacity(encoded_events.len());
+                    for e in encoded_events {
+                        grouped_sizes.add_event(&e.inner, e.encoded.len().into());
+                        events.push(e.inner);
+                        encoded.push(e.encoded);
+                    }
+
+                    // TODO: this doesn't include framing, is that right?
+                    let events_encoded_size = encoded.iter().map(BytesMut::len).sum::<usize>();
+
+                    let finalizers = events.take_finalizers();
+                    let s3_key_prefix = partition_key.key_prefix.clone();
+
+                    let metadata = S3Metadata {
+                        partition_key,
+                        s3_key: s3_key_prefix,
+                        finalizers,
+                    };
+
+                    // TODO: not doing compression yet
+                    let mut payload = BytesMut::new();
+                    payload.extend_from_slice(combined_encoder.batch_prefix());
+                    let mut remaining = encoded.len();
+                    for buf in encoded {
+                        payload.extend_from_slice(buf.as_ref());
+                        remaining -= 1;
+                        if remaining > 0 {
+                            // write the frame delimiter
+                            framer.encode((), &mut payload).expect("framing to bytes");
+                        }
+                    }
+                    payload.extend_from_slice(combined_encoder.batch_suffix());
+
+                    let request_metadata = RequestMetadata::new(
+                        events.len(),
+                        events_encoded_size,
+                        payload.len(),
+                        // TODO: same since no compression yet
+                        payload.len(),
+                        // TODO: just using encoded size here, not sure if we still need to estimate?
+                        grouped_sizes,
+                    );
+
+                    options.build_request(
+                        metadata,
+                        request_metadata,
+                        EncodeResult::uncompressed(payload.freeze()),
+                    )
+                })
+            })
+            .into_driver(service)
+            .run()
+            .await
+    }
+}
+
+#[async_trait::async_trait]
+impl<Svc> vector_core::sink::StreamSink<Event> for NewS3Sink<Svc>
+where
+    Svc: Service<S3Request> + Send + 'static,
+    Svc::Future: Send + 'static,
+    Svc::Response: DriverResponse + Send + 'static,
+    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
+{
+    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        self.run_inner(input).await
+    }
+}
+
 #[derive(Clone)]
 pub struct S3RequestOptions {
     pub bucket: String,
diff --git a/src/sinks/s3_common/partitioner.rs b/src/sinks/s3_common/partitioner.rs
index 16d1e636e3d08..f1946fef82233 100644
--- a/src/sinks/s3_common/partitioner.rs
+++ b/src/sinks/s3_common/partitioner.rs
@@ -18,13 +18,8 @@ impl S3KeyPartitioner {
     ) -> Self {
         Self(key_prefix_template, ssekms_key_id_template)
     }
-}
 
-impl Partitioner for S3KeyPartitioner {
-    type Item = Event;
-    type Key = Option<S3PartitionKey>;
-
-    fn partition(&self, item: &Self::Item) -> Self::Key {
+    pub fn partition(&self, item: &Event) -> Option<S3PartitionKey> {
         let key_prefix = self
             .0
             .render_string(item)
@@ -56,3 +51,12 @@ impl Partitioner for S3KeyPartitioner {
         })
     }
 }
+
+impl Partitioner for S3KeyPartitioner {
+    type Item = Event;
+    type Key = Option<S3PartitionKey>;
+
+    fn partition(&self, item: &Self::Item) -> Self::Key {
+        self.partition(item)
+    }
+}