Skip to content

Commit

Permalink
Merge pull request #133288 from asg0451/backport24.3-132572
Browse files Browse the repository at this point in the history
release-24.3: changefeedccl: add timer for inner batching sink client flush
  • Loading branch information
asg0451 authored Oct 24, 2024
2 parents 8921bdb + c59b573 commit 5e09bf2
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 9 deletions.
8 changes: 8 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,14 @@
<tr><td>APPLICATION</td><td>changefeed.sink_errors</td><td>Number of changefeed errors caused by the sink</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.sink_io_inflight</td><td>The number of keys currently inflight as IO requests being sent to the sink</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.size_based_flushes</td><td>Total size based flushes across all feeds</td><td>Flushes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.stage.checkpoint_job_progress.latency</td><td>Latency of the changefeed stage: checkpointing job progress</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.stage.downstream_client_send.latency</td><td>Latency of the changefeed stage: flushing messages from the sink&#39;s client to its downstream. This includes sends that failed for most but not all sinks.</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.stage.emit_row.latency</td><td>Latency of the changefeed stage: emitting row to sink</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.stage.encode.latency</td><td>Latency of the changefeed stage: encoding data</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.stage.kv_feed_buffer.latency</td><td>Latency of the changefeed stage: waiting to buffer kv events</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.stage.kv_feed_wait_for_table_event.latency</td><td>Latency of the changefeed stage: waiting for a table schema event to join to the kv event</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.stage.rangefeed_buffer_checkpoint.latency</td><td>Latency of the changefeed stage: buffering rangefeed checkpoint events</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.stage.rangefeed_buffer_value.latency</td><td>Latency of the changefeed stage: buffering rangefeed value events</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.total_ranges</td><td>The total number of ranges being watched by changefeed aggregators</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.usage.error_count</td><td>Count of errors encountered while generating usage metrics for changefeeds</td><td>Errors</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.usage.query_duration</td><td>Time taken by the queries used to generate usage metrics for changefeeds</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
batch, _ := req.(*sinkBatch)
defer s.metrics.recordSinkIOInflightChange(int64(-batch.numMessages))
s.metrics.recordSinkIOInflightChange(int64(batch.numMessages))
defer s.metrics.timers().DownstreamClientSend.Start()()

return s.client.Flush(ctx, batch.payload)
}
ioEmitter := NewParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics, s.settings)
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type metricsRecorder interface {
makeCloudstorageFileAllocCallback() func(delta int64)
getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
netMetrics() *cidr.NetMetrics
timers() *timers.ScopedTimers
}

var _ metricsRecorder = (*sliMetrics)(nil)
Expand Down Expand Up @@ -355,6 +356,14 @@ func (m *sliMetrics) netMetrics() *cidr.NetMetrics {
return m.NetMetrics
}

func (m *sliMetrics) timers() *timers.ScopedTimers {
if m == nil {
return timers.NoopScopedTimers
}

return m.Timers
}

// JobScopedUsageMetrics are aggregated metrics keeping track of
// per-changefeed usage metrics by job_id. Note that its members are public so
// they get registered with the metrics registry, but you should NOT modify them
Expand Down Expand Up @@ -674,6 +683,10 @@ func (w *wrappingCostController) netMetrics() *cidr.NetMetrics {
return w.inner.netMetrics()
}

