Skip to content

Commit

Permalink
changefeedccl: add timer for inner sink client flush
Browse files Browse the repository at this point in the history
Add a timer for the inner flushes inside sink
clients. Where possible (batching_sink users,
webhook v1, pubsub v1, cloud storage), this is
distinct from changefeed.batch_hist_nanos in that
in only captures time spent flushing data to the
downstream endpoint, not time spent buffering
data.

Part of: #127784

Release note (enterprise change): Added a timer
for inner sink client flushes.
  • Loading branch information
asg0451 committed Oct 22, 2024
1 parent 9e12f67 commit 6689963
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 9 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
batch, _ := req.(*sinkBatch)
defer s.metrics.recordSinkIOInflightChange(int64(-batch.numMessages))
s.metrics.recordSinkIOInflightChange(int64(batch.numMessages))
defer s.metrics.timers().DownstreamClientSend.Start()()

return s.client.Flush(ctx, batch.payload)
}
ioEmitter := NewParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics, s.settings)
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type metricsRecorder interface {
makeCloudstorageFileAllocCallback() func(delta int64)
getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
netMetrics() *cidr.NetMetrics
timers() *timers.ScopedTimers
}

var _ metricsRecorder = (*sliMetrics)(nil)
Expand Down Expand Up @@ -355,6 +356,14 @@ func (m *sliMetrics) netMetrics() *cidr.NetMetrics {
return m.NetMetrics
}

func (m *sliMetrics) timers() *timers.ScopedTimers {
if m == nil {
return timers.NoopScopedTimers
}

return m.Timers
}

// JobScopedUsageMetrics are aggregated metrics keeping track of
// per-changefeed usage metrics by job_id. Note that its members are public so
// they get registered with the metrics registry, but you should NOT modify them
Expand Down Expand Up @@ -674,6 +683,10 @@ func (w *wrappingCostController) netMetrics() *cidr.NetMetrics {
return w.inner.netMetrics()
}

