Skip to content

Commit

Permalink
Flush queues on collector shutdown
Browse files Browse the repository at this point in the history
1. When shutdown is initiated stop more messages from being sent
2. Drain the shards and send the batches in under 15 seconds
  • Loading branch information
maxamins committed May 25, 2022
1 parent 91d1731 commit ad99eb4
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 13 deletions.
92 changes: 81 additions & 11 deletions pkg/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ var (
Name: "gcm_export_shard_process_pending_total",
Help: "Number of shard retrievals with an empty result.",
})
gcmExportCalledWhileDisabled = prometheus.NewCounter(prometheus.CounterOpts{
Name: "gcm_export_called_while_disabled_total",
Help: "Number of calls to export while metric exporting was disabled.",
})
shardProcessSamplesTaken = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "gcm_export_shard_process_samples_taken",
Help: "Number of samples taken when processing a shard.",
Expand Down Expand Up @@ -101,6 +105,9 @@ type Exporter struct {
// be processed.
nextc chan struct{}

// Channel for signaling to exit the exporter. Used to indicate
// data is done exporting during unit tests.
exitc chan struct{}
// The external labels may be updated asynchronously by configuration changes
// and must be locked with mtx.
mtx sync.Mutex
Expand All @@ -121,7 +128,9 @@ const (
// Time after an accumulating batch is flushed to GCM. This avoids data being
// held indefinititely if not enough new data flows in to fill up the batch.
batchDelayMax = 50 * time.Millisecond

// Time after context is cancelled that we use to flush the remaining buffered data.
// This avoids data loss on shutdown.
cancelTimeout = 15 * time.Second
// Prefix for GCM metric.
MetricTypePrefix = "prometheus.googleapis.com"
)
Expand Down Expand Up @@ -270,6 +279,7 @@ func New(logger log.Logger, reg prometheus.Registerer, opts ExporterOpts) (*Expo
pendingRequests,
projectsPerBatch,
samplesPerRPCBatch,
gcmExportCalledWhileDisabled,
)
}

Expand All @@ -295,6 +305,7 @@ func New(logger log.Logger, reg prometheus.Registerer, opts ExporterOpts) (*Expo
opts: opts,
metricClient: metricClient,
nextc: make(chan struct{}, 1),
exitc: make(chan struct{}, 1),
shards: make([]*shard, shardCount),
warnedUntypedMetrics: map[string]struct{}{},
}
Expand Down Expand Up @@ -378,9 +389,22 @@ func (e *Exporter) SetLabelsByIDFunc(f func(uint64) labels.Labels) {
e.seriesCache.getLabelsByRef = f
}

func (e *Exporter) isDisabled() bool {
e.mtx.Lock()
defer e.mtx.Unlock()
return e.opts.Disable
}

func (e *Exporter) disable() {
e.mtx.Lock()
defer e.mtx.Unlock()
e.opts.Disable = true
}