func (w *wrappingCostController) timers() *timers.ScopedTimers {
return w.inner.timers()
}

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ func (f *cloudStorageSinkFile) flushToStorage(
ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder,
) error {
defer f.releaseAlloc(ctx)
defer m.timers().DownstreamClientSend.Start()()

if f.rawSize == 0 {
// This method shouldn't be called with an empty file, but be defensive
Expand Down
12 changes: 11 additions & 1 deletion pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,21 @@ func (s *kafkaSink) EmitRow(
return err
}

// Since we cannot distinguish between time spent buffering vs emitting
// inside sarama, set the DownstreamClientSend timer to the same value as
// BatchHistNanos.
recordOneMessageCb := s.metrics.recordOneMessage()
downstreamClientSendCb := s.metrics.timers().DownstreamClientSend.Start()
updateMetrics := func(mvcc hlc.Timestamp, bytes int, compressedBytes int) {
recordOneMessageCb(mvcc, bytes, compressedBytes)
downstreamClientSendCb()
}

msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(value),
Metadata: messageMetadata{alloc: alloc, mvcc: mvcc, updateMetrics: s.metrics.recordOneMessage()},
Metadata: messageMetadata{alloc: alloc, mvcc: mvcc, updateMetrics: updateMetrics},
}
s.stats.startMessage(int64(msg.Key.Length() + msg.Value.Length()))
return s.emitMessage(ctx, msg)
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/sink_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ func (p *deprecatedGcpPubsubClient) close() error {

// sendMessage sends a message to the topic
func (p *deprecatedGcpPubsubClient) sendMessage(m []byte, topic string, key string) error {
defer p.metrics.timers().DownstreamClientSend.Start()()

t, err := p.getTopicClient(topic)
if err != nil {
return err
Expand Down
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/sink_pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,16 @@ func (p *pulsarSink) Flush(ctx context.Context) error {
func (p *pulsarSink) msgCallback(
ctx context.Context, a kvevent.Alloc, mvcc hlc.Timestamp,
) func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
// Since we cannot distinguish between time spent buffering inside the
// pulsar client and time spent sending the message, we just set the value
// of DownstreamClientSend equal to BatchHistNanos.
sendCb := p.metrics.timers().DownstreamClientSend.Start()
oneMsgCb := p.metrics.recordOneMessage()

return func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
sendCb()
if err == nil {
p.metrics.recordOneMessage()(mvcc, len(message.Payload), len(message.Payload))
oneMsgCb(mvcc, len(message.Payload), len(message.Payload))
} else {
p.setError(id, message, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/sink_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,9 @@ func (s *deprecatedWebhookSink) sendMessageWithRetries(
return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc)
}

func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) error {
func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) (retErr error) {
defer s.metrics.timers().DownstreamClientSend.Start()()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), bytes.NewReader(reqBody))
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -224,6 +225,10 @@ func (r *telemetryMetricsRecorder) netMetrics() *cidr.NetMetrics {
return r.inner.netMetrics()
}

func (r *telemetryMetricsRecorder) timers() *timers.ScopedTimers {
return r.inner.timers()
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
Expand Down
19 changes: 13 additions & 6 deletions pkg/ccl/changefeedccl/timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ type Timers struct {
CheckpointJobProgress *aggmetric.AggHistogram
Encode *aggmetric.AggHistogram
EmitRow *aggmetric.AggHistogram
DownstreamClientSend *aggmetric.AggHistogram
KVFeedWaitForTableEvent *aggmetric.AggHistogram
KVFeedBuffer *aggmetric.AggHistogram
RangefeedBufferValue *aggmetric.AggHistogram
RangefeedBufferCheckpoint *aggmetric.AggHistogram
}

func (*Timers) MetricStruct() {}

var _ metric.Struct = &Timers{}

func New(histogramWindow time.Duration) *Timers {
histogramOptsFor := func(name, desc string) metric.HistogramOptions {
return metric.HistogramOptions{
Expand All @@ -44,6 +49,7 @@ func New(histogramWindow time.Duration) *Timers {
CheckpointJobProgress: b.Histogram(histogramOptsFor("changefeed.stage.checkpoint_job_progress.latency", "Latency of the changefeed stage: checkpointing job progress")),
Encode: b.Histogram(histogramOptsFor("changefeed.stage.encode.latency", "Latency of the changefeed stage: encoding data")),
EmitRow: b.Histogram(histogramOptsFor("changefeed.stage.emit_row.latency", "Latency of the changefeed stage: emitting row to sink")),
DownstreamClientSend: b.Histogram(histogramOptsFor("changefeed.stage.downstream_client_send.latency", "Latency of the changefeed stage: flushing messages from the sink's client to its downstream. This includes sends that failed for most but not all sinks.")),
KVFeedWaitForTableEvent: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_wait_for_table_event.latency", "Latency of the changefeed stage: waiting for a table schema event to join to the kv event")),
KVFeedBuffer: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_buffer.latency", "Latency of the changefeed stage: waiting to buffer kv events")),
RangefeedBufferValue: b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_value.latency", "Latency of the changefeed stage: buffering rangefeed value events")),
Expand All @@ -56,6 +62,7 @@ func (ts *Timers) GetOrCreateScopedTimers(scope string) *ScopedTimers {
CheckpointJobProgress: &timer{ts.CheckpointJobProgress.AddChild(scope)},
Encode: &timer{ts.Encode.AddChild(scope)},
EmitRow: &timer{ts.EmitRow.AddChild(scope)},
DownstreamClientSend: &timer{ts.DownstreamClientSend.AddChild(scope)},
KVFeedWaitForTableEvent: &timer{ts.KVFeedWaitForTableEvent.AddChild(scope)},
KVFeedBuffer: &timer{ts.KVFeedBuffer.AddChild(scope)},
RangefeedBufferValue: &timer{ts.RangefeedBufferValue.AddChild(scope)},
Expand All @@ -67,24 +74,24 @@ type ScopedTimers struct {
CheckpointJobProgress *timer
Encode *timer
EmitRow *timer
DownstreamClientSend *timer
KVFeedWaitForTableEvent *timer
KVFeedBuffer *timer
RangefeedBufferValue *timer
RangefeedBufferCheckpoint *timer
}

func (ts *ScopedTimers) StartTimer(stage *aggmetric.Histogram) func() {
start := timeutil.Now()
return func() {
stage.RecordValue(timeutil.Since(start).Nanoseconds())
}
}
var NoopScopedTimers = &ScopedTimers{}

type timer struct {
hist *aggmetric.Histogram
}

func (t *timer) Start() (end func()) {
if t == nil {
return func() {}
}

start := timeutil.Now()
return func() {
t.hist.RecordValue(timeutil.Since(start).Nanoseconds())
Expand Down

0 comments on commit 5e09bf2

Please sign in to comment.