func (w *wrappingCostController) timers() *timers.ScopedTimers {
return w.inner.timers()
}

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ func (f *cloudStorageSinkFile) flushToStorage(
ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder,
) error {
defer f.releaseAlloc(ctx)
defer m.timers().DownstreamClientSend.Start()()

if f.rawSize == 0 {
// This method shouldn't be called with an empty file, but be defensive
Expand Down
12 changes: 11 additions & 1 deletion pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,21 @@ func (s *kafkaSink) EmitRow(
return err
}

// Since we cannot distinguish between time spent buffering vs emitting
// inside sarama, set the DownstreamClientSend timer to the same value as
// BatchHistNanos.
recordOneMessageCb := s.metrics.recordOneMessage()
downstreamClientSendCb := s.metrics.timers().DownstreamClientSend.Start()
updateMetrics := func(mvcc hlc.Timestamp, bytes int, compressedBytes int) {
recordOneMessageCb(mvcc, bytes, compressedBytes)
downstreamClientSendCb()
}

msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(value),
Metadata: messageMetadata{alloc: alloc, mvcc: mvcc, updateMetrics: s.metrics.recordOneMessage()},
Metadata: messageMetadata{alloc: alloc, mvcc: mvcc, updateMetrics: updateMetrics},
}
s.stats.startMessage(int64(msg.Key.Length() + msg.Value.Length()))
return s.emitMessage(ctx, msg)
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/sink_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ func (p *deprecatedGcpPubsubClient) close() error {

// sendMessage sends a message to the topic
func (p *deprecatedGcpPubsubClient) sendMessage(m []byte, topic string, key string) error {
defer p.metrics.timers().DownstreamClientSend.Start()()

t, err := p.getTopicClient(topic)
if err != nil {
return err
Expand Down
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/sink_pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,16 @@ func (p *pulsarSink) Flush(ctx context.Context) error {
func (p *pulsarSink) msgCallback(
ctx context.Context, a kvevent.Alloc, mvcc hlc.Timestamp,
) func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
// Since we cannot distinguish between time spent buffering inside the
// pulsar client and time spent sending the message, we just set the value
// of DownstreamClientSend equal to BatchHistNanos.
sendCb := p.metrics.timers().DownstreamClientSend.Start()
oneMsgCb := p.metrics.recordOneMessage()

return func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
sendCb()
if err == nil {
p.metrics.recordOneMessage()(mvcc, len(message.Payload), len(message.Payload))
oneMsgCb(mvcc, len(message.Payload), len(message.Payload))
} else {
p.setError(id, message, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/sink_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,9 @@ func (s *deprecatedWebhookSink) sendMessageWithRetries(
return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc)
}

func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) error {
func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) (retErr error) {
defer s.metrics.timers().DownstreamClientSend.Start()()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), bytes.NewReader(reqBody))
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -224,6 +225,10 @@ func (r *telemetryMetricsRecorder) netMetrics() *cidr.NetMetrics {
return r.inner.netMetrics()
}

func (r *telemetryMetricsRecorder) timers() *timers.ScopedTimers {
return r.inner.timers()
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
Expand Down
15 changes: 9 additions & 6 deletions pkg/ccl/changefeedccl/timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Timers struct {
CheckpointJobProgress *aggmetric.AggHistogram
Encode *aggmetric.AggHistogram
EmitRow *aggmetric.AggHistogram
DownstreamClientSend *aggmetric.AggHistogram
KVFeedWaitForTableEvent *aggmetric.AggHistogram
KVFeedBuffer *aggmetric.AggHistogram
RangefeedBufferValue *aggmetric.AggHistogram
Expand All @@ -44,6 +45,7 @@ func New(histogramWindow time.Duration) *Timers {
CheckpointJobProgress: b.Histogram(histogramOptsFor("changefeed.stage.checkpoint_job_progress.latency", "Latency of the changefeed stage: checkpointing job progress")),
Encode: b.Histogram(histogramOptsFor("changefeed.stage.encode.latency", "Latency of the changefeed stage: encoding data")),
EmitRow: b.Histogram(histogramOptsFor("changefeed.stage.emit_row.latency", "Latency of the changefeed stage: emitting row to sink")),
DownstreamClientSend: b.Histogram(histogramOptsFor("changefeed.stage.sink_client_send.latency", "Latency of the changefeed stage: flushing messages from the sink's client to its downstream. This includes sends that failed for most but not all sinks.")),
KVFeedWaitForTableEvent: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_wait_for_table_event.latency", "Latency of the changefeed stage: waiting for a table schema event to join to the kv event")),
KVFeedBuffer: b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_buffer.latency", "Latency of the changefeed stage: waiting to buffer kv events")),
RangefeedBufferValue: b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_value.latency", "Latency of the changefeed stage: buffering rangefeed value events")),
Expand All @@ -56,6 +58,7 @@ func (ts *Timers) GetOrCreateScopedTimers(scope string) *ScopedTimers {
CheckpointJobProgress: &timer{ts.CheckpointJobProgress.AddChild(scope)},
Encode: &timer{ts.Encode.AddChild(scope)},
EmitRow: &timer{ts.EmitRow.AddChild(scope)},
DownstreamClientSend: &timer{ts.DownstreamClientSend.AddChild(scope)},
KVFeedWaitForTableEvent: &timer{ts.KVFeedWaitForTableEvent.AddChild(scope)},
KVFeedBuffer: &timer{ts.KVFeedBuffer.AddChild(scope)},
RangefeedBufferValue: &timer{ts.RangefeedBufferValue.AddChild(scope)},
Expand All @@ -67,24 +70,24 @@ type ScopedTimers struct {
CheckpointJobProgress *timer
Encode *timer
EmitRow *timer
DownstreamClientSend *timer
KVFeedWaitForTableEvent *timer
KVFeedBuffer *timer
RangefeedBufferValue *timer
RangefeedBufferCheckpoint *timer
}

func (ts *ScopedTimers) StartTimer(stage *aggmetric.Histogram) func() {
start := timeutil.Now()
return func() {
stage.RecordValue(timeutil.Since(start).Nanoseconds())
}
}
var NoopScopedTimers = &ScopedTimers{}

type timer struct {
hist *aggmetric.Histogram
}

func (t *timer) Start() (end func()) {
if t == nil {
return func() {}
}

start := timeutil.Now()
return func() {
t.hist.RecordValue(timeutil.Since(start).Nanoseconds())
Expand Down

0 comments on commit 6689963

Please sign in to comment.