From c59b5732ff222c2e8bee5fbd063a118c8b7fdc9d Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Mon, 14 Oct 2024 10:48:32 -0400 Subject: [PATCH] changefeedccl: add timer for inner sink client flush Add a timer for the inner flushes inside sink clients. Where possible (batching_sink users, webhook v1, pubsub v1, cloud storage), this is distinct from changefeed.batch_hist_nanos in that in only captures time spent flushing data to the downstream endpoint, not time spent buffering data. This also fixes a bug where timers were not correctly registered with the metric system. Part of: #127784 Release note (enterprise change): Added a timer for inner sink client flushes. Fixed a bug where timers were not correctly registered with the metric system. --- docs/generated/metrics/metrics.html | 8 ++++++++ pkg/ccl/changefeedccl/batching_sink.go | 2 ++ pkg/ccl/changefeedccl/metrics.go | 13 +++++++++++++ pkg/ccl/changefeedccl/sink_cloudstorage.go | 1 + pkg/ccl/changefeedccl/sink_kafka.go | 12 +++++++++++- pkg/ccl/changefeedccl/sink_pubsub.go | 2 ++ pkg/ccl/changefeedccl/sink_pulsar.go | 9 ++++++++- pkg/ccl/changefeedccl/sink_webhook.go | 4 +++- pkg/ccl/changefeedccl/telemetry.go | 5 +++++ pkg/ccl/changefeedccl/timers/timers.go | 19 +++++++++++++------ 10 files changed, 66 insertions(+), 9 deletions(-) diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index 4d8f7d88dc8f..a394401e312f 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -929,6 +929,14 @@ APPLICATIONchangefeed.sink_errorsNumber of changefeed errors caused by the sinkCountCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.sink_io_inflightThe number of keys currently inflight as IO requests being sent to the sinkMessagesGAUGECOUNTAVGNONE APPLICATIONchangefeed.size_based_flushesTotal size based flushes across all feedsFlushesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE +APPLICATIONchangefeed.stage.checkpoint_job_progress.latencyLatency of the changefeed stage: checkpointing job progressLatencyHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONchangefeed.stage.downstream_client_send.latencyLatency of the changefeed stage: flushing messages from the sink's client to its downstream. This includes sends that failed for most but not all sinks.LatencyHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONchangefeed.stage.emit_row.latencyLatency of the changefeed stage: emitting row to sinkLatencyHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONchangefeed.stage.encode.latencyLatency of the changefeed stage: encoding dataLatencyHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONchangefeed.stage.kv_feed_buffer.latencyLatency of the changefeed stage: waiting to buffer kv eventsLatencyHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONchangefeed.stage.kv_feed_wait_for_table_event.latencyLatency of the changefeed stage: waiting for a table schema event to join to the kv eventLatencyHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONchangefeed.stage.rangefeed_buffer_checkpoint.latencyLatency of the changefeed stage: buffering rangefeed checkpoint eventsLatencyHISTOGRAMNANOSECONDSAVGNONE +APPLICATIONchangefeed.stage.rangefeed_buffer_value.latencyLatency of the changefeed stage: buffering rangefeed value eventsLatencyHISTOGRAMNANOSECONDSAVGNONE APPLICATIONchangefeed.total_rangesThe total number of ranges being watched by changefeed aggregatorsRangesGAUGECOUNTAVGNONE APPLICATIONchangefeed.usage.error_countCount of errors encountered while generating usage metrics for changefeedsErrorsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.usage.query_durationTime taken by the queries used to generate usage metrics for changefeedsNanosecondsHISTOGRAMNANOSECONDSAVGNONE diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go index 04511f9f7ea0..d4bec012c5cf 100644 --- a/pkg/ccl/changefeedccl/batching_sink.go +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -347,6 +347,8 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { batch, _ := req.(*sinkBatch) defer s.metrics.recordSinkIOInflightChange(int64(-batch.numMessages)) s.metrics.recordSinkIOInflightChange(int64(batch.numMessages)) + defer s.metrics.timers().DownstreamClientSend.Start()() + return s.client.Flush(ctx, batch.payload) } ioEmitter := NewParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics, s.settings) diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index c3124be8c32d..d9537bd5590b 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -121,6 +121,7 @@ type metricsRecorder interface { makeCloudstorageFileAllocCallback() func(delta int64) getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram netMetrics() *cidr.NetMetrics + timers() *timers.ScopedTimers } var _ metricsRecorder = (*sliMetrics)(nil) @@ -355,6 +356,14 @@ func (m *sliMetrics) netMetrics() *cidr.NetMetrics { return m.NetMetrics } +func (m *sliMetrics) timers() *timers.ScopedTimers { + if m == nil { + return timers.NoopScopedTimers + } + + return m.Timers +} + // JobScopedUsageMetrics are aggregated metrics keeping track of // per-changefeed usage metrics by job_id. Note that its members are public so // they get registered with the metrics registry, but you should NOT modify them @@ -674,6 +683,10 @@ func (w *wrappingCostController) netMetrics() *cidr.NetMetrics { return w.inner.netMetrics() } +func (w *wrappingCostController) timers() *timers.ScopedTimers { + return w.inner.timers() +} + var ( metaChangefeedForwardedResolvedMessages = metric.Metadata{ Name: "changefeed.forwarded_resolved_messages", diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index b0fd796a671a..c870b1c8cfd3 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -875,6 +875,7 @@ func (f *cloudStorageSinkFile) flushToStorage( ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder, ) error { defer f.releaseAlloc(ctx) + defer m.timers().DownstreamClientSend.Start()() if f.rawSize == 0 { // This method shouldn't be called with an empty file, but be defensive diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index ccc6d690ba26..5e8e6e561b30 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -370,11 +370,21 @@ func (s *kafkaSink) EmitRow( return err } + // Since we cannot distinguish between time spent buffering vs emitting + // inside sarama, set the DownstreamClientSend timer to the same value as + // BatchHistNanos. + recordOneMessageCb := s.metrics.recordOneMessage() + downstreamClientSendCb := s.metrics.timers().DownstreamClientSend.Start() + updateMetrics := func(mvcc hlc.Timestamp, bytes int, compressedBytes int) { + recordOneMessageCb(mvcc, bytes, compressedBytes) + downstreamClientSendCb() + } + msg := &sarama.ProducerMessage{ Topic: topic, Key: sarama.ByteEncoder(key), Value: sarama.ByteEncoder(value), - Metadata: messageMetadata{alloc: alloc, mvcc: mvcc, updateMetrics: s.metrics.recordOneMessage()}, + Metadata: messageMetadata{alloc: alloc, mvcc: mvcc, updateMetrics: updateMetrics}, } s.stats.startMessage(int64(msg.Key.Length() + msg.Value.Length())) return s.emitMessage(ctx, msg) diff --git a/pkg/ccl/changefeedccl/sink_pubsub.go b/pkg/ccl/changefeedccl/sink_pubsub.go index 2c17b68cbb66..b4827ee7e739 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub.go +++ b/pkg/ccl/changefeedccl/sink_pubsub.go @@ -520,6 +520,8 @@ func (p *deprecatedGcpPubsubClient) close() error { // sendMessage sends a message to the topic func (p *deprecatedGcpPubsubClient) sendMessage(m []byte, topic string, key string) error { + defer p.metrics.timers().DownstreamClientSend.Start()() + t, err := p.getTopicClient(topic) if err != nil { return err diff --git a/pkg/ccl/changefeedccl/sink_pulsar.go b/pkg/ccl/changefeedccl/sink_pulsar.go index 043c9a297823..7aac904c948b 100644 --- a/pkg/ccl/changefeedccl/sink_pulsar.go +++ b/pkg/ccl/changefeedccl/sink_pulsar.go @@ -254,9 +254,16 @@ func (p *pulsarSink) Flush(ctx context.Context) error { func (p *pulsarSink) msgCallback( ctx context.Context, a kvevent.Alloc, mvcc hlc.Timestamp, ) func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { + // Since we cannot distinguish between time spent buffering inside the + // pulsar client and time spent sending the message, we just set the value + // of DownstreamClientSend equal to BatchHistNanos. + sendCb := p.metrics.timers().DownstreamClientSend.Start() + oneMsgCb := p.metrics.recordOneMessage() + return func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { + sendCb() if err == nil { - p.metrics.recordOneMessage()(mvcc, len(message.Payload), len(message.Payload)) + oneMsgCb(mvcc, len(message.Payload), len(message.Payload)) } else { p.setError(id, message, err) } diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go index e79aae8e4272..5d96f7b616da 100644 --- a/pkg/ccl/changefeedccl/sink_webhook.go +++ b/pkg/ccl/changefeedccl/sink_webhook.go @@ -576,7 +576,9 @@ func (s *deprecatedWebhookSink) sendMessageWithRetries( return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc) } -func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) error { +func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) (retErr error) { + defer s.metrics.timers().DownstreamClientSend.Start()() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), bytes.NewReader(reqBody)) if err != nil { return err diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go index 0e2c50187485..22307963b3d2 100644 --- a/pkg/ccl/changefeedccl/telemetry.go +++ b/pkg/ccl/changefeedccl/telemetry.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -224,6 +225,10 @@ func (r *telemetryMetricsRecorder) netMetrics() *cidr.NetMetrics { return r.inner.netMetrics() } +func (r *telemetryMetricsRecorder) timers() *timers.ScopedTimers { + return r.inner.timers() +} + // continuousTelemetryInterval determines the interval at which each node emits telemetry events // during the lifespan of each enterprise changefeed. var continuousTelemetryInterval = settings.RegisterDurationSetting( diff --git a/pkg/ccl/changefeedccl/timers/timers.go b/pkg/ccl/changefeedccl/timers/timers.go index 414eb5ddd735..a4517e3ed1f5 100644 --- a/pkg/ccl/changefeedccl/timers/timers.go +++ b/pkg/ccl/changefeedccl/timers/timers.go @@ -18,12 +18,17 @@ type Timers struct { CheckpointJobProgress *aggmetric.AggHistogram Encode *aggmetric.AggHistogram EmitRow *aggmetric.AggHistogram + DownstreamClientSend *aggmetric.AggHistogram KVFeedWaitForTableEvent *aggmetric.AggHistogram KVFeedBuffer *aggmetric.AggHistogram RangefeedBufferValue *aggmetric.AggHistogram RangefeedBufferCheckpoint *aggmetric.AggHistogram } +func (*Timers) MetricStruct() {} + +var _ metric.Struct = &Timers{} + func New(histogramWindow time.Duration) *Timers { histogramOptsFor := func(name, desc string) metric.HistogramOptions { return metric.HistogramOptions{ @@ -44,6 +49,7 @@ func New(histogramWindow time.Duration) *Timers { CheckpointJobProgress: b.Histogram(histogramOptsFor("changefeed.stage.checkpoint_job_progress.latency", "Latency of the changefeed stage: checkpointing job progress")), Encode: b.Histogram(histogramOptsFor("changefeed.stage.encode.latency", "Latency of the changefeed stage: encoding data")), EmitRow: b.Histogram(histogramOptsFor("changefeed.stage.emit_row.latency", "Latency of the changefeed stage: emitting row to sink")), + DownstreamClientSend: b.Histogram(histogramOptsFor("changefeed.stage.downstream_client_send.latency", "Latency of the changefeed stage: flushing messages from the sink's client to its downstream. This includes sends that failed for most but not all sinks.")), KVFeedWaitForTableEvent: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_wait_for_table_event.latency", "Latency of the changefeed stage: waiting for a table schema event to join to the kv event")), KVFeedBuffer: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_buffer.latency", "Latency of the changefeed stage: waiting to buffer kv events")), RangefeedBufferValue: b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_value.latency", "Latency of the changefeed stage: buffering rangefeed value events")), @@ -56,6 +62,7 @@ func (ts *Timers) GetOrCreateScopedTimers(scope string) *ScopedTimers { CheckpointJobProgress: &timer{ts.CheckpointJobProgress.AddChild(scope)}, Encode: &timer{ts.Encode.AddChild(scope)}, EmitRow: &timer{ts.EmitRow.AddChild(scope)}, + DownstreamClientSend: &timer{ts.DownstreamClientSend.AddChild(scope)}, KVFeedWaitForTableEvent: &timer{ts.KVFeedWaitForTableEvent.AddChild(scope)}, KVFeedBuffer: &timer{ts.KVFeedBuffer.AddChild(scope)}, RangefeedBufferValue: &timer{ts.RangefeedBufferValue.AddChild(scope)}, @@ -67,24 +74,24 @@ type ScopedTimers struct { CheckpointJobProgress *timer Encode *timer EmitRow *timer + DownstreamClientSend *timer KVFeedWaitForTableEvent *timer KVFeedBuffer *timer RangefeedBufferValue *timer RangefeedBufferCheckpoint *timer } -func (ts *ScopedTimers) StartTimer(stage *aggmetric.Histogram) func() { - start := timeutil.Now() - return func() { - stage.RecordValue(timeutil.Since(start).Nanoseconds()) - } -} +var NoopScopedTimers = &ScopedTimers{} type timer struct { hist *aggmetric.Histogram } func (t *timer) Start() (end func()) { + if t == nil { + return func() {} + } + start := timeutil.Now() return func() { t.hist.RecordValue(timeutil.Since(start).Nanoseconds())