diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 660b6199d56e..9428ec81adc9 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -142,6 +142,7 @@ go_library( "//pkg/util/mon", "//pkg/util/parquet", "//pkg/util/protoutil", + "//pkg/util/quotapool", "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/span", diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go index 2f759d8df048..6341d7a49503 100644 --- a/pkg/ccl/changefeedccl/batching_sink.go +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -16,12 +16,15 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) // SinkClient is an interface to an external sink, where messages are written @@ -66,9 +69,10 @@ type batchingSink struct { minFlushFrequency time.Duration retryOpts retry.Options - ts timeutil.TimeSource - metrics metricsRecorder - knobs batchingSinkKnobs + ts timeutil.TimeSource + metrics metricsRecorder + settings *cluster.Settings + knobs batchingSinkKnobs // eventCh is the channel used to send requests from the Sink caller routines // to the batching routine. Messages can either be a flushReq or a rowEvent. @@ -157,7 +161,7 @@ func freeRowEvent(e *rowEvent) { eventPool.Put(e) } -var batchPool sync.Pool = sync.Pool{ +var batchPool = sync.Pool{ New: func() interface{} { return new(sinkBatch) }, @@ -331,7 +335,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { s.metrics.recordSinkIOInflightChange(int64(batch.numMessages)) return s.client.Flush(ctx, batch.payload) } - ioEmitter := newParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics) + ioEmitter := NewParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics, s.settings) defer ioEmitter.Close() // Flushing requires tracking the number of inflight messages and confirming @@ -339,11 +343,12 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { inflight := 0 var sinkFlushWaiter chan struct{} - handleResult := func(result *ioResult) { - batch, _ := result.request.(*sinkBatch) + handleResult := func(result IOResult) { + req, err := result.Consume() + batch, _ := req.(*sinkBatch) - if result.err != nil { - s.handleError(result.err) + if err != nil { + s.handleError(err) } else { s.metrics.recordEmittedBatch( batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress, @@ -352,12 +357,11 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { inflight -= batch.numMessages - if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil { + if (err != nil || inflight == 0) && sinkFlushWaiter != nil { close(sinkFlushWaiter) sinkFlushWaiter = nil } - freeIOResult(result) batch.alloc.Release(ctx) freeSinkBatchEvent(batch) } @@ -373,22 +377,41 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { return err } - // Emitting needs to also handle any incoming results to avoid a deadlock - // with trying to emit while the emitter is blocked on returning a result. 
-	for {
+	req, send, err := ioEmitter.AdmitRequest(ctx, batchBuffer)
+	if errors.Is(err, ErrNotEnoughQuota) {
+		// Quota can only be freed by consuming a result.
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case ioEmitter.requestCh <- batchBuffer:
-		case result := <-ioEmitter.resultCh:
-			handleResult(result)
-			continue
 		case <-s.doneCh:
+			return nil
+		case result := <-ioEmitter.GetResult():
+			handleResult(result)
 		}
-		break
+
+		// The request should be emitted after freeing quota since this is
+		// a single-producer scenario.
+		req, send, err = ioEmitter.AdmitRequest(ctx, batchBuffer)
+		if errors.Is(err, ErrNotEnoughQuota) {
+			logcrash.ReportOrPanic(ctx, &s.settings.SV, "expected request to be emitted after waiting for quota")
+			return errors.AssertionFailedf("expected request to be emitted after waiting for quota")
+		} else if err != nil {
+			return err
+		}
+	} else if err != nil {
+		return err
 	}
-	return nil
+	// The request was admitted, so it must be sent. There are no concurrent requests being sent
+	// that could use up the quota.
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-s.doneCh:
+		return nil
+	case send <- req:
+		return nil
+	}
 }
 
 flushAll := func() error {
@@ -478,7 +501,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
 		default:
 			s.handleError(fmt.Errorf("received unknown request of unknown type: %v", r))
 		}
-	case result := <-ioEmitter.resultCh:
+	case result := <-ioEmitter.GetResult():
 		handleResult(result)
 	case <-flushTimer.Ch():
 		flushTimer.MarkRead()
@@ -505,6 +528,7 @@ func makeBatchingSink(
 	pacerFactory func() *admission.Pacer,
 	timeSource timeutil.TimeSource,
 	metrics metricsRecorder,
+	settings *cluster.Settings,
 ) Sink {
 	sink := &batchingSink{
 		client: client,
@@ -515,6 +539,7 @@ func makeBatchingSink(
 		retryOpts: retryOpts,
 		ts: timeSource,
 		metrics: metrics,
+		settings: settings,
 		eventCh: make(chan interface{}, flushQueueDepth),
 		wg: ctxgroup.WithContext(ctx),
 		hasher: makeHasher(),
diff --git a/pkg/ccl/changefeedccl/parallel_io.go b/pkg/ccl/changefeedccl/parallel_io.go
index bc351bf35575..0961bdf4e4a6 100644
--- a/pkg/ccl/changefeedccl/parallel_io.go
+++ b/pkg/ccl/changefeedccl/parallel_io.go
@@ -13,13 +13,17 @@ import (
 	"sync"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/intsets"
+	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
 )
 
-// parallelIO allows performing blocking "IOHandler" calls on in parallel.
+// ParallelIO allows performing blocking "IOHandler" calls in parallel.
 // IORequests implement a Keys() function returning keys on which ordering is
 // preserved.
 // Example: if the events [[a,b], [b,c], [c,d], [e,f]] are all submitted in that
@@ -28,7 +32,7 @@ import (
 // errored, [b,c] would never be sent, and an error would be returned with [c,d]
 // in an ioResult struct sent to resultCh. After sending an error to resultCh
 // all workers are torn down and no further requests are received or handled.
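// Editor's sketch (not part of this patch): a toy IORequest illustrating the
// ordering contract described above. The exampleRequest name is hypothetical,
// and Keys() is assumed to return an intsets.Fast, as its uses in processIO
// below suggest. Requests whose key sets intersect are never in flight at the
// same time; requests with disjoint key sets may be emitted in parallel and
// complete in any order.
type exampleRequest struct {
	keys intsets.Fast
	n    int
}

func (r *exampleRequest) Keys() intsets.Fast { return r.keys }
func (r *exampleRequest) NumMessages() int   { return r.n }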
-type parallelIO struct { +type ParallelIO struct { retryOpts retry.Options wg ctxgroup.Group metrics metricsRecorder @@ -36,8 +40,9 @@ type parallelIO struct { ioHandler IOHandler - requestCh chan IORequest - resultCh chan *ioResult // readers should freeIOResult after handling result events + quota *quotapool.IntPool + requestCh chan AdmittedIORequest + resultCh chan IOResult } // IORequest represents an abstract unit of IO that has a set of keys upon which @@ -47,56 +52,47 @@ type IORequest interface { NumMessages() int } -// ioResult stores the full request that was sent as well as an error if even -// after retries the IOHanlder was unable to succeed. -type ioResult struct { - request IORequest - err error - // Time representing when this result was received from the sink. - arrivalTime time.Time -} - var resultPool = sync.Pool{ New: func() interface{} { - return new(ioResult) + return new(ioRequest) }, } -func newIOResult(req IORequest, err error) *ioResult { - res := resultPool.Get().(*ioResult) - res.request = req - res.err = err - res.arrivalTime = timeutil.Now() +// newIORequest is used to allocate *ioRequest structs using a pool. +func newIORequest(req IORequest, a *quotapool.IntAlloc) *ioRequest { + res := resultPool.Get().(*ioRequest) + res.r = req + res.a = a return res } -func freeIOResult(e *ioResult) { - *e = ioResult{} - resultPool.Put(e) -} -type queuedRequest struct { - req IORequest - admitTime time.Time +// freeIORequest frees the ioRequest and returns it to the pool. +func freeIORequest(e *ioRequest) { + *e = ioRequest{} + resultPool.Put(e) } // IOHandler performs a blocking IO operation on an IORequest type IOHandler func(context.Context, IORequest) error -func newParallelIO( +// NewParallelIO creates a new ParallelIO. +func NewParallelIO( ctx context.Context, retryOpts retry.Options, numWorkers int, handler IOHandler, metrics metricsRecorder, -) *parallelIO { + settings *cluster.Settings, +) *ParallelIO { wg := ctxgroup.WithContext(ctx) - io := ¶llelIO{ + io := &ParallelIO{ retryOpts: retryOpts, wg: wg, metrics: metrics, ioHandler: handler, - requestCh: make(chan IORequest, numWorkers), - resultCh: make(chan *ioResult, numWorkers), + quota: quotapool.NewIntPool("changefeed-parallel-io", uint64(requestQuota.Get(&settings.SV))), + requestCh: make(chan AdmittedIORequest, numWorkers), + resultCh: make(chan IOResult, numWorkers), doneCh: make(chan struct{}), } @@ -109,7 +105,7 @@ func newParallelIO( // Close stops all workers immediately and returns once they shut down. Inflight // requests sent to requestCh may never result in being sent to resultCh. -func (p *parallelIO) Close() { +func (p *ParallelIO) Close() { close(p.doneCh) _ = p.wg.Wait() } @@ -123,6 +119,103 @@ var testingEnableQueuingDelay = func() func() { } } +// ioRequest is a wrapper around the IORequest that handles quota to limit the +// number of in-flight requests. +type ioRequest struct { + // Set upon initialization. + r IORequest + a *quotapool.IntAlloc + + // If the request conflicts with in-flight requests, this is the time at + // which the request is placed in the pending queue. + pendingQueueAdmitTime time.Time + + // err is the result of this request. resultTime is when it was obtained. + err error + resultTime time.Time +} + +// Consume implements IOResult. +func (r *ioRequest) Consume() (IORequest, error) { + result := r.r + err := r.err + + r.a.Release() + freeIORequest(r) + + return result, err +} + +// IOResult contains the original IORequest and its resultant error. 
+type IOResult interface {
+	// Consume returns the original request and result error. It removes the
+	// request from ParallelIO, freeing its resources and returning its budget
+	// to the request quota.
+	Consume() (IORequest, error)
+}
+
+// requestQuota is the number of requests which can be admitted into the
+// parallelio system before blocking the producer.
+var requestQuota = settings.RegisterIntSetting(
+	settings.ApplicationLevel,
+	"changefeed.parallel_io.request_quota",
+	"the number of requests which can be admitted into the parallelio"+
+		" system before blocking the producer",
+	128,
+	settings.PositiveInt,
+	settings.WithVisibility(settings.Reserved),
+)
+
+// ErrNotEnoughQuota indicates that a request was not emitted due to a lack of
+// quota.
+var ErrNotEnoughQuota = quotapool.ErrNotEnoughQuota
+
+// AdmitRequest returns an AdmittedIORequest and a channel to send it on
+// if there is quota available for the request. Otherwise, it returns
+// ErrNotEnoughQuota.
+//
+// Quota is freed by reading from GetResult() and calling Consume() on the result.
+//
+// TODO(jayants): This should use an `Acquire` instead of a `TryAcquire`, and
+// the API should not use channels. The reasons things are done this way are:
+//
+// 1. The callers (batching sink) use one goroutine to both produce requests and
+// consume results. If it blocks on producing, it will deadlock because that
+// goroutine cannot free up quota by consuming. This is why the `TryAcquire` is
+// used. The caller should use separate goroutines: one for consuming and one for producing.
+//
+// 2. Both the batching sink and ParallelIO use a `done` channel to close. They
+// should use context cancellation. The `done` channel is problematic because `Acquire`
+// cannot select on that channel, and ParallelIO cannot detect if the batching
+// sink's channel has been closed. Using contexts and cancelling them fixes
+// this problem.
+func (p *ParallelIO) AdmitRequest(
+	ctx context.Context, r IORequest,
+) (req AdmittedIORequest, send chan AdmittedIORequest, err error) {
+	a, err := p.quota.TryAcquire(ctx, 1)
+
+	if errors.Is(err, quotapool.ErrNotEnoughQuota) {
+		return nil, nil, ErrNotEnoughQuota
+	} else if err != nil {
+		return nil, nil, err
+	}
+
+	ra := newIORequest(r, a)
+
+	return ra, p.requestCh, nil
+}
+
+// AdmittedIORequest is an admitted IORequest.
+type AdmittedIORequest interface{}
+
+var _ AdmittedIORequest = (*ioRequest)(nil)
+
+// GetResult returns a channel which can be waited upon to read the next
+// IOResult.
+func (p *ParallelIO) GetResult() chan IOResult {
+	return p.resultCh
+}
+
 // processIO starts numEmitWorkers worker threads to run the IOHandler on
 // non-conflicting IORequests each retrying according to the retryOpts, then:
 // - Reads incoming messages from requestCh, sending them to any worker if there
@@ -147,8 +240,8 @@ var testingEnableQueuingDelay = func() func() {
 //
 // The conflict checking is done via an intset.Fast storing the union of all
 // keys currently being sent, followed by checking each pending batch's intset.
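// Editor's sketch (not part of this patch): the intended single-producer use
// of AdmitRequest, GetResult, and Consume, condensed from the batching sink
// changes above. exampleProduce is a hypothetical helper, not an API added by
// this change; result errors are simply returned here rather than routed to a
// sink error handler.
func exampleProduce(ctx context.Context, p *ParallelIO, req IORequest) error {
	admitted, sendCh, err := p.AdmitRequest(ctx, req)
	if errors.Is(err, ErrNotEnoughQuota) {
		// Quota is only returned once a result is consumed, so drain one
		// result before retrying.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case res := <-p.GetResult():
			if _, resErr := res.Consume(); resErr != nil {
				return resErr
			}
		}
		// With a single producer, the quota freed above guarantees the retry
		// is admitted.
		admitted, sendCh, err = p.AdmitRequest(ctx, req)
	}
	if err != nil {
		return err
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case sendCh <- admitted:
		return nil
	}
}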
-func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { - emitWithRetries := func(ctx context.Context, payload IORequest) error { +func (p *ParallelIO) processIO(ctx context.Context, numEmitWorkers int) error { + emitWithRetries := func(ctx context.Context, r IORequest) error { if testQueuingDelay > 0*time.Second { select { case <-ctx.Done(): @@ -160,28 +253,29 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { initialSend := true return retry.WithMaxAttempts(ctx, p.retryOpts, p.retryOpts.MaxRetries+1, func() error { if !initialSend { - p.metrics.recordInternalRetry(int64(payload.Keys().Len()), false) + p.metrics.recordInternalRetry(int64(r.Keys().Len()), false) } initialSend = false - return p.ioHandler(ctx, payload) + return p.ioHandler(ctx, r) }) } // Multiple worker routines handle the IO operations, retrying when necessary. - workerEmitCh := make(chan IORequest, numEmitWorkers) + workerEmitCh := make(chan *ioRequest, numEmitWorkers) defer close(workerEmitCh) - workerResultCh := make(chan *ioResult, numEmitWorkers) + workerResultCh := make(chan *ioRequest, numEmitWorkers) for i := 0; i < numEmitWorkers; i++ { p.wg.GoCtx(func(ctx context.Context) error { for req := range workerEmitCh { - result := newIOResult(req, emitWithRetries(ctx, req)) + req.err = emitWithRetries(ctx, req.r) + req.resultTime = timeutil.Now() select { case <-ctx.Done(): return ctx.Err() case <-p.doneCh: return nil - case workerResultCh <- result: + case workerResultCh <- req: } } return nil @@ -196,8 +290,8 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { // submitIO -> handleResult -> submitIO -> handleResult chain which is complex // to manage. To avoid this, results are added to a pending list to be handled // separately. - var pendingResults []*ioResult - submitIO := func(req IORequest) error { + var pendingResults []*ioRequest + submitIO := func(req *ioRequest) error { for { select { case <-ctx.Done(): @@ -214,42 +308,42 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { // The main routine keeps track of incoming and completed requests, where // admitted requests yet to be completed have their Keys() tracked in an - // intset, and any incoming request with keys already in the intset are placed + // intset, and any incoming ioRequest with keys already in the intset are placed // in a Queue to be sent to IO workers once the conflicting requests complete. var inflight intsets.Fast - var pending []queuedRequest + var pending []*ioRequest metricsRec := p.metrics.newParallelIOMetricsRecorder() - handleResult := func(res *ioResult) error { + handleResult := func(res *ioRequest) error { if res.err == nil { // Clear out the completed keys to check for newly valid pending requests. - requestKeys := res.request.Keys() + requestKeys := res.r.Keys() inflight.DifferenceWith(requestKeys) metricsRec.setInFlightKeys(int64(inflight.Len())) // Check for a pending request that is now able to be sent i.e. is not // conflicting with any inflight requests or any requests that arrived // earlier than itself in the pending queue. 
pendingKeys := intsets.Fast{} - for i, pendingReq := range pending { - if !inflight.Intersects(pendingReq.req.Keys()) && !pendingKeys.Intersects(pendingReq.req.Keys()) { - inflight.UnionWith(pendingReq.req.Keys()) + for i, req := range pending { + if !inflight.Intersects(req.r.Keys()) && !pendingKeys.Intersects(req.r.Keys()) { + inflight.UnionWith(req.r.Keys()) metricsRec.setInFlightKeys(int64(inflight.Len())) pending = append(pending[:i], pending[i+1:]...) - metricsRec.recordPendingQueuePop(int64(pendingReq.req.NumMessages()), timeutil.Since(pendingReq.admitTime)) - if err := submitIO(pendingReq.req); err != nil { + metricsRec.recordPendingQueuePop(int64(req.r.NumMessages()), timeutil.Since(req.pendingQueueAdmitTime)) + if err := submitIO(req); err != nil { return err } break } - pendingKeys.UnionWith(pendingReq.req.Keys()) + pendingKeys.UnionWith(req.r.Keys()) } } // Copy the arrival time for the metrics recorder below. // Otherwise, it would be possible for res to be admitted to the - // resultCh and freed before we read rec.arrivalTime. - arrivalTime := res.arrivalTime + // resultCh and freed before we read res.resultTime. + arrivalTime := res.resultTime select { case <-ctx.Done(): return ctx.Err() @@ -268,8 +362,8 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { if inflight.Intersects(keys) { return true } - for _, pendingReq := range pending { - if pendingReq.req.Keys().Intersects(keys) { + for _, req := range pending { + if req.r.Keys().Intersects(keys) { return true } } @@ -287,14 +381,16 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { } select { - case req := <-p.requestCh: - if hasConflictingKeys(req.Keys()) { + case admittedReq := <-p.requestCh: + req := admittedReq.(*ioRequest) + if hasConflictingKeys(req.r.Keys()) { // If a request conflicts with any currently unhandled requests, add it // to the pending queue to be rechecked for validity later. 
- pending = append(pending, queuedRequest{req: req, admitTime: timeutil.Now()}) - metricsRec.recordPendingQueuePush(int64(req.NumMessages())) + req.pendingQueueAdmitTime = timeutil.Now() + pending = append(pending, req) + metricsRec.recordPendingQueuePush(int64(req.r.NumMessages())) } else { - newInFlightKeys := req.Keys() + newInFlightKeys := req.r.Keys() inflight.UnionWith(newInFlightKeys) metricsRec.setInFlightKeys(int64(inflight.Len())) if err := submitIO(req); err != nil { diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 34c86d862e4a..a4bf4f3b003b 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -244,7 +244,8 @@ func getSink( if WebhookV2Enabled.Get(&serverCfg.Settings.SV) { return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, - numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder) + numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, + metricsBuilder, serverCfg.Settings) }) } else { return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { @@ -258,8 +259,10 @@ func getSink( testingKnobs = knobs } if PubsubV2Enabled.Get(&serverCfg.Settings.SV) { - return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered), - numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder, testingKnobs) + return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg), + opts.IsSet(changefeedbase.OptUnordered), numSinkIOWorkers(serverCfg), + newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, + metricsBuilder, serverCfg.Settings, testingKnobs) } else { return makeDeprecatedPubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered), metricsBuilder, testingKnobs) } diff --git a/pkg/ccl/changefeedccl/sink_pubsub_v2.go b/pkg/ccl/changefeedccl/sink_pubsub_v2.go index 48648807c70e..b15d94310c29 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub_v2.go +++ b/pkg/ccl/changefeedccl/sink_pubsub_v2.go @@ -19,6 +19,7 @@ import ( pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/json" @@ -420,6 +421,7 @@ func makePubsubSink( pacerFactory func() *admission.Pacer, source timeutil.TimeSource, mb metricsRecorderBuilder, + settings *cluster.Settings, knobs *TestingKnobs, ) (Sink, error) { batchCfg, retryOpts, err := getSinkConfigFromJson(jsonConfig, sinkJSONConfig{ @@ -463,5 +465,6 @@ func makePubsubSink( pacerFactory, source, mb(requiresResourceAccounting), + settings, ), nil } diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index f0f20ba3a37e..7f1bfa69622e 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" 
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -85,7 +86,7 @@ func setupWebhookSinkWithDetails( if err != nil { return nil, err } - sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nilPacerFactory, source, nilMetricsRecorderBuilder) + sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nilPacerFactory, source, nilMetricsRecorderBuilder, cluster.MakeClusterSettings()) if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go index b7fff6ddeced..d654d94e9f01 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_v2.go +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -22,6 +22,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -349,6 +350,7 @@ func makeWebhookSink( pacerFactory func() *admission.Pacer, source timeutil.TimeSource, mb metricsRecorderBuilder, + settings *cluster.Settings, ) (Sink, error) { batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig, sinkJSONConfig{}) if err != nil { @@ -371,5 +373,6 @@ func makeWebhookSink( pacerFactory, source, mb(requiresResourceAccounting), + settings, ), nil }