Skip to content

Commit

Permalink
fix(sinks): spike out encoding events before batching
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Steensen <[email protected]>
  • Loading branch information
lukesteensen committed Jul 19, 2023
1 parent b00727e commit 869f60b
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 12 deletions.
176 changes: 170 additions & 6 deletions src/sinks/aws_s3/sink.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,192 @@
use std::io;
use std::{fmt, io, num::NonZeroUsize, sync::Arc};

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use chrono::Utc;
use codecs::encoding::Framer;
use futures::StreamExt;
use futures_util::stream::BoxStream;
use tokio_util::codec::Encoder as _;
use tower::Service;
use uuid::Uuid;
use vector_common::request_metadata::RequestMetadata;
use vector_core::event::Finalizable;
use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata};
use vector_core::{
event::Finalizable,
partition::Partitioner,
stream::{BatcherSettings, DriverResponse},
ByteSizeOf,
};

use crate::{
codecs::{Encoder, Transformer},
event::Event,
sinks::{
s3_common::{
config::S3Options,
partitioner::S3PartitionKey,
partitioner::{S3KeyPartitioner, S3PartitionKey},
service::{S3Metadata, S3Request},
},
util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
RequestBuilder,
RequestBuilder, SinkBuilderExt,
},
},
};

/// S3 sink variant that serializes every event *before* batching, so batches
/// are formed over already-encoded events.
struct NewS3Sink<Svc> {
/// Service the built `S3Request`s are driven through.
service: Svc,
/// Computes the partition key for each event; wrapped so it can operate on
/// encoded events.
partitioner: S3KeyPartitioner,
/// Applied to a clone of each event prior to serialization.
transformer: Transformer,
/// Supplies the delimiter written between encoded events in a payload.
framer: Framer,
/// Serializes each transformed event into bytes.
serializer: codecs::encoding::Serializer,
/// Settings handed to the partitioned batcher.
batcher_settings: BatcherSettings,
/// Used to turn the assembled payload and metadata into a request.
options: S3RequestOptions,
}

/// An event paired with its already-serialized bytes.
struct EncodedEvent {
/// The original event, as received (before the transformer was applied).
/// Used for partitioning, size grouping, and finalizers.
inner: Event,
/// Serialized form of the transformed event, without any framing.
encoded: BytesMut,
}

// Hack: `ByteSizeOf` is reused to report the length of the *encoded* bytes
// rather than the in-memory footprint of the value (presumably so batching is
// sized by encoded length -- confirm against the batcher).
impl ByteSizeOf for EncodedEvent {
    /// Length, in bytes, of the serialized event.
    fn size_of(&self) -> usize {
        self.encoded.len()
    }

    /// Same value as `size_of`: the length of the serialized event.
    fn allocated_bytes(&self) -> usize {
        self.encoded.len()
    }
}

/// Adapter that lets an `S3KeyPartitioner` partition `EncodedEvent`s by
/// looking at the original event carried inside each one.
struct WrappedPartitioner(S3KeyPartitioner);

impl Partitioner for WrappedPartitioner {
    type Item = EncodedEvent;
    type Key = Option<S3PartitionKey>;

    /// Computes the partition key from the wrapped (un-encoded) event.
    fn partition(&self, item: &Self::Item) -> Self::Key {
        let Self(s3_partitioner) = self;
        let original_event = &item.inner;
        s3_partitioner.partition(original_event)
    }
}

