Skip to content

Commit

Permalink
changefeedccl: make changefeed_emitted_bytes work for core changefeeds
Browse files Browse the repository at this point in the history
This patch makes `changefeed_emitted_bytes` structured log events (and
any future changefeed structured log events) work for core changefeeds
by including the changefeed description in the processor specs.

Release note: None
  • Loading branch information
andyyang890 committed Nov 15, 2024
1 parent c18ccff commit cfa59ad
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 40 deletions.
21 changes: 13 additions & 8 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func distChangefeedFlow(
execCtx sql.JobExecContext,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
description string,
localState *cachedState,
resultsCh chan<- tree.Datums,
) error {
Expand Down Expand Up @@ -132,7 +133,7 @@ func distChangefeedFlow(
}
}
return startDistChangefeed(
ctx, execCtx, jobID, schemaTS, details, initialHighWater, localState, resultsCh)
ctx, execCtx, jobID, schemaTS, details, description, initialHighWater, localState, resultsCh)
}

func fetchTableDescriptors(
Expand Down Expand Up @@ -227,6 +228,7 @@ func startDistChangefeed(
jobID jobspb.JobID,
schemaTS hlc.Timestamp,
details jobspb.ChangefeedDetails,
description string,
initialHighWater hlc.Timestamp,
localState *cachedState,
resultsCh chan<- tree.Datums,
Expand Down Expand Up @@ -259,7 +261,7 @@ func startDistChangefeed(
if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil {
checkpoint = progress.Checkpoint
}
p, planCtx, err := makePlan(execCtx, jobID, details, initialHighWater,
p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater,
trackedSpans, checkpoint, localState.drainingNodes)(ctx, dsp)
if err != nil {
return err
Expand Down Expand Up @@ -373,6 +375,7 @@ func makePlan(
execCtx sql.JobExecContext,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
description string,
initialHighWater hlc.Timestamp,
trackedSpans []roachpb.Span,
checkpoint *jobspb.ChangefeedProgress_Checkpoint,
Expand Down Expand Up @@ -476,12 +479,13 @@ func makePlan(
}

aggregatorSpecs[i] = &execinfrapb.ChangeAggregatorSpec{
Watches: watches,
Checkpoint: aggregatorCheckpoint,
Feed: details,
UserProto: execCtx.User().EncodeProto(),
JobID: jobID,
Select: execinfrapb.Expression{Expr: details.Select},
Watches: watches,
Checkpoint: aggregatorCheckpoint,
Feed: details,
UserProto: execCtx.User().EncodeProto(),
JobID: jobID,
Select: execinfrapb.Expression{Expr: details.Select},
Description: description,
}
}

Expand All @@ -494,6 +498,7 @@ func makePlan(
Feed: details,
JobID: jobID,
UserProto: execCtx.User().EncodeProto(),
Description: description,
}

if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {
Expand Down
45 changes: 28 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,17 +262,31 @@ func (ca *changeAggregator) MustBeStreaming() bool {
return true
}

// wrapMetricsController wraps the supplied metricsRecorder to emit metrics to telemetry.
// This method modifies ca.cancel().
func (ca *changeAggregator) wrapMetricsController(
// wrapMetricsRecorderWithTelemetry wraps the supplied metricsRecorder
// so it periodically emits metrics to telemetry.
func (ca *changeAggregator) wrapMetricsRecorderWithTelemetry(
ctx context.Context, recorder metricsRecorder,
) (metricsRecorder, error) {
job, err := ca.FlowCtx.Cfg.JobRegistry.LoadJob(ctx, ca.spec.JobID)
if err != nil {
return ca.sliMetrics, err
details := ca.spec.Feed
jobID := ca.spec.JobID
description := ca.spec.Description
// This code exists so that if the spec is created in a mixed-version cluster
// on a node without the new Description field, this node can still fetch
// the description.
if description == "" {
// Don't emit telemetry messages for core changefeeds without a description.
if ca.isSinkless() {
return recorder, nil
} else {
job, err := ca.FlowCtx.Cfg.JobRegistry.LoadJob(ctx, jobID)
if err != nil {
return nil, err
}
description = job.Payload().Description
}
}

recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, job, ca.FlowCtx.Cfg.Settings, recorder)
recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, details, description, jobID, ca.FlowCtx.Cfg.Settings, recorder)
if err != nil {
return ca.sliMetrics, err
}
Expand Down Expand Up @@ -337,18 +351,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
}
ca.sliMetricsID = ca.sliMetrics.claimId()

// TODO(jayant): add support for sinkless changefeeds using UUID
recorder := metricsRecorder(ca.sliMetrics)
if !ca.isSinkless() {
recorder, err = ca.wrapMetricsController(ctx, recorder)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
}
ca.MoveToDraining(err)
ca.cancel()
return
}

ca.sink, err = getEventSink(ctx, ca.FlowCtx.Cfg, ca.spec.Feed, timestampOracle,
Expand Down
13 changes: 8 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func changefeedPlanHook(
telemetry.Count(`changefeed.create.core`)
logChangefeedCreateTelemetry(ctx, jr, changefeedStmt.Select != nil)

err := coreChangefeed(ctx, p, details, progress, resultsCh)
err := coreChangefeed(ctx, p, details, jr.Description, progress, resultsCh)
// TODO(yevgeniy): This seems wrong -- core changefeeds always terminate
// with an error. Perhaps rename this telemetry to indicate number of
// completed feeds.
Expand Down Expand Up @@ -332,6 +332,7 @@ func coreChangefeed(
ctx context.Context,
p sql.PlanHookState,
details jobspb.ChangefeedDetails,
description string,
progress jobspb.Progress,
resultsCh chan<- tree.Datums,
) error {
Expand All @@ -350,7 +351,7 @@ func coreChangefeed(
knobs.BeforeDistChangefeed()
}

err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, localState, resultsCh)
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, description, localState, resultsCh)
if err == nil {
log.Infof(ctx, "core changefeed completed with no error")
return nil
Expand Down Expand Up @@ -658,7 +659,7 @@ func createChangefeedJobRecord(
details.Opts = opts.AsMap()
// Jobs should not be created for sinkless changefeeds. However, note that
// we create and return a job record for sinkless changefeeds below. This is
// because we need the details field to create our sinkless changefeed.
// because we need the job description and details to create our sinkless changefeed.
// After this job record is returned, we create our forever running sinkless
// changefeed, thus ensuring that no job is created for this changefeed as
// desired.
Expand Down Expand Up @@ -1112,12 +1113,13 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err
jobID := b.job.ID()
details := b.job.Details().(jobspb.ChangefeedDetails)
progress := b.job.Progress()
description := b.job.Payload().Description

if err := b.ensureClusterIDMatches(ctx, jobExec.ExtendedEvalContext().ClusterID); err != nil {
return err
}

err := b.resumeWithRetries(ctx, jobExec, jobID, details, progress, execCfg)
err := b.resumeWithRetries(ctx, jobExec, jobID, details, description, progress, execCfg)
if err != nil {
return b.handleChangefeedError(ctx, err, details, jobExec)
}
Expand Down Expand Up @@ -1237,6 +1239,7 @@ func (b *changefeedResumer) resumeWithRetries(
jobExec sql.JobExecContext,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
description string,
initialProgress jobspb.Progress,
execCfg *sql.ExecutorConfig,
) error {
Expand Down Expand Up @@ -1286,7 +1289,7 @@ func (b *changefeedResumer) resumeWithRetries(
knobs.BeforeDistChangefeed()
}

flowErr = distChangefeedFlow(ctx, jobExec, jobID, details, localState, startedCh)
flowErr = distChangefeedFlow(ctx, jobExec, jobID, details, description, localState, startedCh)
if flowErr == nil {
return nil // Changefeed completed -- e.g. due to initial_scan=only mode.
}
Expand Down
26 changes: 16 additions & 10 deletions pkg/ccl/changefeedccl/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -32,7 +31,6 @@ type sinkTelemetryData struct {
type periodicTelemetryLogger struct {
ctx context.Context
sinkTelemetryData sinkTelemetryData
job *jobs.Job
changefeedDetails eventpb.CommonChangefeedEventDetails
settings *cluster.Settings

Expand All @@ -59,12 +57,15 @@ type telemetryLogger interface {
var _ telemetryLogger = (*periodicTelemetryLogger)(nil)

func makePeriodicTelemetryLogger(
ctx context.Context, job *jobs.Job, s *cluster.Settings,
ctx context.Context,
details jobspb.ChangefeedDetails,
description string,
jobID jobspb.JobID,
s *cluster.Settings,
) (*periodicTelemetryLogger, error) {
return &periodicTelemetryLogger{
ctx: ctx,
job: job,
changefeedDetails: makeCommonChangefeedEventDetails(ctx, job.Details().(jobspb.ChangefeedDetails), job.Payload().Description, job.ID()),
changefeedDetails: makeCommonChangefeedEventDetails(ctx, details, description, jobID),
sinkTelemetryData: sinkTelemetryData{},
settings: s,
}, nil
Expand Down Expand Up @@ -134,9 +135,14 @@ func (ptl *periodicTelemetryLogger) close() {
}

func wrapMetricsRecorderWithTelemetry(
ctx context.Context, job *jobs.Job, s *cluster.Settings, mb metricsRecorder,
ctx context.Context,
details jobspb.ChangefeedDetails,
description string,
jobID jobspb.JobID,
s *cluster.Settings,
mb metricsRecorder,
) (*telemetryMetricsRecorder, error) {
logger, err := makePeriodicTelemetryLogger(ctx, job, s)
logger, err := makePeriodicTelemetryLogger(ctx, details, description, jobID, s)
if err != nil {
return &telemetryMetricsRecorder{}, err
}
Expand Down Expand Up @@ -229,13 +235,13 @@ func (r *telemetryMetricsRecorder) timers() *timers.ScopedTimers {
return r.inner.timers()
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
// continuousTelemetryInterval determines the interval at which each node emits
// periodic telemetry events during the lifespan of each changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.telemetry.continuous_logging.interval",
"determines the interval at which each node emits continuous telemetry events"+
" during the lifespan of every enterprise changefeed; setting a zero value disables",
" during the lifespan of every changefeed; setting a zero value disables logging",
24*time.Hour,
settings.NonNegativeDuration,
)
6 changes: 6 additions & 0 deletions pkg/sql/execinfrapb/processors_changefeeds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ message ChangeAggregatorSpec {

// select is the "select clause" for predicate changefeed.
optional Expression select = 6 [(gogoproto.nullable) = false];

// Description is the description of the changefeed. Used for structured logging.
optional string description = 7 [(gogoproto.nullable) = false];
}

// ChangeFrontierSpec is the specification for a processor that receives
Expand All @@ -79,4 +82,7 @@ message ChangeFrontierSpec {
// User who initiated the changefeed. This is used to check access privileges
// when using FileTable ExternalStorage.
optional string user_proto = 4 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"];

// Description is the description of the changefeed. Used for structured logging.
optional string description = 5 [(gogoproto.nullable) = false];
}

0 comments on commit cfa59ad

Please sign in to comment.