Add byte-based ingestion limits to the queue and output #39776
base: main
Conversation
This pull request does not have a backport label. To fix up this pull request, you need to add the backport labels for the needed branches.
This pull request now has conflicts. Could you fix it? 🙏
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
I still need to do a thorough review, but a couple of test cases came to mind, and I don't think we currently have tests to cover them:
- the queue is exactly full (from a byte perspective) and we try to add another event
- the queue is empty and we try to add an event that is larger than the queue can hold
- the queue is partially full and we try to add an event that would put us over the limit

I am also interested in seeing whether there are any performance changes for these two scenarios:
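A rough sketch of how the three edge cases listed above might be written as a table-driven test. The byteBudget type and its tryAdd method are hypothetical stand-ins for the queue's byte accounting, not the memqueue producer API, and the expected outcomes are assumptions about the intended behavior rather than a statement of what the PR currently does:

```go
package memqueue_test

import "testing"

// byteBudget is a toy stand-in for the queue's byte accounting.
type byteBudget struct {
	capacity int
	used     int
}

// tryAdd models publishing an event of the given encoded size.
func (b *byteBudget) tryAdd(size int) bool {
	if b.used+size > b.capacity {
		return false // would exceed the byte limit
	}
	b.used += size
	return true
}

func TestByteLimitEdgeCases(t *testing.T) {
	cases := []struct {
		name      string
		capacity  int
		preFill   int  // bytes already in the queue
		eventSize int  // size of the event we try to add
		wantOK    bool // assumed intended behavior
	}{
		{"exactly full queue rejects another event", 1024, 1024, 1, false},
		{"empty queue rejects event larger than the whole queue", 1024, 0, 2048, false},
		{"partially full queue rejects event that would exceed the limit", 1024, 900, 200, false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			b := &byteBudget{capacity: tc.capacity, used: tc.preFill}
			if got := b.tryAdd(tc.eventSize); got != tc.wantOK {
				t.Fatalf("tryAdd(%d) = %v, want %v", tc.eventSize, got, tc.wantOK)
			}
		})
	}
}
```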
@@ -80,12 +81,12 @@ func NetworkClients(netclients []NetworkClient) []Client {
 // The first argument is expected to contain a queue config.Namespace.
 // The queue config is passed to assign the queue factory when
 // elastic-agent reloads the output.
-func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) {
+func SuccessNet(cfg config.Namespace, loadbalance bool, batchEvents, batchBytes, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) {
Is it worth defining a wrapper struct for batchEvents and batchBytes, and passing through a copy of that, so that nobody can ever accidentally reverse them in the argument list anywhere they are passed together like this?
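One possible shape for such a wrapper, sketched here with a hypothetical name (BatchLimits is not a type from this PR):

```go
// BatchLimits is a hypothetical wrapper bundling the two batch limits so they
// cannot be swapped when threaded through several call layers.
type BatchLimits struct {
	Events int // maximum number of events per batch (0 = no event limit)
	Bytes  int // maximum encoded size of a batch in bytes (0 = no byte limit)
}
```

SuccessNet and its callers would then take a single BatchLimits value instead of two adjacent ints.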
func (c *config) Validate() error {
	if c.MaxGetRequest > c.Events {
		return errors.New("flush.min_events must be less events")
	}
	if c.Bytes != nil && *c.Bytes < minQueueBytes {
The validation and configuration could be unit tested; they are now more complex than before.
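A sketch of what such a unit test could look like. The fields mirror the ones visible in the diff (Events, MaxGetRequest, Bytes, minQueueBytes), but the config type, the minimum value, and the second error message here are simplified stand-ins rather than the package's real code:

```go
package memqueue_test

import (
	"errors"
	"testing"
)

// Simplified stand-in for the queue config shown in the diff.
const minQueueBytes = 32768 // assumed minimum; the real constant may differ

type config struct {
	Events        int
	MaxGetRequest int
	Bytes         *int
}

func (c *config) Validate() error {
	if c.MaxGetRequest > c.Events {
		return errors.New("flush.min_events must be less events")
	}
	if c.Bytes != nil && *c.Bytes < minQueueBytes {
		return errors.New("queue byte size is below the allowed minimum")
	}
	return nil
}

func TestConfigValidate(t *testing.T) {
	smallBytes := minQueueBytes - 1
	cases := []struct {
		name    string
		cfg     config
		wantErr bool
	}{
		{"defaults are valid", config{Events: 4096, MaxGetRequest: 2048}, false},
		{"min_events larger than events", config{Events: 10, MaxGetRequest: 20}, true},
		{"byte size below minimum", config{Events: 4096, MaxGetRequest: 2048, Bytes: &smallBytes}, true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := tc.cfg.Validate()
			if (err != nil) != tc.wantErr {
				t.Fatalf("Validate() error = %v, wantErr %v", err, tc.wantErr)
			}
		})
	}
}
```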
if broker.useByteLimits() {
	// The queue is using byte limits, start with a buffer of 2^10 and
	// we will expand it as needed.
	eventBufSize = 1 << 10
If your memory no longer has space for the powers of two, this reads bigger than it is. Why not just use 1024 directly?
// The buffer position is the entry's index modulo the buffer size: for
// a queue with buffer size N, the entries stored in buf[0] will have
// entry indices 0, N, 2*N, 3*N, ...
func (l *runLoop) growEventBuffer() {
This seems reasonable, but it also seems like a good reason to introduce benchmark tests (testing.B).
Benchmarking growEventBuffer specifically? We could do that, although (since buffer growth is one-way) we expect this to be called a ~constant number of times on any run. (If the settings and event sizes are such that the queue needs to store a million events simultaneously, that's still only 10 calls to growEventBuffer over the lifetime of the program; the main bottlenecks are still in actually processing the events.)
Not necessarily growEventBuffer; I just wanted us to stop and evaluate whether there is any place where microbenchmarking would be beneficial and save us time going through iterations of the end-to-end performance framework. If the answer is that they aren't beneficial, that is fine.
I think in this case the most impactful choice is probably the starting size of the buffer, since that defines how many initial doublings need to happen to get to whatever the steady state is.
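In that spirit, a microbenchmark sketch of a doubling circular buffer, illustrating how the starting size changes the number of grow operations. The growingBuffer type is a toy model of the index-modulo-size scheme described in the diff, not the PR's circularBuffer helper or its runLoop:

```go
package memqueue_test

import "testing"

// growingBuffer is a toy circular buffer: an entry's slot is its index modulo
// the current buffer size, and the buffer doubles when it runs out of room,
// loosely modeling the growEventBuffer approach discussed above.
type growingBuffer struct {
	buf   []int
	start int // index of the oldest live entry
	count int // number of live entries
}

func newGrowingBuffer(initialSize int) *growingBuffer {
	return &growingBuffer{buf: make([]int, initialSize)}
}

func (b *growingBuffer) grow() {
	newBuf := make([]int, 2*len(b.buf))
	for i := 0; i < b.count; i++ {
		idx := b.start + i
		newBuf[idx%len(newBuf)] = b.buf[idx%len(b.buf)]
	}
	b.buf = newBuf
}

func (b *growingBuffer) add(v int) {
	if b.count == len(b.buf) {
		b.grow()
	}
	b.buf[(b.start+b.count)%len(b.buf)] = v
	b.count++
}

// benchmarkFill fills the buffer to one million entries from a given starting
// size; a larger starting size means fewer doublings per run.
func benchmarkFill(b *testing.B, initialSize int) {
	for i := 0; i < b.N; i++ {
		buf := newGrowingBuffer(initialSize)
		for j := 0; j < 1<<20; j++ {
			buf.add(j)
		}
	}
}

func BenchmarkFillFrom1K(b *testing.B)  { benchmarkFill(b, 1<<10) }
func BenchmarkFillFrom64K(b *testing.B) { benchmarkFill(b, 1<<16) }
```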
As discussed, exposing these configurations immediately may complicate some of our future plans. We could still merge this while we finalize those so the code doesn't rot, but not document the options, and explicitly mark them as technical preview so that we are free to change them if we need to.
I think if we merge this, any use of the byte-based options should log a warning that the feature is in technical preview.
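A minimal sketch of where such a warning could hook in. The function name, its placement, and the wording are assumptions; real code would use libbeat's own logger (or its cfgwarn helpers) rather than the standard library log package:

```go
package memqueue

import "log"

// warnIfByteLimits is a hypothetical hook that emits a technical-preview
// notice whenever a byte-based limit is configured. log.Printf is used here
// only to keep the sketch self-contained.
func warnIfByteLimits(queueBytes *int, bulkMaxBytes int) {
	if queueBytes != nil || bulkMaxBytes > 0 {
		log.Printf("byte-based queue/output limits are in technical preview and may change in a future release")
	}
}
```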
Allow the memory queue's size to be specified in bytes rather than event count. Add bulk_max_bytes to the Elasticsearch output config, to specify ingest request sizes in bytes.

The main technical difficulties in this change were:
- a circularBuffer helper that handles the index arithmetic as the buffer grows, so that event indices can be used unchanged no matter the buffer's current size
- a FIFO helper

Checklist
- CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.
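For reference, a hedged sketch of how the byte-based options might appear in a configuration file. bulk_max_bytes is named in the description above; the queue.mem.bytes key is inferred from the config's Bytes field in the diff and should be treated as an assumption while the options remain in technical preview:

```yaml
# Illustrative only: option names and values are not authoritative.
queue.mem:
  bytes: 67108864          # assumed key: size the memory queue in bytes (64 MiB) instead of events

output.elasticsearch:
  hosts: ["https://localhost:9200"]
  bulk_max_bytes: 10485760 # cap the encoded size of each bulk request (10 MiB), per this PR
```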