diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 4c76570ab197..a443bdb6f3f3 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -142,6 +142,7 @@ go_library( "//pkg/util/mon", "//pkg/util/parquet", "//pkg/util/protoutil", + "//pkg/util/quotapool", "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/span", diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go index 2f759d8df048..6341d7a49503 100644 --- a/pkg/ccl/changefeedccl/batching_sink.go +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -16,12 +16,15 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) // SinkClient is an interface to an external sink, where messages are written @@ -66,9 +69,10 @@ type batchingSink struct { minFlushFrequency time.Duration retryOpts retry.Options - ts timeutil.TimeSource - metrics metricsRecorder - knobs batchingSinkKnobs + ts timeutil.TimeSource + metrics metricsRecorder + settings *cluster.Settings + knobs batchingSinkKnobs // eventCh is the channel used to send requests from the Sink caller routines // to the batching routine. Messages can either be a flushReq or a rowEvent. @@ -157,7 +161,7 @@ func freeRowEvent(e *rowEvent) { eventPool.Put(e) } -var batchPool sync.Pool = sync.Pool{ +var batchPool = sync.Pool{ New: func() interface{} { return new(sinkBatch) }, @@ -331,7 +335,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { s.metrics.recordSinkIOInflightChange(int64(batch.numMessages)) return s.client.Flush(ctx, batch.payload) } - ioEmitter := newParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics) + ioEmitter := NewParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics, s.settings) defer ioEmitter.Close() // Flushing requires tracking the number of inflight messages and confirming @@ -339,11 +343,12 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { inflight := 0 var sinkFlushWaiter chan struct{} - handleResult := func(result *ioResult) { - batch, _ := result.request.(*sinkBatch) + handleResult := func(result IOResult) { + req, err := result.Consume() + batch, _ := req.(*sinkBatch) - if result.err != nil { - s.handleError(result.err) + if err != nil { + s.handleError(err) } else { s.metrics.recordEmittedBatch( batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress, @@ -352,12 +357,11 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { inflight -= batch.numMessages - if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil { + if (err != nil || inflight == 0) && sinkFlushWaiter != nil { close(sinkFlushWaiter) sinkFlushWaiter = nil } - freeIOResult(result) batch.alloc.Release(ctx) freeSinkBatchEvent(batch) } @@ -373,22 +377,41 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { return err } - // Emitting needs to also handle any incoming results to avoid a deadlock - // with trying to emit while the emitter is blocked on returning a result. 
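+ // Emission is now gated by ParallelIO's request quota: AdmitRequest reserves + // quota for the batch, and that quota is only returned once the corresponding + // result has been consumed. If no quota is available, drain one result first.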
- for { + req, send, err := ioEmitter.AdmitRequest(ctx, batchBuffer) + if errors.Is(err, ErrNotEnoughQuota) { + // Quota can only be freed by consuming a result. select { case <-ctx.Done(): return ctx.Err() - case ioEmitter.requestCh <- batchBuffer: - case result := <-ioEmitter.resultCh: - handleResult(result) - continue case <-s.doneCh: + return nil + case result := <-ioEmitter.GetResult(): + handleResult(result) } - break + + // The request should be emitted after freeing quota since this is + // a single-producer scenario. + req, send, err = ioEmitter.AdmitRequest(ctx, batchBuffer) + if errors.Is(err, ErrNotEnoughQuota) { + logcrash.ReportOrPanic(ctx, &s.settings.SV, "expected request to be emitted after waiting for quota") + return errors.AssertionFailedf("expected request to be emitted after waiting for quota") + } else if err != nil { + return err + } + } else if err != nil { + return err } - return nil + // The request was admitted, so it must be sent. There are no concurrent requests being sent which + // would use up the quota. + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.doneCh: + return nil + case send <- req: + return nil + } } flushAll := func() error { @@ -478,7 +501,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) { default: s.handleError(fmt.Errorf("received unknown request of unknown type: %v", r)) } - case result := <-ioEmitter.resultCh: + case result := <-ioEmitter.GetResult(): handleResult(result) case <-flushTimer.Ch(): flushTimer.MarkRead() @@ -505,6 +528,7 @@ func makeBatchingSink( pacerFactory func() *admission.Pacer, timeSource timeutil.TimeSource, metrics metricsRecorder, + settings *cluster.Settings, ) Sink { sink := &batchingSink{ client: client, @@ -515,6 +539,7 @@ func makeBatchingSink( retryOpts: retryOpts, ts: timeSource, metrics: metrics, + settings: settings, eventCh: make(chan interface{}, flushQueueDepth), wg: ctxgroup.WithContext(ctx), hasher: makeHasher(), diff --git a/pkg/ccl/changefeedccl/parallel_io.go b/pkg/ccl/changefeedccl/parallel_io.go index bc351bf35575..0961bdf4e4a6 100644 --- a/pkg/ccl/changefeedccl/parallel_io.go +++ b/pkg/ccl/changefeedccl/parallel_io.go @@ -13,13 +13,17 @@ import ( "sync" "time" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) -// parallelIO allows performing blocking "IOHandler" calls on in parallel. +// ParallelIO allows performing blocking "IOHandler" calls in parallel. // IORequests implement a Keys() function returning keys on which ordering is // preserved. // Example: if the events [[a,b], [b,c], [c,d], [e,f]] are all submitted in that @@ -28,7 +32,7 @@ import ( // errored, [b,c] would never be sent, and an error would be returned with [c,d] // in an ioResult struct sent to resultCh. After sending an error to resultCh // all workers are torn down and no further requests are received or handled.
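+// +// A minimal sketch of the intended call pattern (hypothetical caller; the +// batching sink is the real one): +// +//	req, send, err := p.AdmitRequest(ctx, r) +//	if errors.Is(err, ErrNotEnoughQuota) { +//		res := <-p.GetResult() +//		finishedReq, ioErr := res.Consume() // frees quota for re-admission +//		// ... handle finishedReq / ioErr ... +//		req, send, err = p.AdmitRequest(ctx, r) +//	} +//	send <- req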
-type parallelIO struct { +type ParallelIO struct { retryOpts retry.Options wg ctxgroup.Group metrics metricsRecorder @@ -36,8 +40,9 @@ type parallelIO struct { ioHandler IOHandler - requestCh chan IORequest - resultCh chan *ioResult // readers should freeIOResult after handling result events + quota *quotapool.IntPool + requestCh chan AdmittedIORequest + resultCh chan IOResult } // IORequest represents an abstract unit of IO that has a set of keys upon which @@ -47,56 +52,47 @@ type IORequest interface { NumMessages() int } -// ioResult stores the full request that was sent as well as an error if even -// after retries the IOHanlder was unable to succeed. -type ioResult struct { - request IORequest - err error - // Time representing when this result was received from the sink. - arrivalTime time.Time -} - var resultPool = sync.Pool{ New: func() interface{} { - return new(ioResult) + return new(ioRequest) }, } -func newIOResult(req IORequest, err error) *ioResult { - res := resultPool.Get().(*ioResult) - res.request = req - res.err = err - res.arrivalTime = timeutil.Now() +// newIORequest is used to allocate *ioRequest structs using a pool. +func newIORequest(req IORequest, a *quotapool.IntAlloc) *ioRequest { + res := resultPool.Get().(*ioRequest) + res.r = req + res.a = a return res } -func freeIOResult(e *ioResult) { - *e = ioResult{} - resultPool.Put(e) -} -type queuedRequest struct { - req IORequest - admitTime time.Time +// freeIORequest frees the ioRequest and returns it to the pool. +func freeIORequest(e *ioRequest) { + *e = ioRequest{} + resultPool.Put(e) } // IOHandler performs a blocking IO operation on an IORequest type IOHandler func(context.Context, IORequest) error -func newParallelIO( +// NewParallelIO creates a new ParallelIO. +func NewParallelIO( ctx context.Context, retryOpts retry.Options, numWorkers int, handler IOHandler, metrics metricsRecorder, -) *parallelIO { + settings *cluster.Settings, +) *ParallelIO { wg := ctxgroup.WithContext(ctx) - io := ¶llelIO{ + io := &ParallelIO{ retryOpts: retryOpts, wg: wg, metrics: metrics, ioHandler: handler, - requestCh: make(chan IORequest, numWorkers), - resultCh: make(chan *ioResult, numWorkers), + quota: quotapool.NewIntPool("changefeed-parallel-io", uint64(requestQuota.Get(&settings.SV))), + requestCh: make(chan AdmittedIORequest, numWorkers), + resultCh: make(chan IOResult, numWorkers), doneCh: make(chan struct{}), } @@ -109,7 +105,7 @@ func newParallelIO( // Close stops all workers immediately and returns once they shut down. Inflight // requests sent to requestCh may never result in being sent to resultCh. -func (p *parallelIO) Close() { +func (p *ParallelIO) Close() { close(p.doneCh) _ = p.wg.Wait() } @@ -123,6 +119,103 @@ var testingEnableQueuingDelay = func() func() { } } +// ioRequest is a wrapper around the IORequest that handles quota to limit the +// number of in-flight requests. +type ioRequest struct { + // Set upon initialization. + r IORequest + a *quotapool.IntAlloc + + // If the request conflicts with in-flight requests, this is the time at + // which the request is placed in the pending queue. + pendingQueueAdmitTime time.Time + + // err is the result of this request. resultTime is when it was obtained. + err error + resultTime time.Time +} + +// Consume implements IOResult. +func (r *ioRequest) Consume() (IORequest, error) { + result := r.r + err := r.err + + r.a.Release() + freeIORequest(r) + + return result, err +} + +// IOResult contains the original IORequest and its resultant error. 
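+// Consume must be called exactly once per result: it releases the quota +// acquired at admission and recycles the underlying ioRequest, so the result +// must not be used after the call.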
+type IOResult interface { + // Consume returns the original request and result error. It removes the + // request from ParallelIO, freeing its resources and budget in the request + // quota. + Consume() (IORequest, error) +} + +// requestQuota is the number of requests which can be admitted into the +// parallelio system before blocking the producer. +var requestQuota = settings.RegisterIntSetting( + settings.ApplicationLevel, + "changefeed.parallel_io.request_quota", + "the number of requests which can be admitted into the parallelio"+ + " system before blocking the producer", + 128, + settings.PositiveInt, + settings.WithVisibility(settings.Reserved), +) + +// ErrNotEnoughQuota indicates that a request was not emitted due to a lack of +// quota. +var ErrNotEnoughQuota = quotapool.ErrNotEnoughQuota + +// AdmitRequest returns an AdmittedIORequest and a channel to send it on +// if there is quota available for the request. Otherwise, it returns an +// ErrNotEnoughQuota. +// +// Quota can be freed by calling GetResult() and Consume()ing the IOResult. +// +// TODO(jayants): This should use an `Acquire` instead of a `TryAcquire`, and +// the API should not use channels. The reasons things are done this way are: +// +// 1. The callers (batching sink) use one goroutine to both produce requests and +// consume results. If it blocks on producing, it will deadlock because that +// goroutine cannot free up quota by consuming. This is why the `TryAcquire` is +// used. The caller should use separate goroutines: one for consuming and one for producing. +// +// 2. Both the batching sink and ParallelIO use a `done` channel to close. They +// should use context cancellation. The `done` channel is problematic because `Acquire` +// cannot select on that channel, and ParallelIO cannot detect if the batching +// sink's channel has been closed. Using contexts and cancelling them fixes +// this problem. +func (p *ParallelIO) AdmitRequest( + ctx context.Context, r IORequest, +) (req AdmittedIORequest, send chan AdmittedIORequest, err error) { + a, err := p.quota.TryAcquire(ctx, 1) + + if errors.Is(err, quotapool.ErrNotEnoughQuota) { + return nil, nil, ErrNotEnoughQuota + } else if err != nil { + return nil, nil, err + } + + ra := newIORequest(r, a) + + return ra, p.requestCh, nil +} + +// AdmittedIORequest is an admitted IORequest. +type AdmittedIORequest interface{} + +var _ AdmittedIORequest = (*ioRequest)(nil) + +// GetResult returns a channel which can be waited upon to read the next +// IOResult. +func (p *ParallelIO) GetResult() chan IOResult { + return p.resultCh +} + // processIO starts numEmitWorkers worker threads to run the IOHandler on // non-conflicting IORequests each retrying according to the retryOpts, then: // - Reads incoming messages from requestCh, sending them to any worker if there @@ -147,8 +240,8 @@ var testingEnableQueuingDelay = func() func() { // // The conflict checking is done via an intset.Fast storing the union of all // keys currently being sent, followed by checking each pending batch's intset.
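+// For illustration, with small integer keys (hypothetical values, not part of +// this patch): +// +//	var inflight intsets.Fast +//	inflight.AddRange(1, 2)               // keys of an in-flight request +//	var next intsets.Fast +//	next.Add(2) +//	conflict := inflight.Intersects(next) // true: key 2 overlaps, so the new +//	                                      // request joins the pending queue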
-func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { - emitWithRetries := func(ctx context.Context, payload IORequest) error { +func (p *ParallelIO) processIO(ctx context.Context, numEmitWorkers int) error { + emitWithRetries := func(ctx context.Context, r IORequest) error { if testQueuingDelay > 0*time.Second { select { case <-ctx.Done(): @@ -160,28 +253,29 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { initialSend := true return retry.WithMaxAttempts(ctx, p.retryOpts, p.retryOpts.MaxRetries+1, func() error { if !initialSend { - p.metrics.recordInternalRetry(int64(payload.Keys().Len()), false) + p.metrics.recordInternalRetry(int64(r.Keys().Len()), false) } initialSend = false - return p.ioHandler(ctx, payload) + return p.ioHandler(ctx, r) }) } // Multiple worker routines handle the IO operations, retrying when necessary. - workerEmitCh := make(chan IORequest, numEmitWorkers) + workerEmitCh := make(chan *ioRequest, numEmitWorkers) defer close(workerEmitCh) - workerResultCh := make(chan *ioResult, numEmitWorkers) + workerResultCh := make(chan *ioRequest, numEmitWorkers) for i := 0; i < numEmitWorkers; i++ { p.wg.GoCtx(func(ctx context.Context) error { for req := range workerEmitCh { - result := newIOResult(req, emitWithRetries(ctx, req)) + req.err = emitWithRetries(ctx, req.r) + req.resultTime = timeutil.Now() select { case <-ctx.Done(): return ctx.Err() case <-p.doneCh: return nil - case workerResultCh <- result: + case workerResultCh <- req: } } return nil @@ -196,8 +290,8 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { // submitIO -> handleResult -> submitIO -> handleResult chain which is complex // to manage. To avoid this, results are added to a pending list to be handled // separately. - var pendingResults []*ioResult - submitIO := func(req IORequest) error { + var pendingResults []*ioRequest + submitIO := func(req *ioRequest) error { for { select { case <-ctx.Done(): @@ -214,42 +308,42 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { // The main routine keeps track of incoming and completed requests, where // admitted requests yet to be completed have their Keys() tracked in an - // intset, and any incoming request with keys already in the intset are placed + // intset, and any incoming ioRequest with keys already in the intset are placed // in a Queue to be sent to IO workers once the conflicting requests complete. var inflight intsets.Fast - var pending []queuedRequest + var pending []*ioRequest metricsRec := p.metrics.newParallelIOMetricsRecorder() - handleResult := func(res *ioResult) error { + handleResult := func(res *ioRequest) error { if res.err == nil { // Clear out the completed keys to check for newly valid pending requests. - requestKeys := res.request.Keys() + requestKeys := res.r.Keys() inflight.DifferenceWith(requestKeys) metricsRec.setInFlightKeys(int64(inflight.Len())) // Check for a pending request that is now able to be sent i.e. is not // conflicting with any inflight requests or any requests that arrived // earlier than itself in the pending queue. 
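+			// pendingKeys tracks the keys of requests that sit earlier in the +			// pending queue; skipping past them would reorder conflicting requests.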
pendingKeys := intsets.Fast{} - for i, pendingReq := range pending { - if !inflight.Intersects(pendingReq.req.Keys()) && !pendingKeys.Intersects(pendingReq.req.Keys()) { - inflight.UnionWith(pendingReq.req.Keys()) + for i, req := range pending { + if !inflight.Intersects(req.r.Keys()) && !pendingKeys.Intersects(req.r.Keys()) { + inflight.UnionWith(req.r.Keys()) metricsRec.setInFlightKeys(int64(inflight.Len())) pending = append(pending[:i], pending[i+1:]...) - metricsRec.recordPendingQueuePop(int64(pendingReq.req.NumMessages()), timeutil.Since(pendingReq.admitTime)) - if err := submitIO(pendingReq.req); err != nil { + metricsRec.recordPendingQueuePop(int64(req.r.NumMessages()), timeutil.Since(req.pendingQueueAdmitTime)) + if err := submitIO(req); err != nil { return err } break } - pendingKeys.UnionWith(pendingReq.req.Keys()) + pendingKeys.UnionWith(req.r.Keys()) } } // Copy the arrival time for the metrics recorder below. // Otherwise, it would be possible for res to be admitted to the - // resultCh and freed before we read rec.arrivalTime. - arrivalTime := res.arrivalTime + // resultCh and freed before we read res.resultTime. + arrivalTime := res.resultTime select { case <-ctx.Done(): return ctx.Err() @@ -268,8 +362,8 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { if inflight.Intersects(keys) { return true } - for _, pendingReq := range pending { - if pendingReq.req.Keys().Intersects(keys) { + for _, req := range pending { + if req.r.Keys().Intersects(keys) { return true } } @@ -287,14 +381,16 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { } select { - case req := <-p.requestCh: - if hasConflictingKeys(req.Keys()) { + case admittedReq := <-p.requestCh: + req := admittedReq.(*ioRequest) + if hasConflictingKeys(req.r.Keys()) { // If a request conflicts with any currently unhandled requests, add it // to the pending queue to be rechecked for validity later. 
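+				// pendingQueueAdmitTime records when the request entered the queue so +				// recordPendingQueuePop can report its queueing latency once the +				// request is promoted to the IO workers.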
- pending = append(pending, queuedRequest{req: req, admitTime: timeutil.Now()}) - metricsRec.recordPendingQueuePush(int64(req.NumMessages())) + req.pendingQueueAdmitTime = timeutil.Now() + pending = append(pending, req) + metricsRec.recordPendingQueuePush(int64(req.r.NumMessages())) } else { - newInFlightKeys := req.Keys() + newInFlightKeys := req.r.Keys() inflight.UnionWith(newInFlightKeys) metricsRec.setInFlightKeys(int64(inflight.Len())) if err := submitIO(req); err != nil { diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 34c86d862e4a..a4bf4f3b003b 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -244,7 +244,8 @@ func getSink( if WebhookV2Enabled.Get(&serverCfg.Settings.SV) { return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, - numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder) + numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, + metricsBuilder, serverCfg.Settings) }) } else { return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { @@ -258,8 +259,10 @@ func getSink( testingKnobs = knobs } if PubsubV2Enabled.Get(&serverCfg.Settings.SV) { - return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered), - numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder, testingKnobs) + return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg), + opts.IsSet(changefeedbase.OptUnordered), numSinkIOWorkers(serverCfg), + newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, + metricsBuilder, serverCfg.Settings, testingKnobs) } else { return makeDeprecatedPubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered), metricsBuilder, testingKnobs) } diff --git a/pkg/ccl/changefeedccl/sink_pubsub_v2.go b/pkg/ccl/changefeedccl/sink_pubsub_v2.go index 48648807c70e..b15d94310c29 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub_v2.go +++ b/pkg/ccl/changefeedccl/sink_pubsub_v2.go @@ -19,6 +19,7 @@ import ( pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/json" @@ -420,6 +421,7 @@ func makePubsubSink( pacerFactory func() *admission.Pacer, source timeutil.TimeSource, mb metricsRecorderBuilder, + settings *cluster.Settings, knobs *TestingKnobs, ) (Sink, error) { batchCfg, retryOpts, err := getSinkConfigFromJson(jsonConfig, sinkJSONConfig{ @@ -463,5 +465,6 @@ func makePubsubSink( pacerFactory, source, mb(requiresResourceAccounting), + settings, ), nil } diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index f0f20ba3a37e..7f1bfa69622e 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" 
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -85,7 +86,7 @@ func setupWebhookSinkWithDetails( if err != nil { return nil, err } - sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nilPacerFactory, source, nilMetricsRecorderBuilder) + sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nilPacerFactory, source, nilMetricsRecorderBuilder, cluster.MakeClusterSettings()) if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go index b7fff6ddeced..d654d94e9f01 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_v2.go +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -22,6 +22,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -349,6 +350,7 @@ func makeWebhookSink( pacerFactory func() *admission.Pacer, source timeutil.TimeSource, mb metricsRecorderBuilder, + settings *cluster.Settings, ) (Sink, error) { batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig, sinkJSONConfig{}) if err != nil { @@ -371,5 +373,6 @@ func makeWebhookSink( pacerFactory, source, mb(requiresResourceAccounting), + settings, ), nil } diff --git a/pkg/cmd/roachtest/tests/ruby_pg.go b/pkg/cmd/roachtest/tests/ruby_pg.go index d021707829f7..413a893336f8 100644 --- a/pkg/cmd/roachtest/tests/ruby_pg.go +++ b/pkg/cmd/roachtest/tests/ruby_pg.go @@ -22,12 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" - "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/roachprod/config" rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" "github.com/cockroachdb/cockroach/pkg/roachprod/install" - "github.com/cockroachdb/cockroach/pkg/roachprod/vm" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -38,7 +36,7 @@ var testSummaryRegexp = regexp.MustCompile("^([0-9]+) examples, [0-9]+ failures" // WARNING: DO NOT MODIFY the name of the below constant/variable without approval from the docs team. // This is used by docs automation to produce a list of supported versions for ORM's. -var rubyPGVersion = "v1.3.5" +var rubyPGVersion = "v1.4.6" // This test runs Ruby PG's full test suite against a single cockroach node. func registerRubyPG(r registry.Registry) { @@ -239,13 +237,10 @@ func registerRubyPG(r registry.Registry) { } r.Add(registry.TestSpec{ - Name: "ruby-pg", - Timeout: 1 * time.Hour, - Owner: registry.OwnerSQLFoundations, - // TODO(DarrylWong): This test currently fails on Ubuntu 22.04 so we run it on 20.04. - // See: https://github.com/cockroachdb/cockroach/issues/112109 - // Once this issue is fixed we should remove this Ubuntu Version override. 
- Cluster: r.MakeClusterSpec(1, spec.UbuntuVersion(vm.FocalFossa)), + Name: "ruby-pg", + Timeout: 1 * time.Hour, + Owner: registry.OwnerSQLFoundations, + Cluster: r.MakeClusterSpec(1), Leases: registry.MetamorphicLeases, NativeLibs: registry.LibGEOS, CompatibleClouds: registry.AllExceptAWS, diff --git a/pkg/cmd/roachtest/tests/ruby_pg_blocklist.go b/pkg/cmd/roachtest/tests/ruby_pg_blocklist.go index da2026236449..c622233fdda9 100644 --- a/pkg/cmd/roachtest/tests/ruby_pg_blocklist.go +++ b/pkg/cmd/roachtest/tests/ruby_pg_blocklist.go @@ -41,6 +41,7 @@ var rubyPGBlocklist = blocklist{ `Basic type mapping PG::BasicTypeMapForResults connection wide type mapping should do cidr type conversions`: "unknown", `Basic type mapping PG::BasicTypeMapForResults connection wide type mapping should do text datetime without time zone type conversions`: "unknown", `GC.compact should compact PG::TypeMapByClass #328`: "unknown", + `PG::Connection #get_result should send remaining data before waiting`: "unknown", `PG::Connection accepts nil as the timeout in #wait_for_notify `: "unknown", `PG::Connection allows a query to be cancelled`: "unknown", `PG::Connection calls a block for NOTIFY events if one is given`: "unknown", @@ -48,13 +49,9 @@ var rubyPGBlocklist = blocklist{ `PG::Connection calls the block supplied to wait_for_notify with the notify payload if it accepts three arguments`: "unknown", `PG::Connection calls the block supplied to wait_for_notify with the notify payload if it accepts two arguments`: "unknown", `PG::Connection calls the block supplied to wait_for_notify with the notify payload if it doesn't accept arguments`: "unknown", - `PG::Connection can handle client errors in #copy_data for input`: "unknown", - `PG::Connection can handle server errors in #copy_data for input`: "unknown", - `PG::Connection can handle server errors in #copy_data for output`: "unknown", - `PG::Connection can process #copy_data input queries`: "unknown", - `PG::Connection can process #copy_data input queries with lots of data`: "unknown", `PG::Connection can receive notices while waiting for NOTIFY without exceeding the timeout`: "unknown", `PG::Connection can wait for NOTIFY events`: "unknown", + `PG::Connection carries the connection in case of connection errors`: "unknown", `PG::Connection connection information related to SSL can retrieve a single ssl connection attribute`: "unknown", `PG::Connection connection information related to SSL can retrieve connection's ssl state`: "unknown", `PG::Connection connects using URI with UnixSocket host`: "unknown", @@ -63,37 +60,36 @@ var rubyPGBlocklist = blocklist{ `PG::Connection doesn't collapse sequential notifications`: "unknown", `PG::Connection doesn't leave stale server connections after finish`: "unknown", `PG::Connection emits a suitable error_message at connection errors`: "unknown", - `PG::Connection gracefully handle SQL statements while in #copy_data for input`: "unknown", - `PG::Connection gracefully handle SQL statements while in #copy_data for output`: "unknown", `PG::Connection in nonblocking mode can send query with params`: "unknown", `PG::Connection in nonblocking mode needs to flush data after send_query`: "unknown", `PG::Connection in nonblocking mode rejects to send lots of COPY data`: "unknown", `PG::Connection in nonblocking mode returns immediately from get_copy_data(nonblock=true)`: "unknown", + `PG::Connection large objects large object can handle big data`: "unknown", + `PG::Connection large objects not read past the end of a large object`: 
"unknown", `PG::Connection multinationalization support Ruby 1.9.x default_internal encoding allows users of the async interface to set the client_encoding to the default_internal`: "unknown", `PG::Connection multinationalization support Ruby 1.9.x default_internal encoding honors the Encoding.default_internal if it's set and the synchronous interface is used`: "unknown", `PG::Connection multinationalization support encodes exception messages with the connection's encoding (#96)`: "unknown", `PG::Connection multinationalization support handles clearing result in or after set_notice_receiver`: "unknown", `PG::Connection multinationalization support receives properly encoded messages in the notice callbacks`: "unknown", `PG::Connection multinationalization support receives properly encoded text from wait_for_notify`: "unknown", + `PG::Connection multinationalization support respect and convert character encoding of input strings should convert error string to #put_copy_end`: "unknown", `PG::Connection multinationalization support respect and convert character encoding of input strings should convert query string and parameters to #exec_params`: "unknown", `PG::Connection multinationalization support respect and convert character encoding of input strings should convert query string and parameters to #send_query_params`: "unknown", `PG::Connection multinationalization support respect and convert character encoding of input strings should convert strings and parameters to #prepare and #exec_prepared`: "unknown", `PG::Connection multinationalization support respect and convert character encoding of input strings should convert strings and parameters to #send_prepare and #send_query_prepared`: "unknown", `PG::Connection multinationalization support returns properly encoded text from notifies`: "unknown", - `PG::Connection multinationalization support rubyforge #22925: m17n support can use an encoding with high index for client encoding`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support raises appropriate error if set_client_encoding is called with invalid arguments`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support returns the results in the correct encoding even if the client_encoding has changed since the results were fetched`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support should return results in the same encoding as the client (EUC-JP)`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support should return results in the same encoding as the client (iso-8859-1)`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support the connection should return ASCII-8BIT when it's set to SQL_ASCII`: "unknown", - `PG::Connection multinationalization support rubyforge #22925: m17n support the connection should use JOHAB dummy encoding when it's set to JOHAB`: "unknown", + `PG::Connection multinationalization support rubyforge #22925: m17n support the connection should use the BINARY encoding when it's set to JOHAB`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped identifier`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped literal`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped string`: "unknown", 
`PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for quote_ident`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the previous string encoding for escaped string`: "unknown", `PG::Connection multinationalization support rubyforge #22925: m17n support uses the previous string encoding for quote_ident`: "unknown", - `PG::Connection not read past the end of a large object`: "unknown", `PG::Connection returns notifications which are already in the queue before wait_for_notify is called without waiting for the socket to become readable`: "unknown", `PG::Connection sends nil as the payload if the notification wasn't given one`: "unknown", `PG::Connection set_single_row_mode should receive rows before entire query is finished`: "unknown", @@ -105,11 +101,24 @@ var rubyPGBlocklist = blocklist{ `PG::Connection type casting with default result type map should respect a type mapping for result`: "unknown", `PG::Connection type casting with default result type map should work with arbitrary number of params in conjunction with type casting`: "unknown", `PG::Connection with async established connection provides the server generated error message`: "unknown", + `PG::Connection with multiple PostgreSQL servers honors target_session_attrs requirements`: "unknown", + `PG::Connection#copy_data can handle client errors in #copy_data for input`: "unknown", + `PG::Connection#copy_data can handle server errors in #copy_data for input`: "unknown", + `PG::Connection#copy_data can handle server errors in #copy_data for output`: "unknown", + `PG::Connection#copy_data can process #copy_data input queries`: "unknown", + `PG::Connection#copy_data can process #copy_data input queries with lots of data`: "unknown", + `PG::Connection#copy_data doesn't lose client error when #copy_data can not be finished`: "unknown", + `PG::Connection#copy_data gracefully handle SQL statements while in #copy_data for input`: "unknown", + `PG::Connection#copy_data gracefully handle SQL statements while in #copy_data for output`: "unknown", + `PG::Connection#discard_results returns false on connection failures`: "unknown", + `PG::Connection#inspect should print host, port and user of a fresh connection, but not more`: "unknown", + `PG::Connection#inspect should tell about non UTF8 client encoding`: "unknown", `PG::Connection#transaction automatically rolls back a transaction if an exception is raised`: "unknown", + `PG::Connection#transaction commits even if the block includes an early break/return`: "unknown", `PG::Connection#transaction doesn't worry about an already finished connection`: "unknown", `PG::Connection#transaction passes the connection to the block and returns the block result`: "unknown", - `PG::Connection#transaction stops a thread that runs a blocking transaction with async_exec`: "unknown", - `PG::Connection#transaction stops a thread that runs a failing transaction with async_exec`: "unknown", + `PG::Connection#transaction stops a thread that runs a blocking transaction with exec`: "unknown", + `PG::Connection#transaction stops a thread that runs a failing transaction with exec`: "unknown", `PG::Connection#transaction stops a thread that runs a no query but a transacted ruby sleep`: "unknown", `PG::Result encapsulates database object names for integrity constraint violations`: "unknown", `PG::Result encapsulates errors in a PG::Error object`: "unknown", @@ -118,6 +127,7 @@ var rubyPGBlocklist = blocklist{ `PG::TypeMapByOid 
should allow mixed type conversions in binary format`: "unknown", `PG::TypeMapByOid should allow mixed type conversions in text format`: "unknown", `PG::TypeMapByOid should build a TypeMapByColumn when assigned and the number of rows is high enough`: "unknown", + `running with sync_* methods PG::Connection #get_result should send remaining data before waiting`: "unknown", `running with sync_* methods PG::Connection accepts nil as the timeout in #wait_for_notify `: "unknown", `running with sync_* methods PG::Connection allows a query to be cancelled`: "unknown", `running with sync_* methods PG::Connection calls a block for NOTIFY events if one is given`: "unknown", @@ -125,13 +135,9 @@ var rubyPGBlocklist = blocklist{ `running with sync_* methods PG::Connection calls the block supplied to wait_for_notify with the notify payload if it accepts three arguments`: "unknown", `running with sync_* methods PG::Connection calls the block supplied to wait_for_notify with the notify payload if it accepts two arguments`: "unknown", `running with sync_* methods PG::Connection calls the block supplied to wait_for_notify with the notify payload if it doesn't accept arguments`: "unknown", - `running with sync_* methods PG::Connection can handle client errors in #copy_data for input`: "unknown", - `running with sync_* methods PG::Connection can handle server errors in #copy_data for input`: "unknown", - `running with sync_* methods PG::Connection can handle server errors in #copy_data for output`: "unknown", - `running with sync_* methods PG::Connection can process #copy_data input queries`: "unknown", - `running with sync_* methods PG::Connection can process #copy_data input queries with lots of data`: "unknown", `running with sync_* methods PG::Connection can receive notices while waiting for NOTIFY without exceeding the timeout`: "unknown", `running with sync_* methods PG::Connection can wait for NOTIFY events`: "unknown", + `running with sync_* methods PG::Connection carries the connection in case of connection errors`: "unknown", `running with sync_* methods PG::Connection connection information related to SSL can retrieve a single ssl connection attribute`: "unknown", `running with sync_* methods PG::Connection connection information related to SSL can retrieve connection's ssl state`: "unknown", `running with sync_* methods PG::Connection connects using URI with UnixSocket host`: "unknown", @@ -140,12 +146,12 @@ var rubyPGBlocklist = blocklist{ `running with sync_* methods PG::Connection doesn't collapse sequential notifications`: "unknown", `running with sync_* methods PG::Connection doesn't leave stale server connections after finish`: "unknown", `running with sync_* methods PG::Connection emits a suitable error_message at connection errors`: "unknown", - `running with sync_* methods PG::Connection gracefully handle SQL statements while in #copy_data for input`: "unknown", - `running with sync_* methods PG::Connection gracefully handle SQL statements while in #copy_data for output`: "unknown", `running with sync_* methods PG::Connection in nonblocking mode can send query with params`: "unknown", `running with sync_* methods PG::Connection in nonblocking mode needs to flush data after send_query`: "unknown", `running with sync_* methods PG::Connection in nonblocking mode rejects to send lots of COPY data`: "unknown", `running with sync_* methods PG::Connection in nonblocking mode returns immediately from get_copy_data(nonblock=true)`: "unknown", + `running with sync_* methods PG::Connection large 
objects large object can handle big data`: "unknown", + `running with sync_* methods PG::Connection large objects not read past the end of a large object`: "unknown", `running with sync_* methods PG::Connection multinationalization support Ruby 1.9.x default_internal encoding allows users of the async interface to set the client_encoding to the default_internal`: "unknown", `running with sync_* methods PG::Connection multinationalization support Ruby 1.9.x default_internal encoding honors the Encoding.default_internal if it's set and the synchronous interface is used`: "unknown", `running with sync_* methods PG::Connection multinationalization support encodes exception messages with the connection's encoding (#96)`: "unknown", @@ -157,20 +163,18 @@ var rubyPGBlocklist = blocklist{ `running with sync_* methods PG::Connection multinationalization support respect and convert character encoding of input strings should convert strings and parameters to #prepare and #exec_prepared`: "unknown", `running with sync_* methods PG::Connection multinationalization support respect and convert character encoding of input strings should convert strings and parameters to #send_prepare and #send_query_prepared`: "unknown", `running with sync_* methods PG::Connection multinationalization support returns properly encoded text from notifies`: "unknown", - `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support can use an encoding with high index for client encoding`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support raises appropriate error if set_client_encoding is called with invalid arguments`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support returns the results in the correct encoding even if the client_encoding has changed since the results were fetched`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support should return results in the same encoding as the client (EUC-JP)`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support should return results in the same encoding as the client (iso-8859-1)`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support the connection should return ASCII-8BIT when it's set to SQL_ASCII`: "unknown", - `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support the connection should use JOHAB dummy encoding when it's set to JOHAB`: "unknown", + `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support the connection should use the BINARY encoding when it's set to JOHAB`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped identifier`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped literal`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for escaped string`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the client encoding for quote_ident`: "unknown", `running with 
sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the previous string encoding for escaped string`: "unknown", `running with sync_* methods PG::Connection multinationalization support rubyforge #22925: m17n support uses the previous string encoding for quote_ident`: "unknown", - `running with sync_* methods PG::Connection not read past the end of a large object`: "unknown", `running with sync_* methods PG::Connection returns notifications which are already in the queue before wait_for_notify is called without waiting for the socket to become readable`: "unknown", `running with sync_* methods PG::Connection sends nil as the payload if the notification wasn't given one`: "unknown", `running with sync_* methods PG::Connection set_single_row_mode should receive rows before entire query is finished`: "unknown", @@ -182,12 +186,27 @@ var rubyPGBlocklist = blocklist{ `running with sync_* methods PG::Connection type casting with default result type map should respect a type mapping for result`: "unknown", `running with sync_* methods PG::Connection type casting with default result type map should work with arbitrary number of params in conjunction with type casting`: "unknown", `running with sync_* methods PG::Connection with async established connection provides the server generated error message`: "unknown", + `running with sync_* methods PG::Connection with multiple PostgreSQL servers honors target_session_attrs requirements`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle client errors after all data is consumed in #copy_data for output`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle client errors in #copy_data for input`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle client errors in #copy_data for output`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle incomplete #copy_data output queries`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle server errors in #copy_data for input`: "unknown", + `running with sync_* methods PG::Connection#copy_data can handle server errors in #copy_data for output`: "unknown", + `running with sync_* methods PG::Connection#copy_data can process #copy_data input queries`: "unknown", + `running with sync_* methods PG::Connection#copy_data can process #copy_data input queries with lots of data`: "unknown", + `running with sync_* methods PG::Connection#copy_data can process #copy_data output queries`: "unknown", + `running with sync_* methods PG::Connection#copy_data doesn't lose client error when #copy_data can not be finished`: "unknown", + `running with sync_* methods PG::Connection#copy_data gracefully handle SQL statements while in #copy_data for input`: "unknown", + `running with sync_* methods PG::Connection#copy_data gracefully handle SQL statements while in #copy_data for output`: "unknown", + `running with sync_* methods PG::Connection#copy_data should raise an error for non copy statements in #copy_data`: "unknown", + `running with sync_* methods PG::Connection#discard_results discards previous results`: "unknown", + `running with sync_* methods PG::Connection#discard_results returns false on connection failures`: "unknown", + `running with sync_* methods PG::Connection#inspect should print host, port and user of a fresh connection, but not more`: "unknown", + `running with sync_* methods PG::Connection#inspect should tell about non UTF8 client encoding`: 
"unknown", `running with sync_* methods PG::Connection#transaction automatically rolls back a transaction if an exception is raised`: "unknown", - `running with sync_* methods PG::Connection#transaction doesn't worry about an already finished connection`: "unknown", + `running with sync_* methods PG::Connection#transaction commits even if the block includes an early break/return`: "unknown", `running with sync_* methods PG::Connection#transaction passes the connection to the block and returns the block result`: "unknown", - `running with sync_* methods PG::Connection#transaction stops a thread that runs a blocking transaction with async_exec`: "unknown", - `running with sync_* methods PG::Connection#transaction stops a thread that runs a failing transaction with async_exec`: "unknown", - `running with sync_* methods PG::Connection#transaction stops a thread that runs a no query but a transacted ruby sleep`: "unknown", `with a Fiber scheduler can cancel a query`: "unknown", `with a Fiber scheduler can receive COPY data`: "unknown", `with a Fiber scheduler can retrieve several results`: "unknown", diff --git a/pkg/cmd/roachtest/tests/ruby_pg_helpers.rb b/pkg/cmd/roachtest/tests/ruby_pg_helpers.rb index b8c4c1c87ab1..d9547ad722c4 100644 --- a/pkg/cmd/roachtest/tests/ruby_pg_helpers.rb +++ b/pkg/cmd/roachtest/tests/ruby_pg_helpers.rb @@ -10,8 +10,9 @@ require 'openssl' require_relative 'helpers/scheduler.rb' require_relative 'helpers/tcp_gate_scheduler.rb' +require_relative 'helpers/tcp_gate_switcher.rb' -DEFAULT_TEST_DIR_STR = File.join(Dir.pwd, "tmp_test_specs") +DEFAULT_TEST_DIR_STR = Dir.pwd TEST_DIR_STR = ENV['RUBY_PG_TEST_DIR'] || DEFAULT_TEST_DIR_STR TEST_DIRECTORY = Pathname.new(TEST_DIR_STR) DATA_OBJ_MEMSIZE = 40 @@ -25,7 +26,10 @@ def self::included( mod ) if mod.respond_to?( :around ) mod.before( :all ) do - @conn = connect_testing_db + @port = $pg_server.port + @conninfo = $pg_server.conninfo + @unix_socket = $pg_server.unix_socket + @conn = $pg_server.connect end mod.around( :each ) do |example| @@ -95,96 +99,99 @@ def self::included( mod ) module_function ############### - ### Create a string that contains the ANSI codes specified and return it - def ansi_code( *attributes ) - attributes.flatten! - attributes.collect! {|at| at.to_s } + module Loggable + ### Create a string that contains the ANSI codes specified and return it + def ansi_code( *attributes ) + attributes.flatten! + attributes.collect! {|at| at.to_s } - return '' unless /(?:vt10[03]|xterm(?:-color)?|linux|screen)/i =~ ENV['TERM'] - attributes = ANSI_ATTRIBUTES.values_at( *attributes ).compact.join(';') + return '' unless /(?:vt10[03]|xterm(?:-color)?|linux|screen)/i =~ ENV['TERM'] + attributes = ANSI_ATTRIBUTES.values_at( *attributes ).compact.join(';') - # $stderr.puts " attr is: %p" % [attributes] - if attributes.empty? - return '' - else - return "\e[%sm" % attributes + # $stderr.puts " attr is: %p" % [attributes] + if attributes.empty? + return '' + else + return "\e[%sm" % attributes + end end - end - ### Colorize the given +string+ with the specified +attributes+ and return it, handling - ### line-endings, color reset, etc. - def colorize( *args ) - string = '' + ### Colorize the given +string+ with the specified +attributes+ and return it, handling + ### line-endings, color reset, etc. + def colorize( *args ) + string = '' - if block_given? - string = yield - else - string = args.shift - end + if block_given? 
+ string = yield + else + string = args.shift + end - ending = string[/(\s)$/] || '' - string = string.rstrip + ending = string[/(\s)$/] || '' + string = string.rstrip - return ansi_code( args.flatten ) + string + ansi_code( 'reset' ) + ending - end + return ansi_code( args.flatten ) + string + ansi_code( 'reset' ) + ending + end - ### Output a message with highlighting. - def message( *msg ) - $stderr.puts( colorize(:bold) { msg.flatten.join(' ') } ) - end + ### Output a message with highlighting. + def message( *msg ) + $stderr.puts( colorize(:bold) { msg.flatten.join(' ') } ) + end - ### Output a logging message if $VERBOSE is true - def trace( *msg ) - return unless $VERBOSE - output = colorize( msg.flatten.join(' '), 'yellow' ) - $stderr.puts( output ) - end + ### Output a logging message if $VERBOSE is true + def trace( *msg ) + return unless $VERBOSE + output = colorize( msg.flatten.join(' '), 'yellow' ) + $stderr.puts( output ) + end - ### Return the specified args as a string, quoting any that have a space. - def quotelist( *args ) - return args.flatten.collect {|part| part.to_s =~ /\s/ ? part.to_s.inspect : part.to_s } - end + ### Return the specified args as a string, quoting any that have a space. + def quotelist( *args ) + return args.flatten.collect {|part| part.to_s =~ /\s/ ? part.to_s.inspect : part.to_s } + end - ### Run the specified command +cmd+ with system(), failing if the execution - ### fails. - def run( *cmd ) - cmd.flatten! + ### Run the specified command +cmd+ with system(), failing if the execution + ### fails. + def run( *cmd ) + cmd.flatten! - if cmd.length > 1 - trace( quotelist(*cmd) ) - else - trace( cmd ) - end + if cmd.length > 1 + trace( quotelist(*cmd) ) + else + trace( cmd ) + end - system( *cmd ) - raise "Command failed: [%s]" % [cmd.join(' ')] unless $?.success? - end + system( *cmd ) + raise "Command failed: [%s]" % [cmd.join(' ')] unless $?.success? + end - ### Run the specified command +cmd+ after redirecting stdout and stderr to the specified - ### +logpath+, failing if the execution fails. - def log_and_run( logpath, *cmd ) - cmd.flatten! + ### Run the specified command +cmd+ after redirecting stdout and stderr to the specified + ### +logpath+, failing if the execution fails. + def log_and_run( logpath, *cmd ) + cmd.flatten! - if cmd.length > 1 - trace( quotelist(*cmd) ) - else - trace( cmd ) - end + if cmd.length > 1 + trace( quotelist(*cmd) ) + else + trace( cmd ) + end - # Eliminate the noise of creating/tearing down the database by - # redirecting STDERR/STDOUT to a logfile - logfh = File.open( logpath, File::WRONLY|File::CREAT|File::APPEND ) - system( *cmd, [STDOUT, STDERR] => logfh ) + # Eliminate the noise of creating/tearing down the database by + # redirecting STDERR/STDOUT to a logfile + logfh = File.open( logpath, File::WRONLY|File::CREAT|File::APPEND ) + system( *cmd, [STDOUT, STDERR] => logfh ) - raise "Command failed: [%s]" % [cmd.join(' ')] unless $?.success? + raise "Command failed: [%s]" % [cmd.join(' ')] unless $?.success? 
+ end end + extend Loggable ### Check the current directory for directories that look like they're ### testing directories from previous tests, and tell any postgres instances @@ -212,61 +219,92 @@ def stop_existing_postmasters end end - def define_testing_conninfo - ENV['PGPORT'] ||= "26257" - @port = ENV['PGPORT'].to_i - ENV['PGHOST'] = 'localhost' - ENV['PGUSER'] = 'test_admin' - @conninfo = "user=test_admin host=localhost port=#{@port} dbname=test" - @unix_socket = TEST_DIRECTORY.to_s - end + class PostgresServer + include Loggable - ### Set up a CockroachDB database instance for testing. - def setup_testing_db( description ) - stop_existing_postmasters() + attr_reader :port + attr_reader :conninfo + attr_reader :unix_socket - trace "Setting up test database for #{description}" - @test_pgdata = TEST_DIRECTORY + 'data' - @test_pgdata.mkpath + ### Set up a PostgreSQL database instance for testing. + def initialize( name, port: 26257, postgresql_conf: '' ) + trace "Setting up test database for #{name}" + @name = name + @port = port + @test_dir = TEST_DIRECTORY + "tmp_test_#{@name}" + @test_pgdata = @test_dir + 'data' + @test_pgdata.mkpath - define_testing_conninfo + @logfile = @test_dir + 'setup.log' + trace "Command output logged to #{@logfile}" - @logfile = TEST_DIRECTORY + 'setup.log' - trace "Command output logged to #{@logfile}" + begin + unless (@test_pgdata+"postgresql.conf").exist? + FileUtils.rm_rf( @test_pgdata, :verbose => $DEBUG ) + end - begin - unless (@test_pgdata+"postgresql.conf").exist? - FileUtils.rm_rf( @test_pgdata, :verbose => $DEBUG ) - trace "GG" - trace "Running initdb" + unless @port == 26257 + # The main database instance is started by the roachtest + # script, so skip this step if we're using the default port. + trace "Starting cockroachdb" + log_and_run @logfile, '/home/ubuntu/cockroach', 'start-single-node', '--insecure', + "--store=#{@test_pgdata.to_s}", + "--advertise-addr=localhost", + "--sql-addr=localhost:#{@port}" + sleep(2) + log_and_run @logfile, '/home/ubuntu/cockroach', 'sql', '--insecure', '-e', 'CREATE USER test_admin' + log_and_run @logfile, '/home/ubuntu/cockroach', 'sql', '--insecure', '-e', 'GRANT admin TO test_admin' + end + + td = @test_pgdata + @conninfo = "user=test_admin host=localhost port=#{@port} dbname=test" + @unix_socket = @test_dir.to_s + rescue => err + $stderr.puts "%p during test setup: %s" % [ err.class, err.message ] + $stderr.puts "See #{@logfile} for details." 
+ $stderr.puts err.backtrace if $DEBUG + fail end + end + + def generate_ssl_certs(output_dir) + gen = CertGenerator.new(output_dir) + + trace "create ca-key" + ca_key = gen.create_key('ruby-pg-ca-key') + ca_cert = gen.create_ca_cert('ruby-pg-ca-cert', ca_key, '/CN=ruby-pg root key') + trace "create server cert" + key = gen.create_key('ruby-pg-server-key') + csr = gen.create_signing_request('ruby-pg-server-csr', '/CN=localhost', key) + gen.create_cert_from_csr('ruby-pg-server-cert', csr, ca_cert, ca_key, dns_names: %w[localhost] ) + + trace "create client cert" + key = gen.create_key('ruby-pg-client-key') + csr = gen.create_signing_request('ruby-pg-client-csr', '/CN=ruby-pg client', key) + gen.create_cert_from_csr('ruby-pg-client-cert', csr, ca_cert, ca_key) + end + + def create_test_db trace "Creating the test DB" log_and_run @logfile, '/home/ubuntu/cockroach', 'sql', '--insecure', '-e', 'DROP DATABASE IF EXISTS test' log_and_run @logfile, '/home/ubuntu/cockroach', 'sql', '--insecure', '-e', 'CREATE DATABASE test' - - rescue => err - $stderr.puts "%p during test setup: %s" % [ err.class, err.message ] - $stderr.puts "See #{@logfile} for details." - $stderr.puts err.backtrace if $DEBUG - fail end - end - def connect_testing_db - define_testing_conninfo - conn = PG.connect( @conninfo ) - conn.set_notice_processor do |message| - $stderr.puts( description + ':' + message ) if $DEBUG - end + def connect + conn = PG.connect( @conninfo ) + conn.set_notice_processor do |message| + $stderr.puts( @name + ':' + message ) if $DEBUG + end - return conn - end + return conn + end - def teardown_testing_db - trace "Tearing down test database" - # This is changed to a no-op so that we can inspect the database after the - # test runs. + def teardown + trace "Tearing down test database for #{@name}" + # This is changed to a no-op so that we can inspect the database after the + # test runs. 
+ end end class CertGenerator @@ -378,24 +416,6 @@ def create_cert_from_csr(name, csr, ca_cert, ca_key, valid_years: 10, dns_names: end end - def generate_ssl_certs(output_dir) - gen = CertGenerator.new(output_dir) - - trace "create ca-key" - ca_key = gen.create_key('ruby-pg-ca-key') - ca_cert = gen.create_ca_cert('ruby-pg-ca-cert', ca_key, '/CN=ruby-pg root key') - - trace "create server cert" - key = gen.create_key('ruby-pg-server-key') - csr = gen.create_signing_request('ruby-pg-server-csr', '/CN=localhost', key) - gen.create_cert_from_csr('ruby-pg-server-cert', csr, ca_cert, ca_key, dns_names: %w[localhost] ) - - trace "create client cert" - key = gen.create_key('ruby-pg-client-key') - csr = gen.create_signing_request('ruby-pg-client-csr', '/CN=ruby-pg client', key) - gen.create_cert_from_csr('ruby-pg-client-cert', csr, ca_cert, ca_key) - end - def check_for_lingering_connections( conn ) conn.exec( "SELECT * FROM pg_stat_activity" ) do |res| conns = res.find_all {|row| row['pid'].to_i != conn.backend_pid && ["client backend", nil].include?(row["backend_type"]) } @@ -510,6 +530,97 @@ def wait_for_flush(conn) end end end + + def scheduler_setup + # Run examples with gated scheduler + sched = Helpers::TcpGateScheduler.new(external_host: 'localhost', external_port: ENV['PGPORT'].to_i, debug: ENV['PG_DEBUG']=='1') + Fiber.set_scheduler(sched) + @conninfo_gate = @conninfo.gsub(/(^| )port=\d+/, " port=#{sched.internal_port} sslmode=disable") + + # Run examples with default scheduler + #Fiber.set_scheduler(Helpers::Scheduler.new) + #@conninfo_gate = @conninfo + + # Run examples without scheduler + #def Fiber.schedule; yield; end + #@conninfo_gate = @conninfo + end + + def scheduler_teardown + Fiber.set_scheduler(nil) + end + + def scheduler_stop + if Fiber.scheduler && Fiber.scheduler.respond_to?(:finish) + Fiber.scheduler.finish + end + end + + def thread_with_timeout(timeout) + th = Thread.new do + yield + end + unless th.join(timeout) + th.kill + $scheduler_timeout = true + raise("scheduler timeout in:\n#{th.backtrace.join("\n")}") + end + end + + def run_with_scheduler(timeout=10) + thread_with_timeout(timeout) do + scheduler_setup + Fiber.schedule do + conn = PG.connect(@conninfo_gate) + + yield conn + + conn.finish + scheduler_stop + end + end + scheduler_teardown + end + + def gate_setup + # Run examples with gate + gate = Helpers::TcpGateSwitcher.new(external_host: 'localhost', external_port: ENV['PGPORT'].to_i, debug: ENV['PG_DEBUG']=='1') + @conninfo_gate = @conninfo.gsub(/(^| )port=\d+/, " port=#{gate.internal_port} sslmode=disable") + + # Run examples without gate + #@conninfo_gate = @conninfo + gate + end + + def gate_stop(gate) + gate&.finish + end + + def run_with_gate(timeout=10) + thread_with_timeout(timeout) do + gate = gate_setup + conn = PG.connect(@conninfo_gate) + + yield conn, gate + + conn.finish + gate_stop(gate) + end + end + + # Define environment variables for the time of the given block + # + # All environment variables are restored to the original value or undefined after the block. 
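+  # Example (illustrative; a nil value unsets the variable for the block): +  # +  #   with_env_vars(PGHOST: 'localhost', PGDATABASE: nil) do +  #     ... # code that sees the temporary environment +  #   end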
+ def with_env_vars(**kwargs) + kwargs = kwargs.map{|k,v| [k.to_s, v && v.to_s] }.to_h + old_values = kwargs.map{|k,_| [k, ENV[k]] }.to_h + ENV.update(kwargs) + begin + yield + ensure + ENV.update(old_values) + end + end end @@ -538,12 +649,20 @@ def wait_for_flush(conn) config.filter_run_excluding( :unix_socket ) if RUBY_PLATFORM=~/mingw|mswin/i config.filter_run_excluding( :scheduler ) if RUBY_VERSION < "3.0" || !Fiber.respond_to?(:scheduler) config.filter_run_excluding( :scheduler_address_resolve ) if RUBY_VERSION < "3.1" + config.filter_run_excluding( :ipv6 ) if Addrinfo.getaddrinfo("localhost", nil, nil, :STREAM).size < 2 ### Automatically set up and tear down the database config.before(:suite) do |*args| - PG::TestingHelpers.setup_testing_db("the spec suite") + PG::TestingHelpers.stop_existing_postmasters + + ENV['PGHOST'] = 'localhost' + ENV['PGPORT'] ||= "26257" + port = ENV['PGPORT'].to_i + ENV['PGUSER'] = 'test_admin' + $pg_server = PG::TestingHelpers::PostgresServer.new("specs", port: port) + $pg_server.create_test_db end config.after(:suite) do - PG::TestingHelpers.teardown_testing_db + $pg_server.teardown end end