impl<Svc> NewS3Sink<Svc>
where
Svc: Service<S3Request> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Response: DriverResponse + Send + 'static,
Svc::Error: fmt::Debug + Into<crate::Error> + Send,
{
/// Runs the sink to completion over `input`.
///
/// Pipeline: serialize each event up front, batch the encoded events per
/// partition key, assemble each batch into a single framed payload, build an
/// S3 request from it, and drive the requests through the service.
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
// Pull the fields out of `self` so the closures below can capture them
// individually.
let transformer = self.transformer;
let mut serializer = self.serializer;
let partitioner = WrappedPartitioner(self.partitioner);
let service = self.service;
let framer = Arc::new(self.framer);
let batcher_settings = self.batcher_settings;
let options = Arc::new(self.options);

// Only used below for `batch_prefix()` / `batch_suffix()` when assembling
// a payload; per-event serialization goes through `serializer` directly.
let combined_encoder = Arc::new(Encoder::<Framer>::new(
framer.as_ref().clone(),
serializer.clone(),
));

// Passed to `concurrent_map`; presumably bounds how many payload builds
// run at once -- confirm against `SinkBuilderExt`.
let builder_limit = NonZeroUsize::new(64);

input
.map(|event| {
// Transform a clone so the untouched original can be kept in `inner`.
let mut to_encode = event.clone();
transformer.transform(&mut to_encode);

let mut encoded = BytesMut::new();
// NOTE(review): a serialization failure panics here; confirm whether
// that is acceptable beyond the spike.
serializer.encode(to_encode, &mut encoded).unwrap();

EncodedEvent {
inner: event,
encoded,
}
})
.batched_partitioned(partitioner, batcher_settings)
// Batches whose partition key is `None` are dropped here.
.filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
.concurrent_map(builder_limit, move |(partition_key, encoded_events)| {
// Per-batch handles to the shared state moved into the future below.
let framer = Arc::clone(&framer);
let combined_encoder = Arc::clone(&combined_encoder);
let options = Arc::clone(&options);

Box::pin(async move {
// This is silly because we really just need the prefix, delimiter, and suffix. Oh well.
let mut framer = framer.as_ref().clone();

// Split the batch into the original events and their encoded bytes,
// recording each event's encoded length in the grouped sizes.
let mut grouped_sizes = GroupedCountByteSize::new_tagged();
let mut events = Vec::with_capacity(encoded_events.len());
let mut encoded = Vec::with_capacity(encoded_events.len());
for e in encoded_events {
grouped_sizes.add_event(&e.inner, e.encoded.len().into());
events.push(e.inner);
encoded.push(e.encoded);
}

// TODO: this doesn't include framing, is that right?
let events_encoded_size = encoded.iter().map(BytesMut::len).sum::<usize>();

let finalizers = events.take_finalizers();
let s3_key_prefix = partition_key.key_prefix.clone();

// `s3_key` starts out as just the key prefix.
let metadata = S3Metadata {
partition_key,
s3_key: s3_key_prefix,
finalizers,
};

// TODO: not doing compression yet
// Payload layout: batch prefix, the encoded events with a frame
// delimiter between (not after) them, then the batch suffix.
let mut payload = BytesMut::new();
payload.extend_from_slice(combined_encoder.batch_prefix());
let mut remaining = encoded.len();
for buf in encoded {
payload.extend_from_slice(buf.as_ref());
remaining -= 1;
if remaining > 0 {
// write the frame delimiter
framer.encode((), &mut payload).expect("framing to bytes");
}
}
payload.extend_from_slice(combined_encoder.batch_suffix());

let request_metadata = RequestMetadata::new(
events.len(),
events_encoded_size,
payload.len(),
// TODO: same since no compression yet
payload.len(),
// TODO: just using encoded size here, not sure if we still need to estimate?
grouped_sizes,
);

options.build_request(
metadata,
request_metadata,
EncodeResult::uncompressed(payload.freeze()),
)
})
})
.into_driver(service)
.run()
.await
}
}

/// `StreamSink` entry point for [`NewS3Sink`]; all of the work happens in
/// `run_inner`.
#[async_trait::async_trait]
impl<Svc> vector_core::sink::StreamSink<Event> for NewS3Sink<Svc>
where
    Svc: Service<S3Request> + Send + 'static,
    Svc::Future: Send + 'static,
    Svc::Response: DriverResponse + Send + 'static,
    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Consumes the sink and processes `input` until the stream ends.
    ///
    /// `self` is handed to `run_inner` by value and never mutated here, so the
    /// receiver is bound immutably.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}

#[derive(Clone)]
pub struct S3RequestOptions {
pub bucket: String,
Expand Down
16 changes: 10 additions & 6 deletions src/sinks/s3_common/partitioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@ impl S3KeyPartitioner {
) -> Self {
Self(key_prefix_template, ssekms_key_id_template)
}
}

impl Partitioner for S3KeyPartitioner {
type Item = Event;
type Key = Option<S3PartitionKey>;

fn partition(&self, item: &Self::Item) -> Self::Key {
pub fn partition(&self, item: &Event) -> Option<S3PartitionKey> {
let key_prefix = self
.0
.render_string(item)
Expand Down Expand Up @@ -56,3 +51,12 @@ impl Partitioner for S3KeyPartitioner {
})
}
}

impl Partitioner for S3KeyPartitioner {
type Item = Event;
type Key = Option<S3PartitionKey>;

fn partition(&self, item: &Self::Item) -> Self::Key {
self.partition(item)
}
}

0 comments on commit 869f60b

Please sign in to comment.