From ad3b6db238e1dbf753fdf51447e7031b2a0b884a Mon Sep 17 00:00:00 2001
From: Wenyi Hu
Date: Thu, 11 Jan 2024 14:59:49 -0500
Subject: [PATCH] changefeedccl: add observability metrics into sarama code

Now that #117544 has been merged, sarama acknowledges and reacts to the
kafka server's throttling messages by slowing down. To provide better
observability into the sarama code, this patch adds a metrics registry
interceptor and a new metric, `changefeed.kafka_throttling_hist_nanos`,
which tracks the time (in nanoseconds) spent in sarama's throttling when
cockroachdb exceeds the kafka quota.

Fixes: https://github.com/cockroachdb/cockroach/issues/117618

Release note: a new metric, changefeed.kafka_throttling_hist_nanos, has
been added to monitor sarama throttling behavior resulting from exceeding
the kafka quota.
---
 docs/generated/metrics/metrics.html |   1 +
 pkg/ccl/changefeedccl/BUILD.bazel   |   1 +
 pkg/ccl/changefeedccl/metrics.go    | 111 ++++++++++++++++++++++++++++
 pkg/ccl/changefeedccl/sink_kafka.go |  35 ++++++++-
 pkg/ccl/changefeedccl/telemetry.go  |   7 ++
 5 files changed, 152 insertions(+), 3 deletions(-)

diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 68a9c27d3e3b..5ad567ffe388 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -762,6 +762,7 @@
 APPLICATIONchangefeed.forwarded_resolved_messagesResolved timestamps forwarded from the change aggregator to the change frontierMessagesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE
 APPLICATIONchangefeed.frontier_updatesNumber of change frontier updates across all feedsUpdatesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE
 APPLICATIONchangefeed.internal_retry_message_countNumber of messages for which an attempt to retry them within an aggregator node was madeMessagesGAUGECOUNTAVGNONE
+APPLICATIONchangefeed.kafka_throttling_hist_nanosTime spent in throttling due to exceeding kafka quotaNanosecondsHISTOGRAMNANOSECONDSAVGNONE
 APPLICATIONchangefeed.lagging_rangesThe number of ranges considered to be lagging behindRangesGAUGECOUNTAVGNONE
 APPLICATIONchangefeed.max_behind_nanos(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the presentNanosecondsGAUGENANOSECONDSAVGNONE
 APPLICATIONchangefeed.message_size_histMessage size histogramBytesHISTOGRAMBYTESAVGNONE
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index 42cd38404dd0..c854a46a0943 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -163,6 +163,7 @@ go_library(
         "@com_github_klauspost_pgzip//:pgzip",
         "@com_github_linkedin_goavro_v2//:goavro",
         "@com_github_shopify_sarama//:sarama",
+        "@com_github_rcrowley_go_metrics//:go-metrics",
         "@com_github_xdg_go_scram//:scram",
         "@com_google_cloud_go_pubsub//:pubsub",
         "@com_google_cloud_go_pubsub//apiv1",
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index 74aa8eca110b..10d941642bd0 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -19,13 +19,16 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/multitenant"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/rcrowley/go-metrics" ) const ( @@ -35,6 +38,7 @@ const ( changefeedIOQueueMaxLatency = 5 * time.Minute admitLatencyMaxValue = 1 * time.Minute commitLatencyMaxValue = 10 * time.Minute + kafkaThrottlingTimeMaxValue = 5 * time.Minute ) // max length for the scope name. @@ -76,6 +80,7 @@ type AggMetrics struct { CheckpointProgress *aggmetric.AggGauge LaggingRanges *aggmetric.AggGauge CloudstorageBufferedBytes *aggmetric.AggGauge + KafkaThrottlingNanos *aggmetric.AggHistogram // There is always at least 1 sliMetrics created for defaultSLI scope. mu struct { @@ -106,6 +111,7 @@ type metricsRecorder interface { newParallelIOMetricsRecorder() parallelIOMetricsRecorder recordSinkIOInflightChange(int64) makeCloudstorageFileAllocCallback() func(delta int64) + getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram } var _ metricsRecorder = (*sliMetrics)(nil) @@ -145,6 +151,7 @@ type sliMetrics struct { CheckpointProgress *aggmetric.Gauge LaggingRanges *aggmetric.Gauge CloudstorageBufferedBytes *aggmetric.Gauge + KafkaThrottlingNanos *aggmetric.Histogram mu struct { syncutil.Mutex @@ -325,6 +332,80 @@ func (m *sliMetrics) recordSizeBasedFlush() { m.SizeBasedFlushes.Inc(1) } +type kafkaHistogramAdapter struct { + settings *cluster.Settings + wrapped *aggmetric.Histogram +} + +var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil) + +func (k *kafkaHistogramAdapter) Update(valueInMs int64) { + if k != nil { + // valueInMs is passed in from sarama with a unit of milliseconds. To + // convert this value to nanoseconds, valueInMs * 10^6 is recorded here. 
+		k.wrapped.RecordValue(valueInMs * 1000000)
+	}
+}
+
+func (k *kafkaHistogramAdapter) Clear() {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Clear on kafkaHistogramAdapter")
+}
+
+func (k *kafkaHistogramAdapter) Count() (_ int64) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Count on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) Max() (_ int64) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Max on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) Mean() (_ float64) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Mean on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) Min() (_ int64) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Min on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) Percentile(float64) (_ float64) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentile on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) Percentiles([]float64) (_ []float64) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentiles on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) Sample() (_ metrics.Sample) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sample on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) Snapshot() (_ metrics.Histogram) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Snapshot on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) StdDev() (_ float64) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to StdDev on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) Sum() (_ int64) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
+	return
+}
+
+func (k *kafkaHistogramAdapter) Variance() (_ float64) {
+	logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Variance on kafkaHistogramAdapter")
+	return
+}
+
 type parallelIOMetricsRecorder interface {
 	recordPendingQueuePush(numKeys int64)
 	recordPendingQueuePop(numKeys int64, latency time.Duration)
@@ -380,6 +461,16 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
 	}
 }
 
+func (m *sliMetrics) getKafkaThrottlingMetrics(settings *cluster.Settings) metrics.Histogram {
+	if m == nil {
+		return (*kafkaHistogramAdapter)(nil)
+	}
+	return &kafkaHistogramAdapter{
+		settings: settings,
+		wrapped:  m.KafkaThrottlingNanos,
+	}
+}
+
 func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
 	if m == nil {
 		return
@@ -470,6 +561,12 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetric
 	return w.inner.newParallelIOMetricsRecorder()
 }
 
+func (w *wrappingCostController) getKafkaThrottlingMetrics(
+	settings *cluster.Settings,
+) metrics.Histogram {
+	return w.inner.getKafkaThrottlingMetrics(settings)
+}
+
 var (
 	metaChangefeedForwardedResolvedMessages = metric.Metadata{
 		Name: "changefeed.forwarded_resolved_messages",
"changefeed.forwarded_resolved_messages", @@ -721,6 +818,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { Measurement: "Bytes", Unit: metric.Unit_COUNT, } + metaChangefeedKafkaThrottlingNanos := metric.Metadata{ + Name: "changefeed.kafka_throttling_hist_nanos", + Help: "Time spent in throttling due to exceeding kafka quota", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } functionalGaugeMinFn := func(childValues []int64) int64 { var min int64 @@ -813,6 +916,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn), LaggingRanges: b.Gauge(metaLaggingRangePercentage), CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes), + KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{ + Metadata: metaChangefeedKafkaThrottlingNanos, + Duration: histogramWindow, + MaxVal: kafkaThrottlingTimeMaxValue.Nanoseconds(), + SigFigs: 2, + BucketConfig: metric.BatchProcessLatencyBuckets, + }), } a.mu.sliMetrics = make(map[string]*sliMetrics) _, err := a.getOrCreateScope(defaultSLIScope) @@ -878,6 +988,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { SchemaRegistrations: a.SchemaRegistrations.AddChild(scope), LaggingRanges: a.LaggingRanges.AddChild(scope), CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope), + KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope), } sm.mu.resolved = make(map[int64]hlc.Timestamp) sm.mu.checkpoint = make(map[int64]hlc.Timestamp) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 74327b98ca96..5881454773e3 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" + "github.com/rcrowley/go-metrics" "golang.org/x/oauth2" "golang.org/x/oauth2/clientcredentials" ) @@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) { } func buildKafkaConfig( - ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig, + ctx context.Context, + u sinkURL, + jsonStr changefeedbase.SinkSpecificJSONConfig, + kafkaThrottlingMetrics metrics.Histogram, ) (*sarama.Config, error) { dialConfig, err := buildDialConfig(u) if err != nil { @@ -1090,6 +1094,7 @@ func buildKafkaConfig( config.Producer.Partitioner = newChangefeedPartitioner // Do not fetch metadata for all topics but just for the necessary ones. 
 	config.Metadata.Full = false
+	config.MetricRegistry = newMetricsRegistryInterceptor(kafkaThrottlingMetrics)
 
 	if dialConfig.tlsEnabled {
 		config.Net.TLS.Enable = true
@@ -1176,7 +1181,8 @@ func makeKafkaSink(
 		return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
 	}
 
-	config, err := buildKafkaConfig(ctx, u, jsonStr)
+	m := mb(requiresResourceAccounting)
+	config, err := buildKafkaConfig(ctx, u, jsonStr, m.getKafkaThrottlingMetrics(settings))
 	if err != nil {
 		return nil, err
 	}
@@ -1195,7 +1201,7 @@ func makeKafkaSink(
 		ctx:            ctx,
 		kafkaCfg:       config,
 		bootstrapAddrs: u.Host,
-		metrics:        mb(requiresResourceAccounting),
+		metrics:        m,
 		topics:         topics,
 		disableInternalRetry: !internalRetryEnabled,
 	}
@@ -1234,3 +1240,26 @@ func (s *kafkaStats) String() string {
 		atomic.LoadInt64(&s.largestMessageSize),
 	)
 }
+
+type metricsRegistryInterceptor struct {
+	metrics.Registry
+	kafkaThrottlingNanos metrics.Histogram
+}
+
+var _ metrics.Registry = (*metricsRegistryInterceptor)(nil)
+
+func newMetricsRegistryInterceptor(kafkaMetrics metrics.Histogram) *metricsRegistryInterceptor {
+	return &metricsRegistryInterceptor{
+		Registry:             metrics.NewRegistry(),
+		kafkaThrottlingNanos: kafkaMetrics,
+	}
+}
+
+func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} {
+	const throttleTimeMsMetricsPrefix = "throttle-time-in-ms"
+	if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) {
+		return mri.kafkaThrottlingNanos
+	} else {
+		return mri.Registry.GetOrRegister(name, i)
+	}
+}
diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go
index 8ca6badcbad5..b4fd9c566f7e 100644
--- a/pkg/ccl/changefeedccl/telemetry.go
+++ b/pkg/ccl/changefeedccl/telemetry.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/rcrowley/go-metrics"
 )
 
 type sinkTelemetryData struct {
@@ -216,6 +217,12 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetr
 	return r.inner.newParallelIOMetricsRecorder()
 }
 
+func (r *telemetryMetricsRecorder) getKafkaThrottlingMetrics(
+	settings *cluster.Settings,
+) metrics.Histogram {
+	return r.inner.getKafkaThrottlingMetrics(settings)
+}
+
 // continuousTelemetryInterval determines the interval at which each node emits telemetry events
 // during the lifespan of each enterprise changefeed.
 var continuousTelemetryInterval = settings.RegisterDurationSetting(
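
Reviewer aside, not part of the patch: the sketch below shows the registry-interception pattern the patch relies on, in isolation and using only github.com/rcrowley/go-metrics. The type name interceptingRegistry, the per-broker metric name "throttle-time-in-ms-for-broker-0", and the sample sizes are assumptions made for the example; the registration is simulated by hand rather than performed by sarama, and the millisecond-to-nanosecond adapter from metrics.go is omitted.

package main

import (
	"fmt"
	"strings"

	"github.com/rcrowley/go-metrics"
)

// interceptingRegistry plays the role of metricsRegistryInterceptor: it embeds a
// real go-metrics registry and hands back a histogram we control whenever a
// metric whose name starts with "throttle-time-in-ms" is registered.
type interceptingRegistry struct {
	metrics.Registry
	throttle metrics.Histogram
}

func (r *interceptingRegistry) GetOrRegister(name string, i interface{}) interface{} {
	if strings.HasPrefix(name, "throttle-time-in-ms") {
		return r.throttle
	}
	return r.Registry.GetOrRegister(name, i)
}

func main() {
	throttle := metrics.NewHistogram(metrics.NewUniformSample(1024))
	reg := &interceptingRegistry{Registry: metrics.NewRegistry(), throttle: throttle}

	// Sarama reports through whatever registry it is handed via
	// config.MetricRegistry; simulate one such registration and an update.
	h := metrics.GetOrRegisterHistogram(
		"throttle-time-in-ms-for-broker-0", reg, metrics.NewUniformSample(1024))
	h.Update(250) // a simulated 250ms throttle reported by a broker

	// The update landed in the histogram we supplied, not in an internal one.
	fmt.Println(throttle.Count(), throttle.Max()) // prints: 1 250
}

A prefix match, rather than an exact name, is used here as in the patch, on the assumption that sarama registers per-broker variants of the throttle metric with the broker ID appended to the name.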