Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
117117: kv: log slow requests on replica level in addition to range level r=shralex a=shralex

Previously, slow requests were only logged at the range level, but the logs did not indicate which replica is slow. Moreover, the SlowRPC metric attempted to represent the number of requests currently being retried, however it was done on the range level and therefore missed a second level of replica-level retries being done underneath.

This PR adds logging on the replica level, removes a confusing log line, and changes the metric to count the number of slow requests in a simpler manner.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-33510
Fixes: #114431

117693: changefeedccl: add observability metrics into sarama code r=rharding6373 a=wenyihu6

Now that this patch (#117544) has been merged, sarama now acknowledges and
reacts to kafka server's throttling messages by slowing down. To provide better
observability into sarama code, this patch adds a metrics registry interceptor
and a new metrics `changefeed.kafka_throttling_hist_nanos` which tracks time (in
nanos) spent in sarama's throttling when cockroachdb exceed the kafka quota.

Fixes: #117618

Release note: changefeed.kafka_throttling_hist_nanos has now been added to
metrics to monitor sarama throttling behavior resulting from exceeding kafka
quota.

118372: sql: fix flake in TestTxnContentionEventsTable r=yuzefovich a=michae2

In causeContention we deliberately hold a transaction open using pg_sleep to block an update statement. The timing we're trying to achieve is:

1. transaction insert
2. update starts and blocks
3. transaction held open using pg_sleep

We were using a WaitGroup to order (2) after (1), but there was no synchronization to ensure (3) came after (2).

This commit adds a retry loop that checks `crdb_internal.cluster_queries` to ensure (3) comes after (2).

Fixes: #118236

Release note: None

118760: builtins: allow VIEWACTIVITY priv to use crdb_internal.request_statem… r=xinhaoz a=xinhaoz

…ent_bundle

Previously only those with the VIEWACTIVITY role could use the crdb_internal.request_statement_bundle builtin. We should allow the VIEWACTIVITY privilege as well since role options are now deprecated. This allow also allow stmt bundle requests to be made from db-console for users with this granted privilege.

Epic: none
Fixes: #118759

Release note (bug fix): Those with VIEWACTIVITY privilege can now request statement bundles using crdb_internal.requets_statement_bundle or via db-console's sql activity page.

118767: release: confirm yum install r=celiala a=rail

This adds `-y` flag to install `yum` without user prompt.

Epic: none
Release note: None

118789: jobs,application_api: replace calls to `skip.Stress` with `skip.Duress` r=celiala a=rickystewart

`skip.Duress()` seems like it should have been used in this case as it gives more time under both `race` and `deadlock`. This will give these tests some extra time if they run in a heavyweight configuration but not "under stress".

Epic: CRDB-8308
Release note: None

118792: kvfollowerreadsccl: skip test under `race` not `stressrace` r=celiala a=rickystewart

Epic: CRDB-8308
Release note: None

118797: bincheck: do not run geos tests on Windows r=celiala a=rail

In #106642 we stopped shipping libgeos on Windows, but didn't update the bincheck test to reflect the change.

Epic: none
Release note: None

Co-authored-by: shralex <[email protected]>
Co-authored-by: Wenyi Hu <[email protected]>
Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: Rail Aliiev <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
  • Loading branch information
7 people committed Feb 6, 2024
9 parents aee73ac + b034714 + a5bcfca + 199a586 + ce81ca1 + 88080b1 + c211e8f + 63cd423 + d623844 commit 804d37e
Show file tree
Hide file tree
Showing 17 changed files with 261 additions and 38 deletions.
2 changes: 1 addition & 1 deletion build/deploy-redhat/Dockerfile.in
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM @repository@:@tag@

RUN microdnf install yum && \
RUN microdnf install -y yum && \
yum -v -y update --all && \
microdnf clean all && \
rm -rf /var/cache/yum
Expand Down
2 changes: 1 addition & 1 deletion build/release/bincheck/bincheck
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ EOF
diff -u expected actual

# Verify libgeos functionality on all platforms except MacOS ARM64 and Windows
if [[ $(uname -om) == "Darwin arm64" ]]; then
if [[ $(uname -om) == "Darwin arm64" || $(uname -o) == "Msys" ]]; then
echo "Skipping libgeos tests"
else
echo "Testing libgeos functionality"
Expand Down
2 changes: 0 additions & 2 deletions build/release/bincheck/download_binary.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ download_and_extract() {
else
curl -sSfL "${binary_url}" > cockroach.zip
7z e -omnt cockroach.zip
mkdir -p mnt/lib
mv mnt/*.dll mnt/lib/
fi

echo "Downloaded ${binary_url}"
Expand Down
4 changes: 3 additions & 1 deletion docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@
<tr><td>APPLICATION</td><td>changefeed.forwarded_resolved_messages</td><td>Resolved timestamps forwarded from the change aggregator to the change frontier</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.frontier_updates</td><td>Number of change frontier updates across all feeds</td><td>Updates</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.internal_retry_message_count</td><td>Number of messages for which an attempt to retry them within an aggregator node was made</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.kafka_throttling_hist_nanos</td><td>Time spent in throttling due to exceeding kafka quota</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.lagging_ranges</td><td>The number of ranges considered to be lagging behind</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>(Deprecated in favor of checkpoint_progress) The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.message_size_hist</td><td>Message size histogram</td><td>Bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
Expand Down Expand Up @@ -924,6 +925,7 @@
<tr><td>APPLICATION</td><td>distsender.rpc.transferlease.sent</td><td>Number of TransferLease requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.truncatelog.sent</td><td>Number of TruncateLog requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.writebatch.sent</td><td>Number of WriteBatch requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.slow.replicarpcs</td><td>Number of slow replica-bound RPCs.<br/><br/>Note that this is not a good signal for KV health. The remote side of the<br/>RPCs tracked here may experience contention, so an end user can easily<br/>cause values for this metric to be emitted by leaving a transaction open<br/>for a long time and contending with it using a second transaction.</td><td>Requests</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.adopt_iterations</td><td>number of job-adopt iterations performed by the registry</td><td>iterations</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.auto_config_env_runner.currently_idle</td><td>Number of auto_config_env_runner jobs currently considered Idle and can be freely shut down</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>jobs.auto_config_env_runner.currently_paused</td><td>Number of auto_config_env_runner jobs currently considered Paused</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
Expand Down Expand Up @@ -1244,7 +1246,7 @@
<tr><td>APPLICATION</td><td>physical_replication.resolved_events_ingested</td><td>Resolved events ingested by all replication jobs</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.running</td><td>Number of currently running replication streams</td><td>Replication Streams</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.sst_bytes</td><td>SST bytes (compressed) sent to KV by all replication jobs</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>requests.slow.distsender</td><td>Number of replica-bound RPCs currently stuck or retrying for a long time.<br/><br/>Note that this is not a good signal for KV health. The remote side of the<br/>RPCs tracked here may experience contention, so an end user can easily<br/>cause values for this metric to be emitted by leaving a transaction open<br/>for a long time and contending with it using a second transaction.</td><td>Requests</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>requests.slow.distsender</td><td>Number of range-bound RPCs currently stuck or retrying for a long time.<br/><br/>Note that this is not a good signal for KV health. The remote side of the<br/>RPCs tracked here may experience contention, so an end user can easily<br/>cause values for this metric to be emitted by leaving a transaction open<br/>for a long time and contending with it using a second transaction.</td><td>Requests</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>round-trip-latency</td><td>Distribution of round-trip latencies with other nodes.<br/><br/>This only reflects successful heartbeats and measures gRPC overhead as well as<br/>possible head-of-line blocking. Elevated values in this metric may hint at<br/>network issues and/or saturation, but they are no proof of them. CPU overload<br/>can similarly elevate this metric. The operator should look towards OS-level<br/>metrics such as packet loss, retransmits, etc, to conclusively diagnose network<br/>issues. Heartbeats are not very frequent (~seconds), so they may not capture<br/>rare or short-lived degradations.<br/></td><td>Round-trip time</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.avg_round_trip_latency</td><td>Sum of exponentially weighted moving average of round-trip latencies, as measured through a gRPC RPC.<br/><br/>Dividing this Gauge by rpc.connection.healthy gives an approximation of average<br/>latency, but the top-level round-trip-latency histogram is more useful. Instead,<br/>users should consult the label families of this metric if they are available<br/>(which requires prometheus and the cluster setting &#39;server.child_metrics.enabled&#39;);<br/>these provide per-peer moving averages.<br/><br/>This metric does not track failed connection. A failed connection&#39;s contribution<br/>is reset to zero.<br/></td><td>Latency</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.failures</td><td>Counter of failed connections.<br/><br/>This includes both the event in which a healthy connection terminates as well as<br/>unsuccessful reconnection attempts.<br/><br/>Connections that are terminated as part of local node shutdown are excluded.<br/>Decommissioned peers are excluded.<br/></td><td>Connections</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ go_library(
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_rcrowley_go_metrics//:go-metrics",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
Expand Down
111 changes: 111 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/rcrowley/go-metrics"
)

const (
Expand All @@ -35,6 +38,7 @@ const (
changefeedIOQueueMaxLatency = 5 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
kafkaThrottlingTimeMaxValue = 5 * time.Minute
)

// max length for the scope name.
Expand Down Expand Up @@ -76,6 +80,7 @@ type AggMetrics struct {
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
Expand Down Expand Up @@ -106,6 +111,7 @@ type metricsRecorder interface {
newParallelIOMetricsRecorder() parallelIOMetricsRecorder
recordSinkIOInflightChange(int64)
makeCloudstorageFileAllocCallback() func(delta int64)
getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
}

var _ metricsRecorder = (*sliMetrics)(nil)
Expand Down Expand Up @@ -145,6 +151,7 @@ type sliMetrics struct {
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -325,6 +332,80 @@ func (m *sliMetrics) recordSizeBasedFlush() {
m.SizeBasedFlushes.Inc(1)
}

type kafkaHistogramAdapter struct {
settings *cluster.Settings
wrapped *aggmetric.Histogram
}

var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil)

func (k *kafkaHistogramAdapter) Update(valueInMs int64) {
if k != nil {
// valueInMs is passed in from sarama with a unit of milliseconds. To
// convert this value to nanoseconds, valueInMs * 10^6 is recorded here.
k.wrapped.RecordValue(valueInMs * 1000000)
}
}

func (k *kafkaHistogramAdapter) Clear() {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Count() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Count on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Max() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Max on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Mean() (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Mean on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Min() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Min on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Percentile(float64) (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentile on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Percentiles([]float64) (_ []float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentiles on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Sample() (_ metrics.Sample) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sample on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Snapshot() (_ metrics.Histogram) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Snapshot on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) StdDev() (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to StdDev on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Sum() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Variance() (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Variance on kafkaHistogramAdapter")
return
}

type parallelIOMetricsRecorder interface {
recordPendingQueuePush(numKeys int64)
recordPendingQueuePop(numKeys int64, latency time.Duration)
Expand Down Expand Up @@ -380,6 +461,16 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
}
}

func (m *sliMetrics) getKafkaThrottlingMetrics(settings *cluster.Settings) metrics.Histogram {
if m == nil {
return (*kafkaHistogramAdapter)(nil)
}
return &kafkaHistogramAdapter{
settings: settings,
wrapped: m.KafkaThrottlingNanos,
}
}

func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
if m == nil {
return
Expand Down Expand Up @@ -470,6 +561,12 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetric
return w.inner.newParallelIOMetricsRecorder()
}

func (w *wrappingCostController) getKafkaThrottlingMetrics(
settings *cluster.Settings,
) metrics.Histogram {
return w.inner.getKafkaThrottlingMetrics(settings)
}

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
Expand Down Expand Up @@ -721,6 +818,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaChangefeedKafkaThrottlingNanos := metric.Metadata{
Name: "changefeed.kafka_throttling_hist_nanos",
Help: "Time spent in throttling due to exceeding kafka quota",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
Expand Down Expand Up @@ -813,6 +916,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedKafkaThrottlingNanos,
Duration: histogramWindow,
MaxVal: kafkaThrottlingTimeMaxValue.Nanoseconds(),
SigFigs: 2,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
Expand Down Expand Up @@ -878,6 +988,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
Expand Down
35 changes: 32 additions & 3 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/rcrowley/go-metrics"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
Expand Down Expand Up @@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
}

func buildKafkaConfig(
ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig,
ctx context.Context,
u sinkURL,
jsonStr changefeedbase.SinkSpecificJSONConfig,
kafkaThrottlingMetrics metrics.Histogram,
) (*sarama.Config, error) {
dialConfig, err := buildDialConfig(u)
if err != nil {
Expand All @@ -1090,6 +1094,7 @@ func buildKafkaConfig(
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
config.Metadata.Full = false
config.MetricRegistry = newMetricsRegistryInterceptor(kafkaThrottlingMetrics)

if dialConfig.tlsEnabled {
config.Net.TLS.Enable = true
Expand Down Expand Up @@ -1176,7 +1181,8 @@ func makeKafkaSink(
return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
}

config, err := buildKafkaConfig(ctx, u, jsonStr)
m := mb(requiresResourceAccounting)
config, err := buildKafkaConfig(ctx, u, jsonStr, m.getKafkaThrottlingMetrics(settings))
if err != nil {
return nil, err
}
Expand All @@ -1195,7 +1201,7 @@ func makeKafkaSink(
ctx: ctx,
kafkaCfg: config,
bootstrapAddrs: u.Host,
metrics: mb(requiresResourceAccounting),
metrics: m,
topics: topics,
disableInternalRetry: !internalRetryEnabled,
}
Expand Down Expand Up @@ -1234,3 +1240,26 @@ func (s *kafkaStats) String() string {
atomic.LoadInt64(&s.largestMessageSize),
)
}

type metricsRegistryInterceptor struct {
metrics.Registry
kafkaThrottlingNanos metrics.Histogram
}

var _ metrics.Registry = (*metricsRegistryInterceptor)(nil)

func newMetricsRegistryInterceptor(kafkaMetrics metrics.Histogram) *metricsRegistryInterceptor {
return &metricsRegistryInterceptor{
Registry: metrics.NewRegistry(),
kafkaThrottlingNanos: kafkaMetrics,
}
}

func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} {
const throttleTimeMsMetricsPrefix = "throttle-time-in-ms"
if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) {
return mri.kafkaThrottlingNanos
} else {
return mri.Registry.GetOrRegister(name, i)
}
}
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/rcrowley/go-metrics"
)

type sinkTelemetryData struct {
Expand Down Expand Up @@ -216,6 +217,12 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetr
return r.inner.newParallelIOMetricsRecorder()
}

func (r *telemetryMetricsRecorder) getKafkaThrottlingMetrics(
settings *cluster.Settings,
) metrics.Histogram {
return r.inner.getKafkaThrottlingMetrics(settings)
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
defer log.Scope(t).Close(t)
defer utilccl.TestingEnableEnterprise()()

skip.UnderStressRace(t, "times out")
skip.UnderRace(t, "times out")
skip.UnderDeadlock(t)

for _, testCase := range []struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ func TestWaitWithRetryableError(t *testing.T) {
registry.WaitForJobs(
ctx, []jobspb.JobID{id},
))
if !skip.Stress() {
if !skip.Duress() {
require.Equalf(t, int64(targetNumberOfRetries), numberOfTimesDetected.Load(), "jobs query did not retry")
} else {
// For stress be lenient since we are relying on timing for leasing
Expand Down
Loading

0 comments on commit 804d37e

Please sign in to comment.