Flush queues on collector shutdown #125

Open

maxamins wants to merge 2 commits into main from macxamin/flush_queues
Conversation

maxamins (Collaborator):

  1. When shutdown is initiated, stop more messages from being sent.
  2. Drain the shards and send the batches in under 15 seconds (see the sketch below).
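
For orientation, here is a minimal, self-contained sketch of the two steps above, under stated assumptions: the exporter type, its queue field, and the Shutdown method below are illustrative stand-ins, not the actual pkg/export types. It rejects new data once shutdown starts and then drains whatever is buffered within a 15-second budget.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type exporter struct {
	closed atomic.Bool   // set once shutdown starts so Export drops new data
	queue  chan []string // stands in for the per-shard sample buffers
}

// Export enqueues a batch unless shutdown has already begun.
func (e *exporter) Export(batch []string) {
	if e.closed.Load() {
		return // step 1: stop more messages from being sent
	}
	e.queue <- batch
}

// Shutdown drains whatever is still buffered, bounded by a 15-second budget.
func (e *exporter) Shutdown() {
	e.closed.Store(true)
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	for {
		select {
		case batch := <-e.queue:
			fmt.Println("sending final batch of", len(batch), "samples")
		case <-ctx.Done():
			return // time budget exhausted; give up on the rest
		default:
			return // step 2: queue fully drained
		}
	}
}

func main() {
	e := &exporter{queue: make(chan []string, 16)}
	e.Export([]string{"up", "scrape_duration_seconds"})
	e.Shutdown()
	e.Export([]string{"dropped"}) // ignored once shutdown has begun
}
```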

@maxamins maxamins force-pushed the macxamin/flush_queues branch 6 times, most recently from 51048be to 828c352 Compare January 25, 2022 05:00
@maxamins maxamins requested a review from fabxc January 25, 2022 05:03
@andysim3d andysim3d added the bug Something isn't working label Mar 23, 2022
@pintohutch (Collaborator):

What's the status on this PR?

@maxamins maxamins force-pushed the macxamin/flush_queues branch 2 times, most recently from ad99eb4 to df56956 Compare May 25, 2022 06:03
@maxamins maxamins requested a review from pintohutch May 25, 2022 06:04
pkg/export/export_test.go (review thread outdated, resolved)
@maxamins maxamins force-pushed the macxamin/flush_queues branch 2 times, most recently from c14aba0 to 29e9302 Compare May 25, 2022 21:55
@pintohutch (Collaborator):

Aside from rebasing, we'll want to benchmark this change to catch any performance regressions. cc @saketjajoo

@maxamins maxamins force-pushed the macxamin/flush_queues branch from 29e9302 to f8cbdad Compare July 17, 2023 20:37
@github-actions github-actions bot requested a review from shishichen July 17, 2023 20:37
@maxamins maxamins force-pushed the macxamin/flush_queues branch from f8cbdad to 326d4a9 Compare July 18, 2023 22:01
1. When shutdown is initiated stop more messages from being sent
2. Drain the shards and send the batches in under 15 seconds
@ridwanmsharif ridwanmsharif requested review from bwplotka and removed request for fabxc and shishichen September 30, 2024 18:09
@ridwanmsharif (Collaborator):

@pintohutch @bwplotka PTAL. GoogleCloudPlatform/prometheus#203 has some benchmarking results, but spoilers: there is no discernible change in performance, which makes sense because this code only runs on shutdown.

@pintohutch (Collaborator):

That's great to hear! I'll defer to @bwplotka as the SME to review.

@pintohutch pintohutch removed their request for review October 3, 2024 21:34
// This avoids data loss on shutdown.
cancelTimeout = 15 * time.Second
// Time after the final shards are drained before the exporter is closed on shutdown.
flushTimeout = 100 * time.Millisecond
Collaborator:

is 100ms making any difference?

for {
select {
// NOTE(freinartz): we will terminate once context is cancelled and not flush remaining
// buffered data. In-flight requests will be aborted as well.
// This is fine once we persist data submitted via Export() but for now there may be some
// data loss on shutdown.
case <-e.ctx.Done():
// on termination, try to drain the remaining shards within the CancelTimeout.
Collaborator:

Suggested change:
- // on termination, try to drain the remaining shards within the CancelTimeout.
+ // On termination, try to drain the remaining shards within the CancelTimeout.

}
}
}

for {
select {
// NOTE(freinartz): we will terminate once context is cancelled and not flush remaining
Collaborator:

Let's update this commentary; it's incorrect with the new logic 🤗

@@ -765,13 +783,61 @@ func (e *Exporter) Run() error {
curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
}

// Try to drain the remaining data before exiting, or until the time limit (15 seconds) expires.
// A sleep timer is added after draining the shards to ensure the data has time to be sent.
drainShardsBeforeExiting := func() {
Collaborator:

nit: Not a must, but it might be easier to iterate on if we move send and drain to separate Exporter private methods 🤔
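
For illustration only, a rough sketch of what that split could look like, with drain and send as separate private methods; the sample type, sendBatch, drainShards, and their bodies here are assumptions rather than the PR's actual code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type sample struct{ value float64 }

// Exporter is a stand-in for pkg/export's Exporter, with just enough state
// to show the suggested split between draining and sending.
type Exporter struct {
	shards []chan sample
}

// sendBatch ships one batch; the real code would call the GCM write API here.
func (e *Exporter) sendBatch(ctx context.Context, batch []sample) error {
	_ = ctx
	fmt.Println("sent batch of", len(batch))
	return nil
}

// drainShards empties every shard into a batch and sends it, stopping early
// if ctx expires (e.g. the 15s cancelTimeout used on shutdown).
func (e *Exporter) drainShards(ctx context.Context) error {
	for _, sh := range e.shards {
		var batch []sample
	loop:
		for {
			select {
			case s := <-sh:
				batch = append(batch, s)
			case <-ctx.Done():
				return ctx.Err()
			default:
				break loop
			}
		}
		if len(batch) == 0 {
			continue
		}
		if err := e.sendBatch(ctx, batch); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	e := &Exporter{shards: []chan sample{make(chan sample, 4)}}
	e.shards[0] <- sample{value: 1}
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	_ = e.drainShards(ctx) // Run() would call this from its shutdown path
}
```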

@bwplotka (Collaborator) left a comment:

Thanks!

Some suggestions might require some work, so let me know if this is something you have time to do, or we accept some imperfection/code spaghetti.

The main issue for me is proceeding with the send work while reusing e.ctx, which was already cancelled. 🤔

for {
select {
// NOTE(freinartz): we will terminate once context is cancelled and not flush remaining
// buffered data. In-flight requests will be aborted as well.
// This is fine once we persist data submitted via Export() but for now there may be some
// data loss on shutdown.
case <-e.ctx.Done():
// on termination, try to drain the remaining shards within the CancelTimeout.
// This is done to prevent data loss during a shutdown.
drainShardsBeforeExiting()
Collaborator:

Are we sure this works? All sends normally take the e.ctx context, so they will be cancelled? I assume here you (intentionally?) accept that and drain the buffer with new sends using a separate context.

Would it be cleaner to have a custom, separate context from the beginning? 🤔
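
A small runnable sketch of the separate-context idea: the already-cancelled context below stands in for e.ctx, and the drain gets a fresh context with its own 15-second deadline so the final sends are not aborted (variable names are illustrative).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Simulate the situation in the comment above: the exporter's main
	// context (e.ctx) is already cancelled once shutdown starts.
	ectx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println("send bound to e.ctx would fail:", ectx.Err()) // context.Canceled

	// A separate drain context, not derived from e.ctx, stays usable for up
	// to the drain budget (15s in the PR), so the final sends can complete.
	drainCtx, stop := context.WithTimeout(context.Background(), 15*time.Second)
	defer stop()
	fmt.Println("send bound to drainCtx is fine:", drainCtx.Err()) // <nil>
}
```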

Collaborator:

In fact, we always use e.ctx even in drain if I'm reading this correctly. Are we sure this code works? 🤔

}
}
if totalRemaining == 0 && !pending {
// NOTE(ridwanmsharif): the sending of the batches happens asynchronously
Collaborator:

Yea, can we improve this and get that result state from send (or create a separate send)?

}
}
}()
for {
Collaborator:

I would suggest we don't spawn another goroutine and instead do it all in this for loop? Send is async anyway?
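
A minimal sketch of handling shutdown inline in the run loop rather than in an extra goroutine; runLoop, sendAsync, and the channel plumbing here are assumed names for illustration, and sends remain asynchronous as they are in the PR.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sendAsync stands in for the exporter's asynchronous batch send.
func sendAsync(batch []int) {
	go fmt.Println("sent batch of", len(batch), "samples")
}

// runLoop batches incoming samples and, on cancellation, flushes whatever is
// pending directly in the same loop, with no dedicated drain goroutine.
func runLoop(ctx context.Context, in <-chan int) {
	var batch []int
	for {
		select {
		case v := <-in:
			batch = append(batch, v)
			if len(batch) >= 3 {
				sendAsync(batch)
				batch = nil
			}
		case <-ctx.Done():
			if len(batch) > 0 {
				sendAsync(batch) // final flush handled inline
			}
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan int, 8)
	in <- 1
	in <- 2
	go func() { time.Sleep(10 * time.Millisecond); cancel() }()
	runLoop(ctx, in)
	time.Sleep(50 * time.Millisecond) // give the async send time to print
}
```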

@@ -548,15 +561,15 @@ func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) {

// Export enqueues the samples and exemplars to be written to Cloud Monitoring.
func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemplarMap map[storage.SeriesRef]record.RefExemplar) {
if e.opts.Disable {
gcmExportCalledWhileDisabled.Inc()
Collaborator:

That's unrelated to the main PR goal. I see Max did this; let's at least update the description.

// This is done to prevent data loss during a shutdown.
drainShardsBeforeExiting()
// This channel is only used by unit tests.
e.exitc <- struct{}{}
Collaborator:

Ideally we don't have testing-specific code in the critical, production flow. However, IF we make sure sending propagates some result back to us, we could have nice production logic that emits a log line telling us everything was flushed successfully. Can we do this?
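
A hedged sketch of that idea: the final sends report their outcome back so shutdown can log whether everything was flushed, rather than writing to a test-only channel; flushAll and its signature are assumptions, not the PR's actual API.

```go
package main

import (
	"log"
	"sync"
)

type batch []float64

// flushAll sends every remaining batch asynchronously and collects per-batch
// errors so the caller can tell whether the shutdown flush fully succeeded.
func flushAll(batches []batch, send func(batch) error) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, b := range batches {
		wg.Add(1)
		go func(b batch) {
			defer wg.Done()
			if err := send(b); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(b)
	}
	wg.Wait()
	return errs
}

func main() {
	send := func(b batch) error { return nil } // stand-in for the GCM write
	batches := []batch{{1, 2}, {3}}
	if errs := flushAll(batches, send); len(errs) == 0 {
		// A production-friendly signal instead of a test-only channel write.
		log.Print("all remaining batches flushed successfully on shutdown")
	} else {
		log.Printf("shutdown flush incomplete: %d batches failed", len(errs))
	}
}
```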
