Skip to content

Commit

Permalink
Merge pull request #132353 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.2-132092

release-24.2: changefeedccl: improve sink error observability
  • Loading branch information
asg0451 authored Oct 14, 2024
2 parents b3469bc + d41cd77 commit d48843d
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@
<tr><td>APPLICATION</td><td>changefeed.schemafeed.table_history_scans</td><td>The number of table history scans during polling</td><td>Counts</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.schemafeed.table_metadata_nanos</td><td>Time blocked while verifying table metadata histories</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.sink_batch_hist_nanos</td><td>Time spent batched in the sink buffer before being flushed and acknowledged</td><td>Changefeeds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.sink_errors</td><td>Number of changefeed errors caused by the sink</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.sink_io_inflight</td><td>The number of keys currently inflight as IO requests being sent to the sink</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.size_based_flushes</td><td>Total size based flushes across all feeds</td><td>Flushes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.total_ranges</td><td>The total number of ranges being watched by changefeed aggregators</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/event_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
)
})
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Warningf(ctx, `sink failed to emit row: %v`, err)
c.metrics.SinkErrors.Inc(1)
}
return err
}
if log.V(3) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type AggMetrics struct {
TotalRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram
SinkErrors *aggmetric.AggCounter

Timers *timers.Timers

Expand Down Expand Up @@ -162,6 +163,7 @@ type sliMetrics struct {
TotalRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram
SinkErrors *aggmetric.Counter

Timers *timers.ScopedTimers

Expand Down Expand Up @@ -965,6 +967,12 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaSinkErrors := metric.Metadata{
Name: "changefeed.sink_errors",
Help: "Number of changefeed errors caused by the sink",
Measurement: "Count",
Unit: metric.Unit_COUNT,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
Expand Down Expand Up @@ -1066,6 +1074,7 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SigFigs: 2,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
SinkErrors: b.Counter(metaSinkErrors),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
}
Expand Down Expand Up @@ -1136,6 +1145,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
TotalRanges: a.TotalRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
SinkErrors: a.SinkErrors.AddChild(scope),

Timers: a.Timers.GetOrCreateScopedTimers(scope),

Expand Down
28 changes: 20 additions & 8 deletions pkg/ccl/changefeedccl/sink_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,17 @@ type webhookSinkPayload struct {
}

type encodedPayload struct {
data []byte
alloc kvevent.Alloc
emitTime time.Time
mvcc hlc.Timestamp
data []byte
alloc kvevent.Alloc
emitTime time.Time
mvcc hlc.Timestamp
recordCount int
}

func encodePayloadJSONWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) {
result := encodedPayload{
emitTime: timeutil.Now(),
emitTime: timeutil.Now(),
recordCount: len(messages),
}

payload := make([]json.RawMessage, len(messages))
Expand Down Expand Up @@ -546,7 +548,7 @@ func (s *deprecatedWebhookSink) workerLoop(workerIndex int) {
s.exitWorkersWithError(err)
return
}
if err := s.sendMessageWithRetries(s.workerCtx, encoded.data); err != nil {
if err := s.sendMessageWithRetries(s.workerCtx, encoded.data, encoded.recordCount); err != nil {
s.exitWorkersWithError(err)
return
}
Expand All @@ -557,9 +559,19 @@ func (s *deprecatedWebhookSink) workerLoop(workerIndex int) {
}
}

func (s *deprecatedWebhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error {
func (s *deprecatedWebhookSink) sendMessageWithRetries(
ctx context.Context, reqBody []byte, recordCount int,
) error {
firstTry := true
requestFunc := func() error {
if firstTry {
firstTry = false
} else {
s.metrics.recordInternalRetry(int64(recordCount), false)
}

return s.sendMessage(ctx, reqBody)

}
return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc)
}
Expand Down Expand Up @@ -682,7 +694,7 @@ func (s *deprecatedWebhookSink) EmitResolvedTimestamp(
// do worker logic directly here instead (there's no point using workers for
// resolved timestamps since there are no keys and everything must be
// in order)
if err := s.sendMessageWithRetries(ctx, payload); err != nil {
if err := s.sendMessageWithRetries(ctx, payload, 1); err != nil {
s.exitWorkersWithError(err)
return err
}
Expand Down

0 comments on commit d48843d

Please sign in to comment.