Skip to content

Commit

Permalink
Grab one lock in NewWaiter
Browse files Browse the repository at this point in the history
The lock was already used for listener.CurrentCount(keys), so mark
_terminateCheckCounter as needing a lock as well.
  • Loading branch information
torcolvin committed Jul 24, 2024
1 parent 9040048 commit 3c087cf
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions db/change_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

sgbucket "github.com/couchbase/sg-bucket"
Expand All @@ -28,28 +27,28 @@ import (
// A wrapper around a Bucket's TapFeed that allows any number of client goroutines to wait for
// changes.
type changeListener struct {
ctx context.Context
bucket base.Bucket
bucketName string // Used for logging
tapFeed base.TapFeed // Observes changes to bucket
tapNotifier *sync.Cond // Posts notifications when documents are updated
FeedArgs sgbucket.FeedArguments // The Tap Args (backfill, etc)
counter uint64 // Event counter; increments on every doc update
terminateCheckCounter atomic.Uint64 // Termination Event counter; increments on every notifyCheckForTermination
keyCounts map[string]uint64 // Latest count at which each doc key was updated
OnChangeCallback DocChangedFunc
terminator chan bool // Signal to cause DCP feed to exit
sgCfgPrefix string // SG config key prefix
started base.AtomicBool // whether the feed has been started
metaKeys *base.MetadataKeys // Metadata key formatter
ctx context.Context
bucket base.Bucket
bucketName string // Used for logging
tapFeed base.TapFeed // Observes changes to bucket
tapNotifier *sync.Cond // Posts notifications when documents are updated
FeedArgs sgbucket.FeedArguments // The Tap Args (backfill, etc)
counter uint64 // Event counter; increments on every doc update
_terminateCheckCounter uint64 // Termination Event counter; increments on every notifyCheckForTermination
keyCounts map[string]uint64 // Latest count at which each doc key was updated
OnChangeCallback DocChangedFunc
terminator chan bool // Signal to cause DCP feed to exit
sgCfgPrefix string // SG config key prefix
started base.AtomicBool // whether the feed has been started
metaKeys *base.MetadataKeys // Metadata key formatter
}

type DocChangedFunc func(event sgbucket.FeedEvent)

func (listener *changeListener) Init(name string, groupID string, metaKeys *base.MetadataKeys) {
listener.bucketName = name
listener.counter = 1
listener.terminateCheckCounter.Store(0)
listener._terminateCheckCounter = 0
listener.keyCounts = map[string]uint64{}
listener.tapNotifier = sync.NewCond(&sync.Mutex{})
listener.sgCfgPrefix = metaKeys.SGCfgPrefix(groupID)
Expand Down Expand Up @@ -234,10 +233,10 @@ func (listener *changeListener) NotifyCheckForTermination(ctx context.Context, k

// Increment terminateCheckCounter, but loop back to zero
//if we have reached maximum value for uint64 type
if listener.terminateCheckCounter.Load() < math.MaxUint64 {
listener.terminateCheckCounter.Add(1)
if listener._terminateCheckCounter < math.MaxUint64 {
listener._terminateCheckCounter++
} else {
listener.terminateCheckCounter.Store(0)
listener._terminateCheckCounter = 0
}

base.DebugfCtx(ctx, base.KeyChanges, "Notifying to check for _changes feed termination")
Expand All @@ -255,8 +254,8 @@ func (listener *changeListener) Wait(ctx context.Context, keys []string, counter
for {
curCounter := listener._currentCount(keys)

if curCounter != counter || listener.terminateCheckCounter.Load() != terminateCheckCounter {
return curCounter, listener.terminateCheckCounter.Load()
if curCounter != counter || listener._terminateCheckCounter != terminateCheckCounter {
return curCounter, listener._terminateCheckCounter
}

listener.tapNotifier.Wait()
Expand Down Expand Up @@ -305,11 +304,13 @@ type ChangeWaiter struct {

// NewWaiter a new ChangeWaiter that will wait for changes for the given document keys, and will optionally track unused sequences.
func (listener *changeListener) NewWaiter(keys []string, trackUnusedSequences bool) *ChangeWaiter {
listener.tapNotifier.L.Lock()
defer listener.tapNotifier.L.Unlock()
return &ChangeWaiter{
listener: listener,
keys: keys,
lastCounter: listener.CurrentCount(keys),
lastTerminateCheckCounter: listener.terminateCheckCounter.Load(),
lastCounter: listener._currentCount(keys),
lastTerminateCheckCounter: listener._terminateCheckCounter,
trackUnusedSequences: trackUnusedSequences,
}
}
Expand Down

0 comments on commit 3c087cf

Please sign in to comment.