changefeedccl: make changefeed_emitted_bytes work for core changefeeds
This patch makes `changefeed_emitted_bytes` structured log events (and
any future changefeed structured log events) work for core changefeeds.

Release note: None
andyyang890 committed Nov 15, 2024
1 parent c18ccff commit f2528af
Showing 4 changed files with 58 additions and 28 deletions.
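
To make the diff easier to follow, here is a small, self-contained Go sketch of the decision the change aggregator now makes when wrapping its metrics recorder for telemetry. The types and the `descriptionForTelemetry` helper below are illustrative stand-ins invented for this sketch (the real logic lives in `wrapMetricsRecorderWithTelemetry` in changefeed_processors.go), and sinklessness is modeled here simply as a zero job ID:

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for jobspb.ChangefeedDetails and the processor spec;
// the real types live in pkg/jobs/jobspb and pkg/ccl/changefeedccl.
type sinklessInfo struct{ Description string }

type feedDetails struct{ SinklessInfo *sinklessInfo }

type aggregatorSpec struct {
	Feed  feedDetails
	JobID int64 // zero for core (sinkless) changefeeds in this sketch
}

// descriptionForTelemetry mirrors the branching added by this patch: sinkless
// changefeeds read their description from the new SinklessInfo field, while
// job-backed changefeeds keep loading it from the job payload. The second
// return value reports whether telemetry wrapping should happen at all.
func descriptionForTelemetry(
	spec aggregatorSpec, loadJobDescription func(int64) (string, error),
) (string, bool, error) {
	if spec.JobID == 0 {
		// Core changefeed: skip telemetry if no description was recorded.
		if spec.Feed.SinklessInfo == nil {
			return "", false, nil
		}
		return spec.Feed.SinklessInfo.Description, true, nil
	}
	desc, err := loadJobDescription(spec.JobID)
	if err != nil {
		return "", false, err
	}
	return desc, true, nil
}

func main() {
	core := aggregatorSpec{Feed: feedDetails{SinklessInfo: &sinklessInfo{
		Description: "CREATE CHANGEFEED FOR TABLE foo",
	}}}
	desc, ok, _ := descriptionForTelemetry(core, nil)
	fmt.Println(desc, ok)

	enterprise := aggregatorSpec{JobID: 42}
	desc, ok, err := descriptionForTelemetry(enterprise, func(id int64) (string, error) {
		if id != 42 {
			return "", errors.New("job not found")
		}
		return "CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://…'", nil
	})
	fmt.Println(desc, ok, err)
}
```

In the actual patch, the description for core changefeeds comes from the new `SinklessChangefeedInfo` message populated in `createChangefeedJobRecord`, as shown in the diffs below.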
43 changes: 26 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -262,17 +262,29 @@ func (ca *changeAggregator) MustBeStreaming() bool {
return true
}

// wrapMetricsController wraps the supplied metricsRecorder to emit metrics to telemetry.
// This method modifies ca.cancel().
func (ca *changeAggregator) wrapMetricsController(
// wrapMetricsRecorderWithTelemetry wraps the supplied metricsRecorder
// so it periodically emits metrics to telemetry.
func (ca *changeAggregator) wrapMetricsRecorderWithTelemetry(
ctx context.Context, recorder metricsRecorder,
) (metricsRecorder, error) {
job, err := ca.FlowCtx.Cfg.JobRegistry.LoadJob(ctx, ca.spec.JobID)
if err != nil {
return ca.sliMetrics, err
details := ca.spec.Feed
jobID := ca.spec.JobID
var description string
if ca.isSinkless() {
// Don't emit telemetry messages for core changefeeds without a description.
if details.SinklessInfo == nil {
return recorder, nil
}
description = details.SinklessInfo.Description
} else {
job, err := ca.FlowCtx.Cfg.JobRegistry.LoadJob(ctx, jobID)
if err != nil {
return nil, err
}
description = job.Payload().Description
}

recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, job, ca.FlowCtx.Cfg.Settings, recorder)
recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, details, description, jobID, ca.FlowCtx.Cfg.Settings, recorder)
if err != nil {
return ca.sliMetrics, err
}
@@ -337,18 +349,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
}
ca.sliMetricsID = ca.sliMetrics.claimId()

// TODO(jayant): add support for sinkless changefeeds using UUID
recorder := metricsRecorder(ca.sliMetrics)
if !ca.isSinkless() {
recorder, err = ca.wrapMetricsController(ctx, recorder)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
}

ca.sink, err = getEventSink(ctx, ca.FlowCtx.Cfg, ca.spec.Feed, timestampOracle,
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -656,9 +656,14 @@ func createChangefeedJobRecord(
}

details.Opts = opts.AsMap()

details.SinklessInfo = &jobspb.ChangefeedDetails_SinklessChangefeedInfo{
Description: jobDescription,
}

// Jobs should not be created for sinkless changefeeds. However, note that
// we create and return a job record for sinkless changefeeds below. This is
// because we need the details field to create our sinkless changefeed.
// because we need the job description and details to create our sinkless changefeed.
// After this job record is returned, we create our forever running sinkless
// changefeed, thus ensuring that no job is created for this changefeed as
// desired.
26 changes: 16 additions & 10 deletions pkg/ccl/changefeedccl/telemetry.go
@@ -11,7 +11,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -32,7 +31,6 @@ type sinkTelemetryData struct {
type periodicTelemetryLogger struct {
ctx context.Context
sinkTelemetryData sinkTelemetryData
job *jobs.Job
changefeedDetails eventpb.CommonChangefeedEventDetails
settings *cluster.Settings

@@ -59,12 +57,15 @@ type telemetryLogger interface {
var _ telemetryLogger = (*periodicTelemetryLogger)(nil)

func makePeriodicTelemetryLogger(
ctx context.Context, job *jobs.Job, s *cluster.Settings,
ctx context.Context,
details jobspb.ChangefeedDetails,
description string,
jobID jobspb.JobID,
s *cluster.Settings,
) (*periodicTelemetryLogger, error) {
return &periodicTelemetryLogger{
ctx: ctx,
job: job,
changefeedDetails: makeCommonChangefeedEventDetails(ctx, job.Details().(jobspb.ChangefeedDetails), job.Payload().Description, job.ID()),
changefeedDetails: makeCommonChangefeedEventDetails(ctx, details, description, jobID),
sinkTelemetryData: sinkTelemetryData{},
settings: s,
}, nil
@@ -134,9 +135,14 @@ func (ptl *periodicTelemetryLogger) close() {
}

func wrapMetricsRecorderWithTelemetry(
ctx context.Context, job *jobs.Job, s *cluster.Settings, mb metricsRecorder,
ctx context.Context,
details jobspb.ChangefeedDetails,
description string,
jobID jobspb.JobID,
s *cluster.Settings,
mb metricsRecorder,
) (*telemetryMetricsRecorder, error) {
logger, err := makePeriodicTelemetryLogger(ctx, job, s)
logger, err := makePeriodicTelemetryLogger(ctx, details, description, jobID, s)
if err != nil {
return &telemetryMetricsRecorder{}, err
}
@@ -229,13 +235,13 @@ func (r *telemetryMetricsRecorder) timers() *timers.ScopedTimers {
return r.inner.timers()
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
// continuousTelemetryInterval determines the interval at which each node emits
// periodic telemetry events during the lifespan of each changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.telemetry.continuous_logging.interval",
"determines the interval at which each node emits continuous telemetry events"+
" during the lifespan of every enterprise changefeed; setting a zero value disables",
" during the lifespan of every changefeed; setting a zero value disables logging",
24*time.Hour,
settings.NonNegativeDuration,
)
10 changes: 10 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
@@ -1123,6 +1123,16 @@ message ChangefeedDetails {
sessiondatapb.SessionData session_data = 11;
reserved 1, 2, 5;
reserved "targets";

// SinklessChangefeedInfo contains some metadata for sinkless changefeeds that
// is normally stored in the job payload for changefeeds with sinks.
message SinklessChangefeedInfo {
string description = 1;
}

// SinklessInfo will only be set for sinkless changefeeds and contains metadata
// that is normally stored in the job payload for changefeeds with sinks.
SinklessChangefeedInfo sinkless_info = 12;
}

message ResolvedSpan {