From ad99eb497d38e91f91d10e21871846b3dfeb3b91 Mon Sep 17 00:00:00 2001
From: Max Amin
Date: Tue, 19 Oct 2021 13:41:31 +0000
Subject: [PATCH] Flush queues on collector shutdown

1. When shutdown is initiated, stop more messages from being sent.
2. Drain the shards and send the batches in under 15 seconds.
---
 pkg/export/export.go      | 93 ++++++++++++++++++++++++++++++++++------
 pkg/export/export_test.go | 62 ++++++++++++++++++++++++++++++++++++++-
 vendor/modules.txt        |  1 -
 3 files changed, 143 insertions(+), 13 deletions(-)

diff --git a/pkg/export/export.go b/pkg/export/export.go
index 50255c3562..d3ff5f4a2f 100644
--- a/pkg/export/export.go
+++ b/pkg/export/export.go
@@ -65,6 +65,10 @@ var (
 		Name: "gcm_export_shard_process_pending_total",
 		Help: "Number of shard retrievals with an empty result.",
 	})
+	gcmExportCalledWhileDisabled = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "gcm_export_called_while_disabled_total",
+		Help: "Number of calls to export while metric exporting was disabled.",
+	})
 	shardProcessSamplesTaken = prometheus.NewHistogram(prometheus.HistogramOpts{
 		Name: "gcm_export_shard_process_samples_taken",
 		Help: "Number of samples taken when processing a shard.",
@@ -101,6 +105,9 @@ type Exporter struct {
 	// be processed.
 	nextc chan struct{}
 
+	// Channel for signaling that the exporter should exit. Used by unit tests
+	// to detect that all buffered data has been exported.
+	exitc chan struct{}
 	// The external labels may be updated asynchronously by configuration changes
 	// and must be locked with mtx.
 	mtx sync.Mutex
@@ -121,7 +128,9 @@ const (
 	// Time after an accumulating batch is flushed to GCM. This avoids data being
 	// held indefinititely if not enough new data flows in to fill up the batch.
 	batchDelayMax = 50 * time.Millisecond
-
+	// Time after the context is cancelled that we use to flush the remaining
+	// buffered data. This avoids data loss on shutdown.
+	cancelTimeout = 15 * time.Second
 	// Prefix for GCM metric.
 	MetricTypePrefix = "prometheus.googleapis.com"
 )
@@ -270,6 +279,7 @@ func New(logger log.Logger, reg prometheus.Registerer, opts ExporterOpts) (*Expo
 			pendingRequests,
 			projectsPerBatch,
 			samplesPerRPCBatch,
+			gcmExportCalledWhileDisabled,
 		)
 	}
 
@@ -295,6 +305,7 @@ func New(logger log.Logger, reg prometheus.Registerer, opts ExporterOpts) (*Expo
 		opts:                 opts,
 		metricClient:         metricClient,
 		nextc:                make(chan struct{}, 1),
+		exitc:                make(chan struct{}, 1),
 		shards:               make([]*shard, shardCount),
 		warnedUntypedMetrics: map[string]struct{}{},
 	}
@@ -378,9 +389,22 @@ func (e *Exporter) SetLabelsByIDFunc(f func(uint64) labels.Labels) {
 	e.seriesCache.getLabelsByRef = f
 }
 
+func (e *Exporter) isDisabled() bool {
+	e.mtx.Lock()
+	defer e.mtx.Unlock()
+	return e.opts.Disable
+}
+
+func (e *Exporter) disable() {
+	e.mtx.Lock()
+	defer e.mtx.Unlock()
+	e.opts.Disable = true
+}
+
 // Export enqueues the samples to be written to Cloud Monitoring.
 func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample) {
-	if e.opts.Disable {
+	if e.isDisabled() {
+		gcmExportCalledWhileDisabled.Inc()
 		return
 	}
 
@@ -434,7 +458,7 @@ func sampleInRange(sample *monitoring_pb.TimeSeries, start, end time.Time) bool
 }
 
 func (e *Exporter) enqueue(hash uint64, sample *monitoring_pb.TimeSeries) {
-	idx := hash % uint64(len(e.shards))
+	idx := (hash) % uint64(len(e.shards))
 	e.shards[idx].enqueue(hash, sample)
 }
 
@@ -454,9 +478,12 @@ const (
 // Run sends exported samples to Google Cloud Monitoring. Must be called at most once.
 // ApplyConfig must be called once prior to calling Run.
 func (e *Exporter) Run(ctx context.Context) error {
+	exportCtx, exportCancel := context.WithCancel(context.Background())
+	defer exportCancel()
+
 	defer e.metricClient.Close()
-	go e.seriesCache.run(ctx)
-	go e.opts.Lease.Run(ctx)
+	go e.seriesCache.run(exportCtx)
+	go e.opts.Lease.Run(exportCtx)
 
 	timer := time.NewTimer(batchDelayMax)
 	stopTimer := func() {
@@ -505,7 +532,7 @@ func (e *Exporter) Run(ctx context.Context) error {
 			// We could only trigger if we didn't fully empty shards in this batch.
 			// Benchmarking showed no beneficial impact of this optimization.
 			e.triggerNext()
-		}(ctx, curBatch)
+		}(exportCtx, curBatch)
 
 		// Reset state for new batch.
 		stopTimer()
@@ -514,14 +541,55 @@ func (e *Exporter) Run(ctx context.Context) error {
 		curBatch = newBatch(e.logger, e.opts.BatchSize)
 	}
 
+	// Try to drain the remaining data before exiting or before the time limit (cancelTimeout) expires.
+	// A short sleep after the shards are drained gives the final batches time to be sent.
+	drainShardsBeforeExiting := func() {
+		level.Info(e.logger).Log("msg", "Exiting Exporter - will attempt to send remaining data in the next 15 seconds.")
+		exitTimer := time.NewTimer(cancelTimeout)
+		drained := make(chan struct{}, 1)
+		go func() {
+			for {
+				totalRemaining := 0
+				pending := false
+				for _, shard := range e.shards {
+					_, remaining := shard.fill(curBatch)
+					totalRemaining += remaining
+					pending = pending || shard.pending
+					if !curBatch.empty() {
+						send()
+					}
+				}
+				if totalRemaining == 0 && !pending {
+					// Wait for the final batches to be sent, then signal and stop polling.
+					time.Sleep(50 * time.Millisecond)
+					drained <- struct{}{}
+					return
+				}
+			}
+		}()
+		for {
+			select {
+			case <-exitTimer.C:
+				samplesDropped.WithLabelValues("Data wasn't sent within the timeout limit.").Inc()
+				return
+			case <-drained:
+				return
+			}
+		}
+	}
+
 	for {
 		select {
-		// NOTE(freinartz): we will terminate once context is cancelled and not flush remaining
-		// buffered data. In-flight requests will be aborted as well.
-		// This is fine once we persist data submitted via Export() but for now there may be some
-		// data loss on shutdown.
 		case <-ctx.Done():
-			return nil
+			// On termination:
+			// 1. Stop the exporter from receiving new data.
+			// 2. Try to drain the remaining shards within cancelTimeout.
+			//    This is done to prevent data loss during a shutdown.
+			e.disable()
+			drainShardsBeforeExiting()
+			// Signal that draining is done; the channel is also read by unit tests.
+			e.exitc <- struct{}{}
+
 		// This is activated for each new sample that arrives
 		case <-e.nextc:
 			sendIterations.Inc()
@@ -548,6 +616,9 @@ func (e *Exporter) Run(ctx context.Context) error {
 			} else {
 				timer.Reset(batchDelayMax)
 			}
+
+		case <-e.exitc:
+			return nil
 		}
 	}
 }
diff --git a/pkg/export/export_test.go b/pkg/export/export_test.go
index fc76be258d..4be1c5e9cb 100644
--- a/pkg/export/export_test.go
+++ b/pkg/export/export_test.go
@@ -380,10 +380,70 @@ func TestExporter_drainBacklog(t *testing.T) {
 	// As our samples are all for the same series, each batch can only contain a single sample.
 	// The exporter waits for the batch delay duration before sending it.
 	// We sleep for an appropriate multiple of it to allow it to drain the shard.
-
 	time.Sleep(55 * batchDelayMax)
 	// Check that we received all samples that went in.
 	if got, want := len(metricServer.samples), 50; got != want {
 		t.Fatalf("got %d, want %d", got, want)
 	}
 }
+
+func TestExporter_shutdown(t *testing.T) {
+	var (
+		srv          = grpc.NewServer()
+		listener     = bufconn.Listen(1e6)
+		metricServer = &testMetricService{}
+	)
+	monitoring_pb.RegisterMetricServiceServer(srv, metricServer)
+
+	go func() { srv.Serve(listener) }()
+	defer srv.Stop()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	bufDialer := func(context.Context, string) (net.Conn, error) {
+		return listener.Dial()
+	}
+	metricClient, err := monitoring.NewMetricClient(ctx,
+		option.WithoutAuthentication(),
+		option.WithGRPCDialOption(grpc.WithInsecure()),
+		option.WithGRPCDialOption(grpc.WithContextDialer(bufDialer)),
+	)
+	if err != nil {
+		t.Fatalf("creating metric client failed: %s", err)
+	}
+
+	e, err := New(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), nil, ExporterOpts{})
+	if err != nil {
+		t.Fatalf("creating exporter failed: %s", err)
+	}
+
+	e.metricClient = metricClient
+	e.SetLabelsByIDFunc(func(i uint64) labels.Labels {
+		return labels.FromStrings("project_id", "test", "location", "test")
+	})
+
+	exportCtx, cancelExport := context.WithCancel(context.Background())
+
+	for i := 0; i < 50; i++ {
+		e.Export(nil, []record.RefSample{
+			{Ref: uint64(i), T: int64(i), V: float64(i)},
+		})
+	}
+	go e.Run(exportCtx)
+
+	cancelExport()
+	// Give the exporter time to observe the cancellation and disable itself.
+	time.Sleep(50 * time.Millisecond)
+
+	// These samples must be rejected since the exporter has been cancelled.
+	for i := 0; i < 10; i++ {
+		e.Export(nil, []record.RefSample{
+			{Ref: uint64(i), T: int64(i), V: float64(i)},
+		})
+	}
+	// Wait for the exporter to finish flushing its shards.
+	<-e.exitc
+	// Check that we received all samples that went in.
+	if got, want := len(metricServer.samples), 50; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 01a24cf482..5bf5a247e0 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -114,7 +114,6 @@ github.com/golang/protobuf/proto
 github.com/golang/protobuf/ptypes
 github.com/golang/protobuf/ptypes/any
 github.com/golang/protobuf/ptypes/duration
-github.com/golang/protobuf/ptypes/empty
 github.com/golang/protobuf/ptypes/timestamp
 # github.com/golang/snappy v0.0.3
 ## explicit
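
As context for the behavior change above, here is a minimal sketch of how a collector binary is expected to drive the new shutdown path: cancel the context passed to Run and wait for Run to return once the flush completes. The exporter API (New, ExporterOpts, Export, Run) is taken from this patch; the module import path, the signal handling, and the logger wiring are illustrative assumptions, and the ApplyConfig/SetLabelsByIDFunc setup that must precede Run is deliberately omitted.

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	// Assumed module path for the package touched by this patch.
	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
)

func main() {
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

	// Options left at their defaults for brevity; a real collector also calls
	// ApplyConfig and SetLabelsByIDFunc before Run (omitted here).
	e, err := export.New(logger, nil, export.ExporterOpts{})
	if err != nil {
		level.Error(logger).Log("msg", "creating exporter failed", "err", err)
		os.Exit(1)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Cancel the exporter's context on SIGTERM/SIGINT. With this patch, Run no
	// longer returns immediately on cancellation: it disables Export, drains the
	// shards for up to cancelTimeout (15 seconds), and only then exits.
	go func() {
		sigc := make(chan os.Signal, 1)
		signal.Notify(sigc, syscall.SIGTERM, os.Interrupt)
		<-sigc
		cancel()
	}()

	if err := e.Run(ctx); err != nil {
		level.Error(logger).Log("msg", "exporter terminated with error", "err", err)
	}
}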