diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index b7d6ad479021..8afc59427093 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -77,6 +77,7 @@ func distChangefeedFlow( execCtx sql.JobExecContext, jobID jobspb.JobID, details jobspb.ChangefeedDetails, + description string, localState *cachedState, resultsCh chan<- tree.Datums, ) error { @@ -132,7 +133,7 @@ func distChangefeedFlow( } } return startDistChangefeed( - ctx, execCtx, jobID, schemaTS, details, initialHighWater, localState, resultsCh) + ctx, execCtx, jobID, schemaTS, details, description, initialHighWater, localState, resultsCh) } func fetchTableDescriptors( @@ -227,6 +228,7 @@ func startDistChangefeed( jobID jobspb.JobID, schemaTS hlc.Timestamp, details jobspb.ChangefeedDetails, + description string, initialHighWater hlc.Timestamp, localState *cachedState, resultsCh chan<- tree.Datums, @@ -259,7 +261,7 @@ func startDistChangefeed( if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil { checkpoint = progress.Checkpoint } - p, planCtx, err := makePlan(execCtx, jobID, details, initialHighWater, + p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater, trackedSpans, checkpoint, localState.drainingNodes)(ctx, dsp) if err != nil { return err @@ -373,6 +375,7 @@ func makePlan( execCtx sql.JobExecContext, jobID jobspb.JobID, details jobspb.ChangefeedDetails, + description string, initialHighWater hlc.Timestamp, trackedSpans []roachpb.Span, checkpoint *jobspb.ChangefeedProgress_Checkpoint, @@ -476,12 +479,13 @@ func makePlan( } aggregatorSpecs[i] = &execinfrapb.ChangeAggregatorSpec{ - Watches: watches, - Checkpoint: aggregatorCheckpoint, - Feed: details, - UserProto: execCtx.User().EncodeProto(), - JobID: jobID, - Select: execinfrapb.Expression{Expr: details.Select}, + Watches: watches, + Checkpoint: aggregatorCheckpoint, + Feed: details, + UserProto: execCtx.User().EncodeProto(), + JobID: jobID, + Select: execinfrapb.Expression{Expr: details.Select}, + Description: description, } } @@ -494,6 +498,7 @@ func makePlan( Feed: details, JobID: jobID, UserProto: execCtx.User().EncodeProto(), + Description: description, } if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil { diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index bdd6c088dfe7..3311c1cb6e85 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -262,17 +262,31 @@ func (ca *changeAggregator) MustBeStreaming() bool { return true } -// wrapMetricsController wraps the supplied metricsRecorder to emit metrics to telemetry. -// This method modifies ca.cancel(). -func (ca *changeAggregator) wrapMetricsController( +// wrapMetricsRecorderWithTelemetry wraps the supplied metricsRecorder +// so it periodically emits metrics to telemetry. +func (ca *changeAggregator) wrapMetricsRecorderWithTelemetry( ctx context.Context, recorder metricsRecorder, ) (metricsRecorder, error) { - job, err := ca.FlowCtx.Cfg.JobRegistry.LoadJob(ctx, ca.spec.JobID) - if err != nil { - return ca.sliMetrics, err + details := ca.spec.Feed + jobID := ca.spec.JobID + description := ca.spec.Description + // This code exists so that if the spec is created in a mixed-version cluster + // on a node without the new Description field, this node can still fetch + // the description. + if description == "" { + // Don't emit telemetry messages for core changefeeds without a description. + if ca.isSinkless() { + return recorder, nil + } else { + job, err := ca.FlowCtx.Cfg.JobRegistry.LoadJob(ctx, jobID) + if err != nil { + return nil, err + } + description = job.Payload().Description + } } - recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, job, ca.FlowCtx.Cfg.Settings, recorder) + recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, details, description, jobID, ca.FlowCtx.Cfg.Settings, recorder) if err != nil { return ca.sliMetrics, err } @@ -337,18 +351,15 @@ func (ca *changeAggregator) Start(ctx context.Context) { } ca.sliMetricsID = ca.sliMetrics.claimId() - // TODO(jayant): add support for sinkless changefeeds using UUID recorder := metricsRecorder(ca.sliMetrics) - if !ca.isSinkless() { - recorder, err = ca.wrapMetricsController(ctx, recorder) - if err != nil { - if log.V(2) { - log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err) - } - ca.MoveToDraining(err) - ca.cancel() - return + recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder) + if err != nil { + if log.V(2) { + log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err) } + ca.MoveToDraining(err) + ca.cancel() + return } ca.sink, err = getEventSink(ctx, ca.FlowCtx.Cfg, ca.spec.Feed, timestampOracle, diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 9d7b8485c872..3495f2a12885 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -222,7 +222,7 @@ func changefeedPlanHook( telemetry.Count(`changefeed.create.core`) logChangefeedCreateTelemetry(ctx, jr, changefeedStmt.Select != nil) - err := coreChangefeed(ctx, p, details, progress, resultsCh) + err := coreChangefeed(ctx, p, details, jr.Description, progress, resultsCh) // TODO(yevgeniy): This seems wrong -- core changefeeds always terminate // with an error. Perhaps rename this telemetry to indicate number of // completed feeds. @@ -332,6 +332,7 @@ func coreChangefeed( ctx context.Context, p sql.PlanHookState, details jobspb.ChangefeedDetails, + description string, progress jobspb.Progress, resultsCh chan<- tree.Datums, ) error { @@ -350,7 +351,7 @@ func coreChangefeed( knobs.BeforeDistChangefeed() } - err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, localState, resultsCh) + err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, description, localState, resultsCh) if err == nil { log.Infof(ctx, "core changefeed completed with no error") return nil @@ -658,7 +659,7 @@ func createChangefeedJobRecord( details.Opts = opts.AsMap() // Jobs should not be created for sinkless changefeeds. However, note that // we create and return a job record for sinkless changefeeds below. This is - // because we need the details field to create our sinkless changefeed. + // because we need the job description and details to create our sinkless changefeed. // After this job record is returned, we create our forever running sinkless // changefeed, thus ensuring that no job is created for this changefeed as // desired. @@ -1112,12 +1113,13 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err jobID := b.job.ID() details := b.job.Details().(jobspb.ChangefeedDetails) progress := b.job.Progress() + description := b.job.Payload().Description if err := b.ensureClusterIDMatches(ctx, jobExec.ExtendedEvalContext().ClusterID); err != nil { return err } - err := b.resumeWithRetries(ctx, jobExec, jobID, details, progress, execCfg) + err := b.resumeWithRetries(ctx, jobExec, jobID, details, description, progress, execCfg) if err != nil { return b.handleChangefeedError(ctx, err, details, jobExec) } @@ -1237,6 +1239,7 @@ func (b *changefeedResumer) resumeWithRetries( jobExec sql.JobExecContext, jobID jobspb.JobID, details jobspb.ChangefeedDetails, + description string, initialProgress jobspb.Progress, execCfg *sql.ExecutorConfig, ) error { @@ -1286,7 +1289,7 @@ func (b *changefeedResumer) resumeWithRetries( knobs.BeforeDistChangefeed() } - flowErr = distChangefeedFlow(ctx, jobExec, jobID, details, localState, startedCh) + flowErr = distChangefeedFlow(ctx, jobExec, jobID, details, description, localState, startedCh) if flowErr == nil { return nil // Changefeed completed -- e.g. due to initial_scan=only mode. } diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go index 22307963b3d2..801861876f20 100644 --- a/pkg/ccl/changefeedccl/telemetry.go +++ b/pkg/ccl/changefeedccl/telemetry.go @@ -11,7 +11,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers" - "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -32,7 +31,6 @@ type sinkTelemetryData struct { type periodicTelemetryLogger struct { ctx context.Context sinkTelemetryData sinkTelemetryData - job *jobs.Job changefeedDetails eventpb.CommonChangefeedEventDetails settings *cluster.Settings @@ -59,12 +57,15 @@ type telemetryLogger interface { var _ telemetryLogger = (*periodicTelemetryLogger)(nil) func makePeriodicTelemetryLogger( - ctx context.Context, job *jobs.Job, s *cluster.Settings, + ctx context.Context, + details jobspb.ChangefeedDetails, + description string, + jobID jobspb.JobID, + s *cluster.Settings, ) (*periodicTelemetryLogger, error) { return &periodicTelemetryLogger{ ctx: ctx, - job: job, - changefeedDetails: makeCommonChangefeedEventDetails(ctx, job.Details().(jobspb.ChangefeedDetails), job.Payload().Description, job.ID()), + changefeedDetails: makeCommonChangefeedEventDetails(ctx, details, description, jobID), sinkTelemetryData: sinkTelemetryData{}, settings: s, }, nil @@ -134,9 +135,14 @@ func (ptl *periodicTelemetryLogger) close() { } func wrapMetricsRecorderWithTelemetry( - ctx context.Context, job *jobs.Job, s *cluster.Settings, mb metricsRecorder, + ctx context.Context, + details jobspb.ChangefeedDetails, + description string, + jobID jobspb.JobID, + s *cluster.Settings, + mb metricsRecorder, ) (*telemetryMetricsRecorder, error) { - logger, err := makePeriodicTelemetryLogger(ctx, job, s) + logger, err := makePeriodicTelemetryLogger(ctx, details, description, jobID, s) if err != nil { return &telemetryMetricsRecorder{}, err } @@ -229,13 +235,13 @@ func (r *telemetryMetricsRecorder) timers() *timers.ScopedTimers { return r.inner.timers() } -// continuousTelemetryInterval determines the interval at which each node emits telemetry events -// during the lifespan of each enterprise changefeed. +// continuousTelemetryInterval determines the interval at which each node emits +// periodic telemetry events during the lifespan of each changefeed. var continuousTelemetryInterval = settings.RegisterDurationSetting( settings.ApplicationLevel, "changefeed.telemetry.continuous_logging.interval", "determines the interval at which each node emits continuous telemetry events"+ - " during the lifespan of every enterprise changefeed; setting a zero value disables", + " during the lifespan of every changefeed; setting a zero value disables logging", 24*time.Hour, settings.NonNegativeDuration, ) diff --git a/pkg/sql/execinfrapb/processors_changefeeds.proto b/pkg/sql/execinfrapb/processors_changefeeds.proto index 158a9b3ff3fb..cc5bdd9000a4 100644 --- a/pkg/sql/execinfrapb/processors_changefeeds.proto +++ b/pkg/sql/execinfrapb/processors_changefeeds.proto @@ -55,6 +55,9 @@ message ChangeAggregatorSpec { // select is the "select clause" for predicate changefeed. optional Expression select = 6 [(gogoproto.nullable) = false]; + + // Description is the description of the changefeed. Used for structured logging. + optional string description = 7 [(gogoproto.nullable) = false]; } // ChangeFrontierSpec is the specification for a processor that receives @@ -79,4 +82,7 @@ message ChangeFrontierSpec { // User who initiated the changefeed. This is used to check access privileges // when using FileTable ExternalStorage. optional string user_proto = 4 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"]; + + // Description is the description of the changefeed. Used for structured logging. + optional string description = 5 [(gogoproto.nullable) = false]; }