From d41cd777d0662b24535dfce96de18205f64c69ef Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Mon, 7 Oct 2024 15:04:22 +0000 Subject: [PATCH] changefeedccl: improve sink error observability Add sink error metric (`changefeed.sink_errors`) and expand reporting of internal retries metric (`changefeed.internal_retry_message_count`) to all sinks that perform internal retries. Part of: #127784 Release note (enterprise change): Add sink error metric (`changefeed.sink_errors`) and expand reporting of internal retries metric (`changefeed.internal_retry_message_count`) to all sinks that perform internal retries. --- docs/generated/metrics/metrics.html | 1 + pkg/ccl/changefeedccl/event_processing.go | 4 ++++ pkg/ccl/changefeedccl/metrics.go | 10 ++++++++ pkg/ccl/changefeedccl/sink_webhook.go | 28 ++++++++++++++++------- 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index 14c8697e329e..04a064289784 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -849,6 +849,7 @@ APPLICATIONchangefeed.schemafeed.table_history_scansThe number of table history scans during pollingCountsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.schemafeed.table_metadata_nanosTime blocked while verifying table metadata historiesNanosecondsCOUNTERNANOSECONDSAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.sink_batch_hist_nanosTime spent batched in the sink buffer before being flushed and acknowledgedChangefeedsHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONchangefeed.sink_errorsNumber of changefeed errors caused by the sinkCountCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.sink_io_inflightThe number of keys currently inflight as IO requests being sent to the sinkMessagesGAUGECOUNTAVGNONE APPLICATIONchangefeed.size_based_flushesTotal size based flushes across all feedsFlushesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.total_rangesThe total number of ranges being watched by changefeed aggregatorsRangesGAUGECOUNTAVGNONE diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 75d32d8f4bae..fc39687fbdba 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -450,6 +450,10 @@ func (c *kvEventToRowConsumer) encodeAndEmit( ) }) if err != nil { + if !errors.Is(err, context.Canceled) { + log.Warningf(ctx, `sink failed to emit row: %v`, err) + c.metrics.SinkErrors.Inc(1) + } return err } if log.V(3) { diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index a9bc88d3ba56..c3124be8c32d 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -82,6 +82,7 @@ type AggMetrics struct { TotalRanges *aggmetric.AggGauge CloudstorageBufferedBytes *aggmetric.AggGauge KafkaThrottlingNanos *aggmetric.AggHistogram + SinkErrors *aggmetric.AggCounter Timers *timers.Timers @@ -162,6 +163,7 @@ type sliMetrics struct { TotalRanges *aggmetric.Gauge CloudstorageBufferedBytes *aggmetric.Gauge KafkaThrottlingNanos *aggmetric.Histogram + SinkErrors *aggmetric.Counter Timers *timers.ScopedTimers @@ -965,6 +967,12 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } + metaSinkErrors := metric.Metadata{ + Name: "changefeed.sink_errors", + Help: "Number of changefeed errors caused by the sink", + Measurement: "Count", + Unit: metric.Unit_COUNT, + } functionalGaugeMinFn := func(childValues []int64) int64 { var min int64 @@ -1066,6 +1074,7 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag SigFigs: 2, BucketConfig: metric.BatchProcessLatencyBuckets, }), + SinkErrors: b.Counter(metaSinkErrors), Timers: timers.New(histogramWindow), NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"), } @@ -1136,6 +1145,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { TotalRanges: a.TotalRanges.AddChild(scope), CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope), KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope), + SinkErrors: a.SinkErrors.AddChild(scope), Timers: a.Timers.GetOrCreateScopedTimers(scope), diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go index b9678afd38b8..e79aae8e4272 100644 --- a/pkg/ccl/changefeedccl/sink_webhook.go +++ b/pkg/ccl/changefeedccl/sink_webhook.go @@ -74,15 +74,17 @@ type webhookSinkPayload struct { } type encodedPayload struct { - data []byte - alloc kvevent.Alloc - emitTime time.Time - mvcc hlc.Timestamp + data []byte + alloc kvevent.Alloc + emitTime time.Time + mvcc hlc.Timestamp + recordCount int } func encodePayloadJSONWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) { result := encodedPayload{ - emitTime: timeutil.Now(), + emitTime: timeutil.Now(), + recordCount: len(messages), } payload := make([]json.RawMessage, len(messages)) @@ -546,7 +548,7 @@ func (s *deprecatedWebhookSink) workerLoop(workerIndex int) { s.exitWorkersWithError(err) return } - if err := s.sendMessageWithRetries(s.workerCtx, encoded.data); err != nil { + if err := s.sendMessageWithRetries(s.workerCtx, encoded.data, encoded.recordCount); err != nil { s.exitWorkersWithError(err) return } @@ -557,9 +559,19 @@ func (s *deprecatedWebhookSink) workerLoop(workerIndex int) { } } -func (s *deprecatedWebhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error { +func (s *deprecatedWebhookSink) sendMessageWithRetries( + ctx context.Context, reqBody []byte, recordCount int, +) error { + firstTry := true requestFunc := func() error { + if firstTry { + firstTry = false + } else { + s.metrics.recordInternalRetry(int64(recordCount), false) + } + return s.sendMessage(ctx, reqBody) + } return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc) } @@ -682,7 +694,7 @@ func (s *deprecatedWebhookSink) EmitResolvedTimestamp( // do worker logic directly here instead (there's no point using workers for // resolved timestamps since there are no keys and everything must be // in order) - if err := s.sendMessageWithRetries(ctx, payload); err != nil { + if err := s.sendMessageWithRetries(ctx, payload, 1); err != nil { s.exitWorkersWithError(err) return err }