changefeedccl: add timer for inner batching sink client flush
Add a timer for the inner flushes inside
batchingSink SinkClients, including kafka,
webhook, and pubsub.

Part of: #127784

Release note (enterprise change): Added a timer
for inner batching sink client flushes.
asg0451 committed Oct 14, 2024
1 parent 9e12f67 commit 3118a5e
Showing 6 changed files with 44 additions and 17 deletions.
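
Each of the three sink clients below instruments its Flush with the same deferred-timer pattern: Start() captures the start time and returns a closure that, when Flush returns, records the elapsed nanoseconds into the changefeed.stage.batching_sink_client_flush.latency histogram. A minimal sketch of that pattern, using a hypothetical exampleSinkClient rather than any of the real sinks and assuming the changefeedccl package context (metricsRecorder, SinkPayload); the actual call sites are in sink_kafka_v2.go, sink_pubsub_v2.go, and sink_webhook_v2.go below:

	// exampleSinkClient is a hypothetical SinkClient used only to illustrate
	// the instrumentation added in this commit.
	type exampleSinkClient struct {
		metrics metricsRecorder
	}

	func (c *exampleSinkClient) Flush(ctx context.Context, payload SinkPayload) error {
		// Start() runs immediately (defer evaluates its operand), capturing the
		// start time; the returned closure is what gets deferred, and it records
		// the flush latency when Flush returns.
		defer c.metrics.timers().BatchingSinkClientFlush.Start()()

		// ... sink-specific flush work would go here ...
		return nil
	}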
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
@@ -121,6 +121,7 @@ type metricsRecorder interface {
makeCloudstorageFileAllocCallback() func(delta int64)
getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
netMetrics() *cidr.NetMetrics
timers() *timers.ScopedTimers
}

var _ metricsRecorder = (*sliMetrics)(nil)
@@ -355,6 +356,14 @@ func (m *sliMetrics) netMetrics() *cidr.NetMetrics {
return m.NetMetrics
}

func (m *sliMetrics) timers() *timers.ScopedTimers {
if m == nil {
return nil
}

return m.Timers
}

// JobScopedUsageMetrics are aggregated metrics keeping track of
// per-changefeed usage metrics by job_id. Note that its members are public so
// they get registered with the metrics registry, but you should NOT modify them
@@ -674,6 +683,10 @@ func (w *wrappingCostController) netMetrics() *cidr.NetMetrics {
return w.inner.netMetrics()
}

func (w *wrappingCostController) timers() *timers.ScopedTimers {
return w.inner.timers()
}

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
20 changes: 10 additions & 10 deletions pkg/ccl/changefeedccl/sink_kafka_v2.go
@@ -49,10 +49,11 @@ type kafkaSinkClientV2 struct {

knobs kafkaSinkV2Knobs
canTryResizing bool
recordResize func(numRecords int64)

topicsForConnectionCheck []string

metrics metricsRecorder

// we need to fetch and keep track of this ourselves since kgo doesnt expose metadata to us
metadataMu struct {
syncutil.Mutex
@@ -99,12 +100,9 @@ func newKafkaSinkClientV2(
}),
}

recordResize := func(numRecords int64) {}
if m := mb(requiresResourceAccounting); m != nil { // `m` can be nil in tests.
baseOpts = append(baseOpts, kgo.WithHooks(&kgoMetricsAdapter{throttling: m.getKafkaThrottlingMetrics(settings)}))
recordResize = func(numRecords int64) {
m.recordInternalRetry(numRecords, true)
}
metrics := mb(requiresResourceAccounting)
if metrics != nil {
baseOpts = append(baseOpts, kgo.WithHooks(&kgoMetricsAdapter{throttling: metrics.getKafkaThrottlingMetrics(settings)}))
}

clientOpts = append(baseOpts, clientOpts...)
@@ -129,8 +127,8 @@ func newKafkaSinkClientV2(
knobs: knobs,
batchCfg: batchCfg,
canTryResizing: changefeedbase.BatchReductionRetryEnabled.Get(&settings.SV),
recordResize: recordResize,
topicsForConnectionCheck: topicsForConnectionCheck,
metrics: metrics,
}
c.metadataMu.allTopicPartitions = make(map[string][]int32)

@@ -145,6 +143,8 @@ func (k *kafkaSinkClientV2) Close() error {

// Flush implements SinkClient. Does not retry -- retries will be handled either by kafka or ParallelIO.
func (k *kafkaSinkClientV2) Flush(ctx context.Context, payload SinkPayload) (retErr error) {
defer k.metrics.timers().BatchingSinkClientFlush.Start()()

msgs := payload.([]*kgo.Record)

var flushMsgs func(msgs []*kgo.Record) error
@@ -160,11 +160,11 @@ func (k *kafkaSinkClientV2) Flush(ctx context.Context, payload SinkPayload) (ret
// Still, it should probably help.
// Ideally users would set kafka-side max bytes appropriately
// with respect to their average message sizes.
k.recordResize(int64(len(a)))
k.metrics.recordInternalRetry(int64(len(a)), true)
if err := flushMsgs(a); err != nil {
return err
}
k.recordResize(int64(len(b)))
k.metrics.recordInternalRetry(int64(len(b)), true)
if err := flushMsgs(b); err != nil {
return err
}
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/sink_pubsub_v2.go
@@ -56,6 +56,7 @@ type pubsubSinkClient struct {
format changefeedbase.FormatType
batchCfg sinkBatchConfig
withTableNameAttribute bool
metrics metricsRecorder
mu struct {
syncutil.RWMutex

@@ -131,6 +132,7 @@ func makePubsubSinkClient(
batchCfg: batchCfg,
projectID: projectID,
withTableNameAttribute: withTableNameAttribute,
metrics: m,
}
sinkClient.mu.topicCache = make(map[string]struct{})

@@ -193,6 +195,8 @@ func (sc *pubsubSinkClient) maybeCreateTopic(topic string) error {

// Flush implements the SinkClient interface
func (sc *pubsubSinkClient) Flush(ctx context.Context, payload SinkPayload) error {
defer sc.metrics.timers().BatchingSinkClientFlush.Start()()

publishRequest := payload.(*pb.PublishRequest)

err := sc.maybeCreateTopic(publishRequest.Topic)
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/sink_webhook_v2.go
@@ -51,6 +51,7 @@ type webhookSinkClient struct {
authHeader string
batchCfg sinkBatchConfig
client *httputil.Client
metrics metricsRecorder
}

var _ SinkClient = (*webhookSinkClient)(nil)
@@ -77,6 +78,7 @@ func makeWebhookSinkClient(
authHeader: opts.AuthHeader,
format: encodingOpts.Format,
batchCfg: batchCfg,
metrics: m,
}

var connTimeout time.Duration
@@ -211,6 +213,8 @@ func (sc *webhookSinkClient) FlushResolvedPayload(

// Flush implements the SinkClient interface
func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error {
defer sc.metrics.timers().BatchingSinkClientFlush.Start()()

req := batch.(*http.Request)
b, err := req.GetBody()
if err != nil {
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/telemetry.go
@@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -224,6 +225,10 @@ func (r *telemetryMetricsRecorder) netMetrics() *cidr.NetMetrics {
return r.inner.netMetrics()
}

func (r *telemetryMetricsRecorder) timers() *timers.ScopedTimers {
return r.inner.timers()
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
15 changes: 8 additions & 7 deletions pkg/ccl/changefeedccl/timers/timers.go
@@ -18,6 +18,7 @@ type Timers struct {
CheckpointJobProgress *aggmetric.AggHistogram
Encode *aggmetric.AggHistogram
EmitRow *aggmetric.AggHistogram
BatchingSinkClientFlush *aggmetric.AggHistogram
KVFeedWaitForTableEvent *aggmetric.AggHistogram
KVFeedBuffer *aggmetric.AggHistogram
RangefeedBufferValue *aggmetric.AggHistogram
@@ -44,6 +45,7 @@ func New(histogramWindow time.Duration) *Timers {
CheckpointJobProgress: b.Histogram(histogramOptsFor("changefeed.stage.checkpoint_job_progress.latency", "Latency of the changefeed stage: checkpointing job progress")),
Encode: b.Histogram(histogramOptsFor("changefeed.stage.encode.latency", "Latency of the changefeed stage: encoding data")),
EmitRow: b.Histogram(histogramOptsFor("changefeed.stage.emit_row.latency", "Latency of the changefeed stage: emitting row to sink")),
BatchingSinkClientFlush: b.Histogram(histogramOptsFor("changefeed.stage.batching_sink_client_flush.latency", "Latency of the changefeed stage: flushing the inner sink client, for sinks using batching_sink")),
KVFeedWaitForTableEvent: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_wait_for_table_event.latency", "Latency of the changefeed stage: waiting for a table schema event to join to the kv event")),
KVFeedBuffer: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_buffer.latency", "Latency of the changefeed stage: waiting to buffer kv events")),
RangefeedBufferValue: b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_value.latency", "Latency of the changefeed stage: buffering rangefeed value events")),
@@ -56,6 +58,7 @@ func (ts *Timers) GetOrCreateScopedTimers(scope string) *ScopedTimers {
CheckpointJobProgress: &timer{ts.CheckpointJobProgress.AddChild(scope)},
Encode: &timer{ts.Encode.AddChild(scope)},
EmitRow: &timer{ts.EmitRow.AddChild(scope)},
BatchingSinkClientFlush: &timer{ts.BatchingSinkClientFlush.AddChild(scope)},
KVFeedWaitForTableEvent: &timer{ts.KVFeedWaitForTableEvent.AddChild(scope)},
KVFeedBuffer: &timer{ts.KVFeedBuffer.AddChild(scope)},
RangefeedBufferValue: &timer{ts.RangefeedBufferValue.AddChild(scope)},
@@ -67,24 +70,22 @@ type ScopedTimers struct {
CheckpointJobProgress *timer
Encode *timer
EmitRow *timer
BatchingSinkClientFlush *timer
KVFeedWaitForTableEvent *timer
KVFeedBuffer *timer
RangefeedBufferValue *timer
RangefeedBufferCheckpoint *timer
}

func (ts *ScopedTimers) StartTimer(stage *aggmetric.Histogram) func() {
start := timeutil.Now()
return func() {
stage.RecordValue(timeutil.Since(start).Nanoseconds())
}
}

type timer struct {
hist *aggmetric.Histogram
}

func (t *timer) Start() (end func()) {
if t == nil {
return func() {}
}

start := timeutil.Now()
return func() {
t.hist.RecordValue(timeutil.Since(start).Nanoseconds())
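
The new stage follows the existing shape of the timers package: Timers holds one AggHistogram per changefeed stage, GetOrCreateScopedTimers hands out per-scope children, and timer.Start is nil-safe so callers without metrics wiring get a no-op closure. A hedged usage sketch from a caller inside the changefeedccl packages; the window, scope, and variable names are illustrative, not taken from this diff:

	// Illustrative wiring only.
	ts := timers.New(10 * time.Second)              // one aggregate histogram per stage
	scoped := ts.GetOrCreateScopedTimers("default") // per-scope child histograms

	// Time one inner sink-client flush; done() records the elapsed latency
	// into the scoped BatchingSinkClientFlush histogram.
	done := scoped.BatchingSinkClientFlush.Start()
	// ... flush work ...
	done()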