Skip to content

Commit

Permalink
CBG-4108 use an atomic to avoid data race
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin committed Jul 24, 2024
1 parent 37b0a2e commit 9040048
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions db/change_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

sgbucket "github.com/couchbase/sg-bucket"
Expand All @@ -34,7 +35,7 @@ type changeListener struct {
tapNotifier *sync.Cond // Posts notifications when documents are updated
FeedArgs sgbucket.FeedArguments // The Tap Args (backfill, etc)
counter uint64 // Event counter; increments on every doc update
terminateCheckCounter uint64 // Termination Event counter; increments on every notifyCheckForTermination
terminateCheckCounter atomic.Uint64 // Termination Event counter; increments on every notifyCheckForTermination
keyCounts map[string]uint64 // Latest count at which each doc key was updated
OnChangeCallback DocChangedFunc
terminator chan bool // Signal to cause DCP feed to exit
Expand All @@ -48,7 +49,7 @@ type DocChangedFunc func(event sgbucket.FeedEvent)
func (listener *changeListener) Init(name string, groupID string, metaKeys *base.MetadataKeys) {
listener.bucketName = name
listener.counter = 1
listener.terminateCheckCounter = 0
listener.terminateCheckCounter.Store(0)
listener.keyCounts = map[string]uint64{}
listener.tapNotifier = sync.NewCond(&sync.Mutex{})
listener.sgCfgPrefix = metaKeys.SGCfgPrefix(groupID)
Expand Down Expand Up @@ -233,10 +234,10 @@ func (listener *changeListener) NotifyCheckForTermination(ctx context.Context, k

// Increment terminateCheckCounter, but loop back to zero
//if we have reached maximum value for uint64 type
if listener.terminateCheckCounter < math.MaxUint64 {
listener.terminateCheckCounter++
if listener.terminateCheckCounter.Load() < math.MaxUint64 {
listener.terminateCheckCounter.Add(1)
} else {
listener.terminateCheckCounter = 0
listener.terminateCheckCounter.Store(0)
}

base.DebugfCtx(ctx, base.KeyChanges, "Notifying to check for _changes feed termination")
Expand All @@ -254,8 +255,8 @@ func (listener *changeListener) Wait(ctx context.Context, keys []string, counter
for {
curCounter := listener._currentCount(keys)

if curCounter != counter || listener.terminateCheckCounter != terminateCheckCounter {
return curCounter, listener.terminateCheckCounter
if curCounter != counter || listener.terminateCheckCounter.Load() != terminateCheckCounter {
return curCounter, listener.terminateCheckCounter.Load()
}

listener.tapNotifier.Wait()
Expand Down Expand Up @@ -308,7 +309,7 @@ func (listener *changeListener) NewWaiter(keys []string, trackUnusedSequences bo
listener: listener,
keys: keys,
lastCounter: listener.CurrentCount(keys),
lastTerminateCheckCounter: listener.terminateCheckCounter,
lastTerminateCheckCounter: listener.terminateCheckCounter.Load(),
trackUnusedSequences: trackUnusedSequences,
}
}
Expand Down

0 comments on commit 9040048

Please sign in to comment.