// Export enqueues the samples to be written to Cloud Monitoring.
func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample) {
if e.opts.Disable {
if e.isDisabled() {
gcmExportCalledWhileDisabled.Inc()
return
}

Expand Down Expand Up @@ -434,7 +458,7 @@ func sampleInRange(sample *monitoring_pb.TimeSeries, start, end time.Time) bool
}

func (e *Exporter) enqueue(hash uint64, sample *monitoring_pb.TimeSeries) {
idx := hash % uint64(len(e.shards))
idx := (hash) % uint64(len(e.shards))
e.shards[idx].enqueue(hash, sample)
}

Expand All @@ -454,9 +478,12 @@ const (
// Run sends exported samples to Google Cloud Monitoring. Must be called at most once.
// ApplyConfig must be called once prior to calling Run.
func (e *Exporter) Run(ctx context.Context) error {
exportCtx, exportCancel := context.WithCancel(context.Background())
defer exportCancel()

defer e.metricClient.Close()
go e.seriesCache.run(ctx)
go e.opts.Lease.Run(ctx)
go e.seriesCache.run(exportCtx)
go e.opts.Lease.Run(exportCtx)

timer := time.NewTimer(batchDelayMax)
stopTimer := func() {
Expand Down Expand Up @@ -505,7 +532,7 @@ func (e *Exporter) Run(ctx context.Context) error {
// We could only trigger if we didn't fully empty shards in this batch.
// Benchmarking showed no beneficial impact of this optimization.
e.triggerNext()
}(ctx, curBatch)
}(exportCtx, curBatch)

// Reset state for new batch.
stopTimer()
Expand All @@ -514,14 +541,54 @@ func (e *Exporter) Run(ctx context.Context) error {
curBatch = newBatch(e.logger, e.opts.BatchSize)
}

// Try to drain the remaining data before exiting or the timelimit (15 seconds) expires.
// A sleep timer is added after draining the shards to ensure it has time to be sent.
drainShardsBeforeExiting := func() {
level.Info(e.logger).Log("msg", "Exiting Exporter - will attempt to send remaining data in the next 15 seconds.")
exitTimer := time.NewTimer(cancelTimeout)
drained := make(chan struct{}, 1)
go func() {
for {
totalRemaining := 0
pending := false
for _, shard := range e.shards {
_, remaining := shard.fill(curBatch)
totalRemaining += remaining
pending = pending || shard.pending
if !curBatch.empty() {
send()
}
}
if totalRemaining == 0 && !pending {
// Wait for the final shards to send.
time.Sleep(50 * time.Millisecond)
drained <- struct{}{}
}
}
}()
for {
select {
case <-exitTimer.C:
samplesDropped.WithLabelValues("Data wasn't sent within the timeout limit.")
return
case <-drained:
return
}
}
}

for {
select {
// NOTE(freinartz): we will terminate once context is cancelled and not flush remaining
// buffered data. In-flight requests will be aborted as well.
// This is fine once we persist data submitted via Export() but for now there may be some
// data loss on shutdown.
case <-ctx.Done():
return nil
// On Termination:
// 1. Stop the exporter from recieving new data.
// 2. Try to drain the remaining shards within the CancelTimeout.
// This is done to prevent data loss during a shutdown.
e.disable()
drainShardsBeforeExiting()
// This channel is used for unit test case.
e.exitc <- struct{}{}

// This is activated for each new sample that arrives
case <-e.nextc:
sendIterations.Inc()
Expand All @@ -548,6 +615,9 @@ func (e *Exporter) Run(ctx context.Context) error {
} else {
timer.Reset(batchDelayMax)
}

case <-e.exitc:
return nil
}
}
}
Expand Down
62 changes: 61 additions & 1 deletion pkg/export/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,70 @@ func TestExporter_drainBacklog(t *testing.T) {
// As our samples are all for the same series, each batch can only contain a single sample.
// The exporter waits for the batch delay duration before sending it.
// We sleep for an appropriate multiple of it to allow it to drain the shard.
time.Sleep(55 * batchDelayMax)

// Check that we received all samples that went in.
if got, want := len(metricServer.samples), 50; got != want {
t.Fatalf("got %d, want %d", got, want)
}
}

func TestExporter_shutdown(t *testing.T) {
var (
srv = grpc.NewServer()
listener = bufconn.Listen(1e6)
metricServer = &testMetricService{}
)
monitoring_pb.RegisterMetricServiceServer(srv, metricServer)

go func() { srv.Serve(listener) }()
defer srv.Stop()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bufDialer := func(context.Context, string) (net.Conn, error) {
return listener.Dial()
}
metricClient, err := monitoring.NewMetricClient(ctx,
option.WithoutAuthentication(),
option.WithGRPCDialOption(grpc.WithInsecure()),
option.WithGRPCDialOption(grpc.WithContextDialer(bufDialer)),
)
if err != nil {
t.Fatalf("creating metric client failed: %s", err)
}

e, err := New(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), nil, ExporterOpts{})
if err != nil {
t.Fatalf("Creating Exporter failed: %s", err)
}

e.metricClient = metricClient
e.SetLabelsByIDFunc(func(i uint64) labels.Labels {
return labels.FromStrings("project_id", "test", "location", "test")
})

exportCtx, cancelExport := context.WithCancel(context.Background())

for i := 0; i < 50; i++ {
e.Export(nil, []record.RefSample{
{Ref: uint64(i), T: int64(i), V: float64(i)},
})
}
go e.Run(exportCtx)

cancelExport()
// Time delay is added to ensure exporter is disabled.
time.Sleep(50 * time.Millisecond)

// These samples will be rejected since the exporter has been cancelled.
for i := 0; i < 10; i++ {
e.Export(nil, []record.RefSample{
{Ref: uint64(i), T: int64(i), V: float64(i)},
})
}
// Wait for exporter to finish flushing shards.
<-e.exitc
// Check that we received all samples that went in.
if got, want := len(metricServer.samples), 50; got != want {
t.Fatalf("got %d, want %d", got, want)
}
}
1 change: 0 additions & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ github.com/golang/protobuf/proto
github.com/golang/protobuf/ptypes
github.com/golang/protobuf/ptypes/any
github.com/golang/protobuf/ptypes/duration
github.com/golang/protobuf/ptypes/empty
github.com/golang/protobuf/ptypes/timestamp
# github.com/golang/snappy v0.0.3
## explicit
Expand Down

0 comments on commit ad99eb4

Please sign in to comment.