diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index e986a54fd8c8..787b1c6f32c8 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -762,6 +762,7 @@ APPLICATIONchangefeed.forwarded_resolved_messagesResolved timestamps forwarded from the change aggregator to the change frontierMessagesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.frontier_updatesNumber of change frontier updates across all feedsUpdatesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.internal_retry_message_countNumber of messages for which an attempt to retry them within an aggregator node was madeMessagesGAUGECOUNTAVGNONE +APPLICATIONchangefeed.kafka_throttling_hist_nanosTime spent in throttling due to exceeding kafka quotaNanosecondsHISTOGRAMNANOSECONDSAVGNONE APPLICATIONchangefeed.lagging_rangesThe number of ranges considered to be lagging behindRangesGAUGECOUNTAVGNONE APPLICATIONchangefeed.max_behind_nanos(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the presentNanosecondsGAUGENANOSECONDSAVGNONE APPLICATIONchangefeed.message_size_histMessage size histogramBytesHISTOGRAMBYTESAVGNONE diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 2a2f73852a92..73afd7a04578 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -163,6 +163,7 @@ go_library( "@com_github_klauspost_compress//zstd", "@com_github_klauspost_pgzip//:pgzip", "@com_github_linkedin_goavro_v2//:goavro", + "@com_github_rcrowley_go_metrics//:go-metrics", "@com_github_xdg_go_scram//:scram", "@com_google_cloud_go_pubsub//:pubsub", "@com_google_cloud_go_pubsub//apiv1", diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 74aa8eca110b..a0cf74f65366 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -22,10 +22,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/rcrowley/go-metrics" ) const ( @@ -35,6 +37,7 @@ const ( changefeedIOQueueMaxLatency = 5 * time.Minute admitLatencyMaxValue = 1 * time.Minute commitLatencyMaxValue = 10 * time.Minute + kafkaThrottlingTimeMaxValue = 5 * time.Minute ) // max length for the scope name. @@ -76,6 +79,7 @@ type AggMetrics struct { CheckpointProgress *aggmetric.AggGauge LaggingRanges *aggmetric.AggGauge CloudstorageBufferedBytes *aggmetric.AggGauge + KafkaThrottlingNanos *aggmetric.AggHistogram // There is always at least 1 sliMetrics created for defaultSLI scope. mu struct { @@ -106,6 +110,7 @@ type metricsRecorder interface { newParallelIOMetricsRecorder() parallelIOMetricsRecorder recordSinkIOInflightChange(int64) makeCloudstorageFileAllocCallback() func(delta int64) + newKafkaMetricsGetter() KafkaMetricsGetter } var _ metricsRecorder = (*sliMetrics)(nil) @@ -145,6 +150,7 @@ type sliMetrics struct { CheckpointProgress *aggmetric.Gauge LaggingRanges *aggmetric.Gauge CloudstorageBufferedBytes *aggmetric.Gauge + KafkaThrottlingNanos *aggmetric.Histogram mu struct { syncutil.Mutex @@ -325,6 +331,94 @@ func (m *sliMetrics) recordSizeBasedFlush() { m.SizeBasedFlushes.Inc(1) } +type kafkaHistogramAdapter struct { + wrapped *aggmetric.Histogram +} + +var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil) + +func (k *kafkaHistogramAdapter) Update(valueInMs int64) { + if k != nil { + // valueInMs is passed in from sarama with a unit of milliseconds. To + // convert this value to nanoseconds, valueInMs * 10^6 is recorded here. + k.wrapped.RecordValue(valueInMs * 1000000) + } +} + +func (k *kafkaHistogramAdapter) Clear() { + log.Error(context.Background(), "clear is not expected to be called on kafkaHistogramAdapter") +} + +func (k *kafkaHistogramAdapter) Count() (_ int64) { + log.Error(context.Background(), "count is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Max() (_ int64) { + log.Error(context.Background(), "max is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Mean() (_ float64) { + log.Error(context.Background(), "mean is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Min() (_ int64) { + log.Error(context.Background(), "min is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Percentile(float64) (_ float64) { + log.Error(context.Background(), "percentile is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Percentiles([]float64) (_ []float64) { + log.Error(context.Background(), "percentiles is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Sample() (_ metrics.Sample) { + log.Error(context.Background(), "sample is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Snapshot() (_ metrics.Histogram) { + log.Error(context.Background(), "snapshot is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) StdDev() (_ float64) { + log.Error(context.Background(), "stdDev is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Sum() (_ int64) { + log.Error(context.Background(), "sum is not expected to be called on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Variance() (_ float64) { + log.Error(context.Background(), "variance is not expected to be called on kafkaHistogramAdapter") + return +} + +type KafkaMetricsGetter interface { + GetKafkaThrottlingNanos() *kafkaHistogramAdapter +} + +type kafkaMetricsGetterImpl struct { + kafkaThrottlingNanos *kafkaHistogramAdapter +} + +func (kg *kafkaMetricsGetterImpl) GetKafkaThrottlingNanos() *kafkaHistogramAdapter { + if kg == nil { + return (*kafkaHistogramAdapter)(nil) + } + return kg.kafkaThrottlingNanos +} + type parallelIOMetricsRecorder interface { recordPendingQueuePush(numKeys int64) recordPendingQueuePop(numKeys int64, latency time.Duration) @@ -380,6 +474,17 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder { } } +func (m *sliMetrics) newKafkaMetricsGetter() KafkaMetricsGetter { + if m == nil { + return (*kafkaMetricsGetterImpl)(nil) + } + return &kafkaMetricsGetterImpl{ + kafkaThrottlingNanos: &kafkaHistogramAdapter{ + wrapped: m.KafkaThrottlingNanos, + }, + } +} + func (m *sliMetrics) recordSinkIOInflightChange(delta int64) { if m == nil { return @@ -470,6 +575,10 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetric return w.inner.newParallelIOMetricsRecorder() } +func (w *wrappingCostController) newKafkaMetricsGetter() KafkaMetricsGetter { + return w.inner.newKafkaMetricsGetter() +} + var ( metaChangefeedForwardedResolvedMessages = metric.Metadata{ Name: "changefeed.forwarded_resolved_messages", @@ -721,6 +830,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { Measurement: "Bytes", Unit: metric.Unit_COUNT, } + metaChangefeedKafkaThrottlingNanos := metric.Metadata{ + Name: "changefeed.kafka_throttling_hist_nanos", + Help: "Time spent in throttling due to exceeding kafka quota", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } functionalGaugeMinFn := func(childValues []int64) int64 { var min int64 @@ -813,6 +928,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn), LaggingRanges: b.Gauge(metaLaggingRangePercentage), CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes), + KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{ + Metadata: metaChangefeedKafkaThrottlingNanos, + Duration: histogramWindow, + MaxVal: kafkaThrottlingTimeMaxValue.Nanoseconds(), + SigFigs: 2, + BucketConfig: metric.BatchProcessLatencyBuckets, + }), } a.mu.sliMetrics = make(map[string]*sliMetrics) _, err := a.getOrCreateScope(defaultSLIScope) @@ -878,6 +1000,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { SchemaRegistrations: a.SchemaRegistrations.AddChild(scope), LaggingRanges: a.LaggingRanges.AddChild(scope), CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope), + KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope), } sm.mu.resolved = make(map[int64]hlc.Timestamp) sm.mu.checkpoint = make(map[int64]hlc.Timestamp) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 699fa7c73867..564540eed24d 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" + "github.com/rcrowley/go-metrics" "golang.org/x/oauth2" "golang.org/x/oauth2/clientcredentials" ) @@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) { } func buildKafkaConfig( - ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig, + ctx context.Context, + u sinkURL, + jsonStr changefeedbase.SinkSpecificJSONConfig, + kafkaMetricsGetter KafkaMetricsGetter, ) (*sarama.Config, error) { dialConfig, err := buildDialConfig(u) if err != nil { @@ -1090,6 +1094,7 @@ func buildKafkaConfig( config.Producer.Partitioner = newChangefeedPartitioner // Do not fetch metadata for all topics but just for the necessary ones. config.Metadata.Full = false + config.MetricRegistry = newMetricsRegistryInterceptor(kafkaMetricsGetter) if dialConfig.tlsEnabled { config.Net.TLS.Enable = true @@ -1176,7 +1181,8 @@ func makeKafkaSink( return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic) } - config, err := buildKafkaConfig(ctx, u, jsonStr) + m := mb(requiresResourceAccounting) + config, err := buildKafkaConfig(ctx, u, jsonStr, m.newKafkaMetricsGetter()) if err != nil { return nil, err } @@ -1195,7 +1201,7 @@ func makeKafkaSink( ctx: ctx, kafkaCfg: config, bootstrapAddrs: u.Host, - metrics: mb(requiresResourceAccounting), + metrics: m, topics: topics, disableInternalRetry: !internalRetryEnabled, } @@ -1234,3 +1240,26 @@ func (s *kafkaStats) String() string { atomic.LoadInt64(&s.largestMessageSize), ) } + +type metricsRegistryInterceptor struct { + metrics.Registry + kafkaThrottlingNanos metrics.Histogram +} + +var _ metrics.Registry = (*metricsRegistryInterceptor)(nil) + +func newMetricsRegistryInterceptor(kafkaMetrics KafkaMetricsGetter) *metricsRegistryInterceptor { + return &metricsRegistryInterceptor{ + Registry: metrics.NewRegistry(), + kafkaThrottlingNanos: kafkaMetrics.GetKafkaThrottlingNanos(), + } +} + +func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} { + const throttleTimeMsMetricsPrefix = "throttle-time-in-ms" + if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) { + return mri.kafkaThrottlingNanos + } else { + return mri.Registry.GetOrRegister(name, i) + } +} diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go index 8ca6badcbad5..fe9127ac6996 100644 --- a/pkg/ccl/changefeedccl/telemetry.go +++ b/pkg/ccl/changefeedccl/telemetry.go @@ -216,6 +216,10 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetr return r.inner.newParallelIOMetricsRecorder() } +func (r *telemetryMetricsRecorder) newKafkaMetricsGetter() KafkaMetricsGetter { + return r.inner.newKafkaMetricsGetter() +} + // continuousTelemetryInterval determines the interval at which each node emits telemetry events // during the lifespan of each enterprise changefeed. var continuousTelemetryInterval = settings.RegisterDurationSetting(