diff --git a/db/change_listener.go b/db/change_listener.go index 8316cdad54..44c8b6fc89 100644 --- a/db/change_listener.go +++ b/db/change_listener.go @@ -16,6 +16,7 @@ import ( "math" "strings" "sync" + "sync/atomic" "time" sgbucket "github.com/couchbase/sg-bucket" @@ -34,7 +35,7 @@ type changeListener struct { tapNotifier *sync.Cond // Posts notifications when documents are updated FeedArgs sgbucket.FeedArguments // The Tap Args (backfill, etc) counter uint64 // Event counter; increments on every doc update - terminateCheckCounter uint64 // Termination Event counter; increments on every notifyCheckForTermination + terminateCheckCounter atomic.Uint64 // Termination Event counter; increments on every notifyCheckForTermination keyCounts map[string]uint64 // Latest count at which each doc key was updated OnChangeCallback DocChangedFunc terminator chan bool // Signal to cause DCP feed to exit @@ -48,7 +49,7 @@ type DocChangedFunc func(event sgbucket.FeedEvent) func (listener *changeListener) Init(name string, groupID string, metaKeys *base.MetadataKeys) { listener.bucketName = name listener.counter = 1 - listener.terminateCheckCounter = 0 + listener.terminateCheckCounter.Store(0) listener.keyCounts = map[string]uint64{} listener.tapNotifier = sync.NewCond(&sync.Mutex{}) listener.sgCfgPrefix = metaKeys.SGCfgPrefix(groupID) @@ -233,10 +234,10 @@ func (listener *changeListener) NotifyCheckForTermination(ctx context.Context, k // Increment terminateCheckCounter, but loop back to zero //if we have reached maximum value for uint64 type - if listener.terminateCheckCounter < math.MaxUint64 { - listener.terminateCheckCounter++ + if listener.terminateCheckCounter.Load() < math.MaxUint64 { + listener.terminateCheckCounter.Add(1) } else { - listener.terminateCheckCounter = 0 + listener.terminateCheckCounter.Store(0) } base.DebugfCtx(ctx, base.KeyChanges, "Notifying to check for _changes feed termination") @@ -254,8 +255,8 @@ func (listener *changeListener) Wait(ctx context.Context, keys []string, counter for { curCounter := listener._currentCount(keys) - if curCounter != counter || listener.terminateCheckCounter != terminateCheckCounter { - return curCounter, listener.terminateCheckCounter + if curCounter != counter || listener.terminateCheckCounter.Load() != terminateCheckCounter { + return curCounter, listener.terminateCheckCounter.Load() } listener.tapNotifier.Wait() @@ -308,7 +309,7 @@ func (listener *changeListener) NewWaiter(keys []string, trackUnusedSequences bo listener: listener, keys: keys, lastCounter: listener.CurrentCount(keys), - lastTerminateCheckCounter: listener.terminateCheckCounter, + lastTerminateCheckCounter: listener.terminateCheckCounter.Load(), trackUnusedSequences: trackUnusedSequences, } }