diff --git a/base/error.go b/base/error.go index 1d36905357..cddf0a72fa 100644 --- a/base/error.go +++ b/base/error.go @@ -237,6 +237,26 @@ func IsDocNotFoundError(err error) bool { } } +// IsTemporaryKvError returns true if a kv operation has an error that is likely to be ephemeral. This represents +// situations where Couchbase Server is under load and would be expected to return a success or failure in a future call. +func IsTemporaryKvError(err error) bool { + if err == nil { + return false + } + // define list of temporary errors + temporaryKVError := []error{ErrTimeout, gocb.ErrAmbiguousTimeout, gocb.ErrUnambiguousTimeout, + gocb.ErrOverload, gocb.ErrTemporaryFailure, gocb.ErrCircuitBreakerOpen} + + // iterate through to check incoming error is one of them + for _, tempKVErr := range temporaryKVError { + if errors.Is(err, tempKVErr) { + return true + } + } + + return false +} + func IsXattrNotFoundError(err error) bool { if unwrappedErr := pkgerrors.Cause(err); unwrappedErr == nil { return false diff --git a/base/leaky_bucket.go b/base/leaky_bucket.go index ade1d07a1b..d5042b3b83 100644 --- a/base/leaky_bucket.go +++ b/base/leaky_bucket.go @@ -121,6 +121,8 @@ type LeakyBucketConfig struct { ForceErrorSetRawKeys []string // Issuing a SetRaw call with a specified key will return an error + ForceTimeoutErrorOnUpdateKeys []string // Specified keys will return timeout error AFTER write is sent to server + // Returns a partial error the first time ViewCustom is called FirstTimeViewCustomPartialError bool PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called diff --git a/base/leaky_datastore.go b/base/leaky_datastore.go index 968266d155..ef21118b40 100644 --- a/base/leaky_datastore.go +++ b/base/leaky_datastore.go @@ -12,6 +12,7 @@ import ( "context" "errors" "fmt" + "slices" "time" sgbucket "github.com/couchbase/sg-bucket" @@ -153,7 +154,11 @@ func (lds *LeakyDataStore) Update(k string, exp uint32, callback sgbucket.Update lds.config.UpdateCallback(k) return updated, expiry, isDelete, err } - return lds.dataStore.Update(k, exp, wrapperCallback) + casOut, err = lds.dataStore.Update(k, exp, wrapperCallback) + if slices.Contains(lds.config.ForceTimeoutErrorOnUpdateKeys, k) { + return 0, ErrTimeout + } + return casOut, err } casOut, err = lds.dataStore.Update(k, exp, callback) @@ -267,7 +272,11 @@ func (lds *LeakyDataStore) WriteUpdateWithXattrs(ctx context.Context, k string, lds.config.UpdateCallback(k) return updatedDoc, err } - return lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, wrapperCallback) + casOut, err = lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, wrapperCallback) + if slices.Contains(lds.config.ForceTimeoutErrorOnUpdateKeys, k) { + return 0, ErrTimeout + } + return casOut, err } return lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback) } diff --git a/db/crud.go b/db/crud.go index 06a1c6c1d0..2cc5c8ce1d 100644 --- a/db/crud.go +++ b/db/crud.go @@ -2096,15 +2096,17 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do // If the WriteUpdate didn't succeed, check whether there are unused, allocated sequences that need to be accounted for if err != nil { - if docSequence > 0 { - if seqErr := db.sequences().releaseSequence(ctx, docSequence); seqErr != nil { - base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, seqErr) - } + if !base.IsTemporaryKvError(err) { + if docSequence > 0 { + if seqErr := db.sequences().releaseSequence(ctx, docSequence); seqErr != nil { + base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, seqErr) + } - } - for _, sequence := range unusedSequences { - if seqErr := db.sequences().releaseSequence(ctx, sequence); seqErr != nil { - base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", sequence, seqErr) + } + for _, sequence := range unusedSequences { + if seqErr := db.sequences().releaseSequence(ctx, sequence); seqErr != nil { + base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", sequence, seqErr) + } } } } diff --git a/db/crud_test.go b/db/crud_test.go index 2a6d7f5fe6..191a690060 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1680,3 +1680,88 @@ func TestAssignSequenceReleaseLoop(t *testing.T) { releasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value() - startReleasedSequenceCount assert.Equal(t, int64(expectedReleasedSequenceCount), releasedSequenceCount) } + +// TestReleaseSequenceOnDocWrite: +// - Define a leaky bucket callback for a conflicting write + define key to return a timeout error for +// - Setup db with leaky bucket config +// - Init a channel cache by calling changes +// - Write a doc that will return timeout error but will successfully persist +// - Wait for it to arrive at change cache +// - Assert we don't release a sequence for it + we have it in changes cache +// - Write new doc with conflict error +// - Assert we release a sequence for this +func TestReleaseSequenceOnDocWriteFailure(t *testing.T) { + defer SuspendSequenceBatching()() + base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) + + var ctx context.Context + var db *Database + var forceDocConflict bool + + const ( + conflictDoc = "doc1" + timeoutDoc = "doc" + ) + + // call back to create a conflict mid write and force a non timeout error upon write attempt + writeUpdateCallback := func(key string) { + if key == conflictDoc && forceDocConflict { + forceDocConflict = false + body := Body{"test": "doc"} + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + _, _, err := collection.Put(ctx, conflictDoc, body) + require.NoError(t, err) + } + } + + callbackConfig := base.LeakyBucketConfig{ + UpdateCallback: writeUpdateCallback, + ForceTimeoutErrorOnUpdateKeys: []string{timeoutDoc}, + } + + db, ctx = setupTestLeakyDBWithCacheOptions(t, DefaultCacheOptions(), callbackConfig) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + + // init channel cache, this will make changes call after timeout doc is written below fail pre changes made in CBG-4067, + // due to duplicate sequence at the cache with an unused sequence. See steps in ticket CBG-4067 as example. + _, err := collection.GetChanges(ctx, base.SetOf("*"), getChangesOptionsWithZeroSeq(t)) + require.NoError(t, err) + + assert.Equal(t, int64(0), db.DbStats.Database().SequenceReleasedCount.Value()) + + // write doc that will return timeout but will actually be persisted successfully on server + // this mimics what was seen before + _, _, err = collection.Put(ctx, timeoutDoc, Body{"test": "doc"}) + require.Error(t, err) + + // wait for changes + require.NoError(t, collection.WaitForPendingChanges(ctx)) + + // assert that no sequences were released + a sequence was cached + require.EventuallyWithT(t, func(c *assert.CollectT) { + db.UpdateCalculatedStats(ctx) + assert.Equal(t, int64(0), db.DbStats.Database().SequenceReleasedCount.Value()) + assert.Equal(t, int64(1), db.DbStats.Cache().HighSeqCached.Value()) + }, time.Second*10, time.Millisecond*100) + + // get cached changes + assert the document is present + changes, err := collection.GetChanges(ctx, base.SetOf("*"), getChangesOptionsWithZeroSeq(t)) + require.NoError(t, err) + require.Len(t, changes, 1) + assert.Equal(t, timeoutDoc, changes[0].ID) + + // write doc that will have a conflict error, we should expect the document sequence to be released + forceDocConflict = true + _, _, err = collection.Put(ctx, conflictDoc, Body{"test": "doc"}) + require.Error(t, err) + + // wait for changes + require.NoError(t, collection.WaitForPendingChanges(ctx)) + + // assert that a sequence was released after the above write error + require.EventuallyWithT(t, func(c *assert.CollectT) { + db.UpdateCalculatedStats(ctx) + assert.Equal(t, int64(1), db.DbStats.Database().SequenceReleasedCount.Value()) + }, time.Second*10, time.Millisecond*100) +}