Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4067: do not release doc sequence upon timeout error #7016

Merged
merged 5 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions base/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,26 @@ func IsDocNotFoundError(err error) bool {
}
}

// IsTemporaryKvError returns true if a kv operation has an error that is likely to be ephemeral. This represents
// situations where Couchbase Server is under load and would be expected to return a success or failure in a future call.
func IsTemporaryKvError(err error) bool {
	if err == nil {
		return false
	}

	// Transient errors: Sync Gateway's own timeout sentinel plus the gocb
	// timeout/overload family - any of these may resolve on a later retry.
	for _, transientErr := range []error{
		ErrTimeout,
		gocb.ErrAmbiguousTimeout,
		gocb.ErrUnambiguousTimeout,
		gocb.ErrOverload,
		gocb.ErrTemporaryFailure,
		gocb.ErrCircuitBreakerOpen,
	} {
		// errors.Is walks the wrap chain, so wrapped timeouts match too.
		if errors.Is(err, transientErr) {
			return true
		}
	}

	return false
}

func IsXattrNotFoundError(err error) bool {
if unwrappedErr := pkgerrors.Cause(err); unwrappedErr == nil {
return false
Expand Down
2 changes: 2 additions & 0 deletions base/leaky_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type LeakyBucketConfig struct {

ForceErrorSetRawKeys []string // Issuing a SetRaw call with a specified key will return an error

ForceTimeoutErrorOnUpdateKeys []string // Specified keys will return timeout error AFTER write is sent to server
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting because this definitely means the write occurred. However, in the case of this ticket, we don't know whether the write went through or not? Is there a reason to actually have this error after it writes and not before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want this behaviour as this is what the customer saw. See the ticket linked to this PR for more info, but they had a write while the server was under load: the operation timed out, but the write eventually succeeded even though we had already timed out and released the document sequence. Then the successful write is ignored at the cache due to its sequence duplicating the released sequence.

It certainly seems odd to do it this way, but it's the only way I could think of mocking/forcing this situation reliably in a test. When we hit a timeout we cannot be sure whether the write will eventually go through or not, but the changes this PR makes will protect against docs being missed in replications if the write does succeed.


// Returns a partial error the first time ViewCustom is called
FirstTimeViewCustomPartialError bool
PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called
Expand Down
13 changes: 11 additions & 2 deletions base/leaky_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"time"

sgbucket "github.com/couchbase/sg-bucket"
Expand Down Expand Up @@ -153,7 +154,11 @@ func (lds *LeakyDataStore) Update(k string, exp uint32, callback sgbucket.Update
lds.config.UpdateCallback(k)
return updated, expiry, isDelete, err
}
return lds.dataStore.Update(k, exp, wrapperCallback)
casOut, err = lds.dataStore.Update(k, exp, wrapperCallback)
if slices.Contains(lds.config.ForceTimeoutErrorOnUpdateKeys, k) {
return 0, ErrTimeout
}
return casOut, err
}

casOut, err = lds.dataStore.Update(k, exp, callback)
Expand Down Expand Up @@ -267,7 +272,11 @@ func (lds *LeakyDataStore) WriteUpdateWithXattrs(ctx context.Context, k string,
lds.config.UpdateCallback(k)
return updatedDoc, err
}
return lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, wrapperCallback)
casOut, err = lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, wrapperCallback)
if slices.Contains(lds.config.ForceTimeoutErrorOnUpdateKeys, k) {
return 0, ErrTimeout
}
return casOut, err
}
return lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback)
}
Expand Down
18 changes: 10 additions & 8 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2096,15 +2096,17 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do

// If the WriteUpdate didn't succeed, check whether there are unused, allocated sequences that need to be accounted for
if err != nil {
if docSequence > 0 {
if seqErr := db.sequences().releaseSequence(ctx, docSequence); seqErr != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, seqErr)
}
if !base.IsTemporaryKvError(err) {
if docSequence > 0 {
if seqErr := db.sequences().releaseSequence(ctx, docSequence); seqErr != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, seqErr)
}

}
for _, sequence := range unusedSequences {
if seqErr := db.sequences().releaseSequence(ctx, sequence); seqErr != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", sequence, seqErr)
}
for _, sequence := range unusedSequences {
if seqErr := db.sequences().releaseSequence(ctx, sequence); seqErr != nil {
base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", sequence, seqErr)
}
}
}
}
Expand Down
85 changes: 85 additions & 0 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1680,3 +1680,88 @@ func TestAssignSequenceReleaseLoop(t *testing.T) {
releasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value() - startReleasedSequenceCount
assert.Equal(t, int64(expectedReleasedSequenceCount), releasedSequenceCount)
}

