fix(sinks): spike out encoding events before batching #18028

Draft
wants to merge 2 commits into master
Conversation

lukesteensen
Member

DO NOT MERGE: This is still missing at least a few important features.

The problem

Vector currently batches by the in-memory size of events (via the ByteSizeOf trait), regardless of the encoding the sink will actually use to write those events out. That size is potentially useful if you care about managing Vector's memory use (though only indirectly), but it can be surprisingly inaccurate if you're trying to control the size of the sink's actual output.

For example, a simple config of demo_logs with json formatting piped into the aws_s3 sink will result in 4MB objects (uncompressed) despite a batch size setting of 10MB. Metrics can swing significantly in the other direction: struct field names take up no space in memory, but they are a meaningful part of a JSON payload. In general, the discrepancy is heavily data-dependent, but the impact is very significant in many straightforward use cases.
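To make the gap concrete, here is a minimal, self-contained sketch (toy types only, not Vector's actual Event or ByteSizeOf implementation; it assumes serde with the derive feature and serde_json as dependencies) comparing an in-memory-style estimate with the JSON bytes a sink would actually write:

```rust
// Illustrative only: a toy metric type, not Vector's Event or its ByteSizeOf
// implementation. Assumes `serde` (with the derive feature) and `serde_json`.
use serde::Serialize;

#[derive(Serialize)]
struct ToyMetric {
    name: String,
    namespace: String,
    value: f64,
}

fn main() {
    let m = ToyMetric {
        name: "requests_total".into(),
        namespace: "app".into(),
        value: 1.0,
    };

    // Rough stand-in for an in-memory estimate: struct size plus the
    // heap-allocated string contents. Field names cost nothing here.
    let in_memory = std::mem::size_of::<ToyMetric>() + m.name.len() + m.namespace.len();

    // What a JSON codec would actually ship: the field names "name",
    // "namespace", and "value" now take up space in the payload.
    let encoded = serde_json::to_vec(&m).expect("serialization failed").len();

    println!("in-memory estimate: {in_memory} B, JSON payload: {encoded} B");
}
```

For a small metric-like value, the encoded payload carries every field name while the in-memory estimate does not, which is exactly the kind of divergence described above.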

How it works today

The flow of the aws_s3 and many similar sinks is roughly as follows:

  1. The PartitionedBatcher turns the input stream of Events into a stream of (Key, Vec<Event>), using the provided batch settings and ByteSizeOf to calculate the size of each event.
  2. We concurrently map those incoming batches into actual requests via the RequestBuilder trait:
    1. Metadata is split out from the events themselves
    2. The events are encoded all at once into the request payload (kind of, see below)
    3. The full request is built from the metadata and the payload

The encoding stage in step 2.2 is somewhat complex: each event is serialized into a fresh byte buffer before being written to the final payload. This appears to be done primarily so that we can track the actual number of bytes written for instrumentation purposes.
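As a rough illustration of that flow, here is a simplified sketch; the trait and method names below are illustrative stand-ins, not Vector's actual RequestBuilder trait or its signatures.

```rust
// A simplified sketch of the flow above, not Vector's actual `RequestBuilder`
// trait; the names here are illustrative stand-ins.
struct Event;                       // stand-in for Vector's Event
struct Metadata;                    // per-batch metadata (partition key, finalizers, ...)
struct Payload(Vec<u8>);            // the encoded request body
struct Request { meta: Metadata, body: Payload }

trait ToyRequestBuilder {
    // 2.1: split metadata out from the events themselves
    fn split_input(&self, input: Vec<Event>) -> (Metadata, Vec<Event>);
    // 2.2: encode the whole batch into the request payload
    fn encode_events(&self, events: Vec<Event>) -> Payload;
    // 2.3: build the full request from the metadata and the payload
    fn build_request(&self, meta: Metadata, body: Payload) -> Request;
}

// Step 2: batches arriving from the partitioner (already sized with
// ByteSizeOf) are concurrently mapped into requests; encoding in 2.2 is
// the first point where the real output size is known.
fn map_batch<B: ToyRequestBuilder>(builder: &B, batch: Vec<Event>) -> Request {
    let (meta, events) = builder.split_input(batch);
    let body = builder.encode_events(events);
    builder.build_request(meta, body)
}
```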

This PR

In this spike we experiment with pulling parts of the encoding stage earlier in the pipeline, ahead of the partitioner, so that batches can be built using the actual encoded size (i.e. the size of what we will output, not the in-memory size). This doesn't fit into the current trait structure (e.g. RequestBuilder), so there's some messiness in the implementation that we could clean up if we decide this is a viable path forward.

To do this, we take a bit of a shortcut: we keep the original event and the encoded version together (the new EncodedEvent) and pass both along the pipeline for a while. This is probably not optimal from a memory-use perspective, but it limited the amount of rework needed to prove the concept. An interesting alternative would be to precompute only what is needed downstream (e.g. partition keys, keys for tagging metrics, other semantically meaningful fields for request building) and pass that along instead of the full original event.
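A minimal sketch of that shortcut, using hypothetical types rather than the PR's actual EncodedEvent or batcher integration, might look like the following: the batcher sizes batches by encoded length instead of a ByteSizeOf-style estimate.

```rust
// A minimal sketch of the shortcut described above, with hypothetical types;
// the PR's actual EncodedEvent and batcher integration differ in detail.
struct Event; // stand-in for Vector's Event

struct EncodedEvent {
    original: Event,  // kept around for downstream needs (partition keys, metadata, ...)
    encoded: Vec<u8>, // bytes exactly as the sink will write them
}

impl EncodedEvent {
    // The batcher uses this instead of a ByteSizeOf-style in-memory estimate.
    fn encoded_size(&self) -> usize {
        self.encoded.len()
    }
}

// Size-based batching driven by encoded length rather than in-memory size.
fn batch_by_encoded_size(
    events: impl IntoIterator<Item = EncodedEvent>,
    max_bytes: usize,
) -> Vec<Vec<EncodedEvent>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0;
    for event in events {
        // Close the current batch if adding this event would exceed the limit.
        if current_bytes + event.encoded_size() > max_bytes && !current.is_empty() {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += event.encoded_size();
        current.push(event);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```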

The main thing missing from the current implementation is compression, due to some very thorny compiler errors that popped up when I tried to add it. These issues were likely due to the relatively complex state of the spike (lots of nested async and chaining), and shouldn't prove to be a problem if we choose to move forward and properly factor the code.

Open questions

  1. How big of a deal is this? From one angle, it seems to be a serious correctness issue that needs to be addressed. On the other hand, it's not clear what guarantees, if any, we make about batching configuration and request sizes. The strongest argument that this is a serious bug is that sinks like Datadog Logs cannot respect hard limits on the target API, causing errors for users. Does that mean we need to fix this at all costs for all sinks?
  2. Is there any performance impact from moving event encoding out of a concurrent stage? Would we need to take additional steps to encode chunks of events concurrently prior to batching? Related to the previous question, what kind of hit is acceptable?
  3. We calculate a lot of different numbers about events flowing through a sink (e.g. byte size of events, estimated JSON size of events, pre-compression request size, post-compression request size), but which ones are important? Do we care about the in-memory size of events flowing through sinks despite the fact that they don't really represent the data in the way the user sees it (i.e. encoded)? Why do we estimate the JSON size of events at all, and should we replace use of that with the actual encoded size, or is there something special about JSON that we always want it even if we end up using a different encoding?
  4. How much overlap is there here with IncrementalRequestBuilder? It seems designed to be able to split requests that are too large, which could be enough to solve for hard API failures like Datadog Logs, but does not seem like it'd help at all with use cases like S3 objects being too small. If we do make a larger change, should it replace this version as well?

@neuronull (Contributor) left a comment:


Didn't do any nitpicking here as this is a spike draft. That run_inner fn seems like it could be nice to break up, perhaps around the concurrent_map.

It looks like this more or less accomplishes the goal.

An interesting alternative would be to precompute only what is needed downstream (e.g. partition keys, keys for tagging metrics, other semantically meaningful fields for request building) and pass that along instead of the full original event.

That does seem like it could be a nice adjustment.

}
payload.extend_from_slice(combined_encoder.batch_suffix());

let request_metadata = RequestMetadata::new(
A contributor commented on this diff:

Seems like a fringe benefit of this would be that we don't need the RequestMetadataBuilder anymore? I added that last year and it was kind of annoying; if we have all the information we need in one place, there are definite perks to that.

Labels: domain: sinks (Anything related to Vector's sinks)