// TestReleaseSequenceOnDocWriteFailure:
// - Define a leaky bucket callback for a conflicting write + define key to return a timeout error for
// - Setup db with leaky bucket config
// - Init a channel cache by calling changes
// - Write a doc that will return timeout error but will successfully persist
// - Wait for it to arrive at change cache
// - Assert we don't release a sequence for it + we have it in changes cache
// - Write new doc with conflict error
// - Assert we release a sequence for this
func TestReleaseSequenceOnDocWriteFailure(t *testing.T) {
	// Disable sequence batching so each write allocates exactly one sequence,
	// keeping the released/cached sequence counts below deterministic.
	defer SuspendSequenceBatching()()
	base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)

	var ctx context.Context
	var db *Database
	var forceDocConflict bool // toggled on immediately before the conflicting write below

	const (
		conflictDoc = "doc1" // key that will hit a conflict error (its sequence should be released)
		timeoutDoc  = "doc"  // key that will hit a forced timeout (its sequence should NOT be released)
	)

	// call back to create a conflict mid write and force a non timeout error upon write attempt
	writeUpdateCallback := func(key string) {
		if key == conflictDoc && forceDocConflict {
			forceDocConflict = false // only conflict the first attempt for this key
			body := Body{"test": "doc"}
			collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
			_, _, err := collection.Put(ctx, conflictDoc, body)
			require.NoError(t, err)
		}
	}

	callbackConfig := base.LeakyBucketConfig{
		UpdateCallback:                writeUpdateCallback,
		ForceTimeoutErrorOnUpdateKeys: []string{timeoutDoc},
	}

	db, ctx = setupTestLeakyDBWithCacheOptions(t, DefaultCacheOptions(), callbackConfig)
	defer db.Close(ctx)
	collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

	// init channel cache, this will make changes call after timeout doc is written below fail pre changes made in CBG-4067,
	// due to duplicate sequence at the cache with an unused sequence. See steps in ticket CBG-4067 as example.
	_, err := collection.GetChanges(ctx, base.SetOf("*"), getChangesOptionsWithZeroSeq(t))
	require.NoError(t, err)

	assert.Equal(t, int64(0), db.DbStats.Database().SequenceReleasedCount.Value())

	// write doc that will return timeout but will actually be persisted successfully on server
	// this mimics what was seen before
	_, _, err = collection.Put(ctx, timeoutDoc, Body{"test": "doc"})
	require.Error(t, err)

	// wait for changes
	require.NoError(t, collection.WaitForPendingChanges(ctx))

	// assert that no sequences were released + a sequence was cached
	// NOTE: assertions inside EventuallyWithT must use the CollectT (c), not t,
	// so that a failed poll is retried instead of failing the test immediately.
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		db.UpdateCalculatedStats(ctx)
		assert.Equal(c, int64(0), db.DbStats.Database().SequenceReleasedCount.Value())
		assert.Equal(c, int64(1), db.DbStats.Cache().HighSeqCached.Value())
	}, time.Second*10, time.Millisecond*100)

	// get cached changes + assert the document is present
	changes, err := collection.GetChanges(ctx, base.SetOf("*"), getChangesOptionsWithZeroSeq(t))
	require.NoError(t, err)
	require.Len(t, changes, 1)
	assert.Equal(t, timeoutDoc, changes[0].ID)

	// write doc that will have a conflict error, we should expect the document sequence to be released
	forceDocConflict = true
	_, _, err = collection.Put(ctx, conflictDoc, Body{"test": "doc"})
	require.Error(t, err)

	// wait for changes
	require.NoError(t, collection.WaitForPendingChanges(ctx))

	// assert that a sequence was released after the above write error
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		db.UpdateCalculatedStats(ctx)
		assert.Equal(c, int64(1), db.DbStats.Database().SequenceReleasedCount.Value())
	}, time.Second*10, time.Millisecond*100)
}
Loading