From 86b3285c63ec6a6659c3aa710a7eba2669ef3dd8 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Fri, 26 Jul 2024 10:24:49 +0100 Subject: [PATCH 1/5] CBG-4067: do not release doc sequence upon timeout error --- base/error.go | 16 ++++++++ base/leaky_bucket.go | 2 + base/leaky_datastore.go | 8 +++- db/crud.go | 18 +++++---- db/crud_test.go | 85 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 120 insertions(+), 9 deletions(-) diff --git a/base/error.go b/base/error.go index 1d36905357..9b5c8ff472 100644 --- a/base/error.go +++ b/base/error.go @@ -237,6 +237,22 @@ func IsDocNotFoundError(err error) bool { } } +func IsTimeoutError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, ErrTimeout) { + return true + } + if errors.Is(err, gocb.ErrAmbiguousTimeout) { + return true + } + if errors.Is(err, gocb.ErrUnambiguousTimeout) { + return true + } + return false +} + func IsXattrNotFoundError(err error) bool { if unwrappedErr := pkgerrors.Cause(err); unwrappedErr == nil { return false diff --git a/base/leaky_bucket.go b/base/leaky_bucket.go index ade1d07a1b..d5042b3b83 100644 --- a/base/leaky_bucket.go +++ b/base/leaky_bucket.go @@ -121,6 +121,8 @@ type LeakyBucketConfig struct { ForceErrorSetRawKeys []string // Issuing a SetRaw call with a specified key will return an error + ForceTimeoutErrorOnUpdateKeys []string // Specified keys will return timeout error AFTER write is sent to server + // Returns a partial error the first time ViewCustom is called FirstTimeViewCustomPartialError bool PostQueryCallback func(ddoc, viewName string, params map[string]interface{}) // Issues callback after issuing query when bucket.ViewQuery is called diff --git a/base/leaky_datastore.go b/base/leaky_datastore.go index 968266d155..fbc7732f6f 100644 --- a/base/leaky_datastore.go +++ b/base/leaky_datastore.go @@ -267,7 +267,13 @@ func (lds *LeakyDataStore) WriteUpdateWithXattrs(ctx context.Context, k string, lds.config.UpdateCallback(k) return 
updatedDoc, err } - return lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, wrapperCallback) + casOut, err = lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, wrapperCallback) + for _, errorKey := range lds.config.ForceTimeoutErrorOnUpdateKeys { + if k == errorKey { + return 0, ErrTimeout + } + } + return casOut, err } return lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback) } diff --git a/db/crud.go b/db/crud.go index 06a1c6c1d0..16082db332 100644 --- a/db/crud.go +++ b/db/crud.go @@ -2096,15 +2096,17 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do // If the WriteUpdate didn't succeed, check whether there are unused, allocated sequences that need to be accounted for if err != nil { - if docSequence > 0 { - if seqErr := db.sequences().releaseSequence(ctx, docSequence); seqErr != nil { - base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, seqErr) - } + if !base.IsTimeoutError(err) { + if docSequence > 0 { + if seqErr := db.sequences().releaseSequence(ctx, docSequence); seqErr != nil { + base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", docSequence, seqErr) + } - } - for _, sequence := range unusedSequences { - if seqErr := db.sequences().releaseSequence(ctx, sequence); seqErr != nil { - base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. Error:%v", sequence, seqErr) + } + for _, sequence := range unusedSequences { + if seqErr := db.sequences().releaseSequence(ctx, sequence); seqErr != nil { + base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. 
Error:%v", sequence, seqErr) + } } } } diff --git a/db/crud_test.go b/db/crud_test.go index 2a6d7f5fe6..77236b6e80 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1680,3 +1680,88 @@ func TestAssignSequenceReleaseLoop(t *testing.T) { releasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value() - startReleasedSequenceCount assert.Equal(t, int64(expectedReleasedSequenceCount), releasedSequenceCount) } + +// TestReleaseSequenceOnDocWrite: +// - Define a leaky bucket callback for a conflicting write + define key to return a timeout error for +// - Setup db with leaky bucket config +// - Init a channel cache by calling changes +// - Write a doc that will return timeout error but will successfully persist +// - Wait for it to arrive at change cache +// - Assert we don;t release a sequence for it + we hav eit in changes +// - Write new doc with conflict error +// - Assert we release a sequence for this +func TestReleaseSequenceOnDocWrite(t *testing.T) { + defer SuspendSequenceBatching()() + base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) + + var ctx context.Context + var db *Database + var enable bool + + const ( + conflictDoc = "doc1" + timeoutDoc = "doc" + ) + + // call back to create a conflict mid write and force a non timeout error upon write attempt + writeUpdateCallback := func(key string) { + if key == "doc1" && enable { + enable = false + body := Body{"test": "doc"} + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + _, _, err := collection.Put(ctx, conflictDoc, body) + assert.NoError(t, err) + } + } + + // Use leaky bucket to inject callback in query invocation + callbackConfig := base.LeakyBucketConfig{ + UpdateCallback: writeUpdateCallback, + ForceTimeoutErrorOnUpdateKeys: []string{timeoutDoc}, + } + + db, ctx = setupTestLeakyDBWithCacheOptions(t, DefaultCacheOptions(), callbackConfig) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + + // init channel cache + _, err := 
collection.GetChanges(ctx, base.SetOf("*"), getChangesOptionsWithZeroSeq(t)) + require.NoError(t, err) + + assert.Equal(t, int64(0), db.DbStats.Database().SequenceReleasedCount.Value()) + + // write doc that will return timeout but will actually be persisted successfully on server + // this mimics what was seen in CBSE-17458 + _, _, err = collection.Put(ctx, timeoutDoc, Body{"test": "doc"}) + require.Error(t, err) + + // wait for changes + require.NoError(t, collection.WaitForPendingChanges(ctx)) + + // assert that no sequences were released + a sequence was cached; assertions use c so a failed poll is retried instead of failing the test + require.EventuallyWithT(t, func(c *assert.CollectT) { + db.UpdateCalculatedStats(ctx) + assert.Equal(c, int64(0), db.DbStats.Database().SequenceReleasedCount.Value()) + assert.Equal(c, int64(1), db.DbStats.Cache().HighSeqCached.Value()) + }, time.Second*10, time.Millisecond*100) + + // get cached changes + assert the document is present + changes, err := collection.GetChanges(ctx, base.SetOf("*"), getChangesOptionsWithZeroSeq(t)) + require.NoError(t, err) + assert.Len(t, changes, 1) + assert.Equal(t, timeoutDoc, changes[0].ID) + + // write doc that will have a conflict error, we should expect the document sequence to be released + enable = true + _, _, err = collection.Put(ctx, conflictDoc, Body{"test": "doc"}) + require.Error(t, err) + + // wait for changes + require.NoError(t, collection.WaitForPendingChanges(ctx)) + + // assert that a sequence was released after the above write error; assertions use c so a failed poll is retried instead of failing the test + require.EventuallyWithT(t, func(c *assert.CollectT) { + db.UpdateCalculatedStats(ctx) + assert.Equal(c, int64(1), db.DbStats.Database().SequenceReleasedCount.Value()) + }, time.Second*10, time.Millisecond*100) +} From 22dc0a431e83b30c087b58a8c921e99821371049 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Fri, 26 Jul 2024 10:32:19 +0100 Subject: [PATCH 2/5] test cleanup + require length 1 to avoid test panic --- db/crud_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/db/crud_test.go b/db/crud_test.go index 77236b6e80..e1114a6b44 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1714,7 +1714,6 @@ func TestReleaseSequenceOnDocWrite(t *testing.T) { } } - // Use leaky bucket to inject callback in query invocation callbackConfig := base.LeakyBucketConfig{ UpdateCallback: writeUpdateCallback, ForceTimeoutErrorOnUpdateKeys: []string{timeoutDoc}, @@ -1748,7 +1747,7 @@ func TestReleaseSequenceOnDocWrite(t *testing.T) { // get cached changes + assert the document is present changes, err := collection.GetChanges(ctx, base.SetOf("*"), getChangesOptionsWithZeroSeq(t)) require.NoError(t, err) - assert.Len(t, changes, 1) + require.Len(t, changes, 1) assert.Equal(t, timeoutDoc, changes[0].ID) // write doc that will have a conflict error, we should expect the document sequence to be released From 8159aac4071dc0731696685df2bfd3c6b24ca815 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Fri, 26 Jul 2024 10:51:20 +0100 Subject: [PATCH 3/5] update test name + make test pass with non xattrs --- base/leaky_datastore.go | 8 +++++++- db/crud_test.go | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/base/leaky_datastore.go b/base/leaky_datastore.go index fbc7732f6f..0ab95e603b 100644 --- a/base/leaky_datastore.go +++ b/base/leaky_datastore.go @@ -153,7 +153,13 @@ func (lds *LeakyDataStore) Update(k string, exp uint32, callback sgbucket.Update lds.config.UpdateCallback(k) return updated, expiry, isDelete, err } - return lds.dataStore.Update(k, exp, wrapperCallback) + casOut, err = lds.dataStore.Update(k, exp, wrapperCallback) + for _, errorKey := range lds.config.ForceTimeoutErrorOnUpdateKeys { + if k == errorKey { + return 0, ErrTimeout + } + } + return casOut, err } casOut, err = lds.dataStore.Update(k, exp, callback) diff --git a/db/crud_test.go b/db/crud_test.go index e1114a6b44..5d8e2c6930 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1690,7 +1690,7 @@ func TestAssignSequenceReleaseLoop(t *testing.T) { // 
- Assert we don;t release a sequence for it + we hav eit in changes // - Write new doc with conflict error // - Assert we release a sequence for this -func TestReleaseSequenceOnDocWrite(t *testing.T) { +func TestReleaseSequenceOnDocWriteFailure(t *testing.T) { defer SuspendSequenceBatching()() base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) From 59120a3b605c046eed0aaea8141887cbd3581108 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Fri, 26 Jul 2024 16:26:57 +0100 Subject: [PATCH 4/5] change from timeout to temporary failures --- base/error.go | 12 +++++++++++- db/crud.go | 2 +- db/crud_test.go | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/base/error.go b/base/error.go index 9b5c8ff472..8ecf7af2a6 100644 --- a/base/error.go +++ b/base/error.go @@ -237,7 +237,7 @@ func IsDocNotFoundError(err error) bool { } } -func IsTimeoutError(err error) bool { +func IsTemporaryKvError(err error) bool { if err == nil { return false } @@ -250,6 +250,16 @@ func IsTimeoutError(err error) bool { if errors.Is(err, gocb.ErrUnambiguousTimeout) { return true } + if errors.Is(err, gocb.ErrOverload) { + return true + } + if errors.Is(err, gocb.ErrTemporaryFailure) { + return true + } + if errors.Is(err, gocb.ErrCircuitBreakerOpen) { + return true + } + return false } diff --git a/db/crud.go b/db/crud.go index 16082db332..2cc5c8ce1d 100644 --- a/db/crud.go +++ b/db/crud.go @@ -2096,7 +2096,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do // If the WriteUpdate didn't succeed, check whether there are unused, allocated sequences that need to be accounted for if err != nil { - if !base.IsTimeoutError(err) { + if !base.IsTemporaryKvError(err) { if docSequence > 0 { if seqErr := db.sequences().releaseSequence(ctx, docSequence); seqErr != nil { base.WarnfCtx(ctx, "Error returned when releasing sequence %d. Falling back to skipped sequence handling. 
Error:%v", docSequence, seqErr) diff --git a/db/crud_test.go b/db/crud_test.go index 5d8e2c6930..fad6f2bb36 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1730,7 +1730,7 @@ func TestReleaseSequenceOnDocWriteFailure(t *testing.T) { assert.Equal(t, int64(0), db.DbStats.Database().SequenceReleasedCount.Value()) // write doc that will return timeout but will actually be persisted successfully on server - // this mimics what was seen in CBSE-17458 + // this mimics what was seen before _, _, err = collection.Put(ctx, timeoutDoc, Body{"test": "doc"}) require.Error(t, err) From d0fb8491e26b6f6800c74ebb8966b250dada8c7b Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Mon, 29 Jul 2024 10:37:21 +0100 Subject: [PATCH 5/5] updates off review --- base/error.go | 28 +++++++++++----------------- base/leaky_datastore.go | 13 +++++-------- db/crud_test.go | 15 ++++++++------- 3 files changed, 24 insertions(+), 32 deletions(-) diff --git a/base/error.go b/base/error.go index 8ecf7af2a6..cddf0a72fa 100644 --- a/base/error.go +++ b/base/error.go @@ -237,27 +237,21 @@ func IsDocNotFoundError(err error) bool { } } +// IsTemporaryKvError returns true if a kv operation has an error that is likely to be ephemeral. This represents +// situations where Couchbase Server is under load and would be expected to return a success or failure in a future call. 
func IsTemporaryKvError(err error) bool { if err == nil { return false } - if errors.Is(err, ErrTimeout) { - return true - } - if errors.Is(err, gocb.ErrAmbiguousTimeout) { - return true - } - if errors.Is(err, gocb.ErrUnambiguousTimeout) { - return true - } - if errors.Is(err, gocb.ErrOverload) { - return true - } - if errors.Is(err, gocb.ErrTemporaryFailure) { - return true - } - if errors.Is(err, gocb.ErrCircuitBreakerOpen) { - return true + // define list of temporary errors + temporaryKVError := []error{ErrTimeout, gocb.ErrAmbiguousTimeout, gocb.ErrUnambiguousTimeout, + gocb.ErrOverload, gocb.ErrTemporaryFailure, gocb.ErrCircuitBreakerOpen} + + // iterate through to check incoming error is one of them + for _, tempKVErr := range temporaryKVError { + if errors.Is(err, tempKVErr) { + return true + } } return false diff --git a/base/leaky_datastore.go b/base/leaky_datastore.go index 0ab95e603b..ef21118b40 100644 --- a/base/leaky_datastore.go +++ b/base/leaky_datastore.go @@ -12,6 +12,7 @@ import ( "context" "errors" "fmt" + "slices" "time" sgbucket "github.com/couchbase/sg-bucket" @@ -154,10 +155,8 @@ func (lds *LeakyDataStore) Update(k string, exp uint32, callback sgbucket.Update return updated, expiry, isDelete, err } casOut, err = lds.dataStore.Update(k, exp, wrapperCallback) - for _, errorKey := range lds.config.ForceTimeoutErrorOnUpdateKeys { - if k == errorKey { - return 0, ErrTimeout - } + if slices.Contains(lds.config.ForceTimeoutErrorOnUpdateKeys, k) { + return 0, ErrTimeout } return casOut, err } @@ -274,10 +273,8 @@ func (lds *LeakyDataStore) WriteUpdateWithXattrs(ctx context.Context, k string, return updatedDoc, err } casOut, err = lds.dataStore.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, wrapperCallback) - for _, errorKey := range lds.config.ForceTimeoutErrorOnUpdateKeys { - if k == errorKey { - return 0, ErrTimeout - } + if slices.Contains(lds.config.ForceTimeoutErrorOnUpdateKeys, k) { + return 0, ErrTimeout } return casOut, err } 
diff --git a/db/crud_test.go b/db/crud_test.go index fad6f2bb36..191a690060 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1687,7 +1687,7 @@ func TestAssignSequenceReleaseLoop(t *testing.T) { // - Init a channel cache by calling changes // - Write a doc that will return timeout error but will successfully persist // - Wait for it to arrive at change cache -// - Assert we don;t release a sequence for it + we hav eit in changes +// - Assert we don't release a sequence for it + we have it in changes cache // - Write new doc with conflict error // - Assert we release a sequence for this func TestReleaseSequenceOnDocWriteFailure(t *testing.T) { @@ -1696,7 +1696,7 @@ func TestReleaseSequenceOnDocWriteFailure(t *testing.T) { var ctx context.Context var db *Database - var enable bool + var forceDocConflict bool const ( conflictDoc = "doc1" @@ -1705,12 +1705,12 @@ func TestReleaseSequenceOnDocWriteFailure(t *testing.T) { // call back to create a conflict mid write and force a non timeout error upon write attempt writeUpdateCallback := func(key string) { - if key == "doc1" && enable { - enable = false + if key == conflictDoc && forceDocConflict { + forceDocConflict = false body := Body{"test": "doc"} collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) _, _, err := collection.Put(ctx, conflictDoc, body) - assert.NoError(t, err) + require.NoError(t, err) } } @@ -1723,7 +1723,8 @@ func TestReleaseSequenceOnDocWriteFailure(t *testing.T) { defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) - // init channel cache + // init channel cache. Before the changes made in CBG-4067, this caused the changes call made after the timeout doc is written below to fail, + // due to a duplicate sequence at the cache with an unused sequence. See the steps in ticket CBG-4067 for an example. 
_, err := collection.GetChanges(ctx, base.SetOf("*"), getChangesOptionsWithZeroSeq(t)) require.NoError(t, err) @@ -1751,7 +1752,7 @@ func TestReleaseSequenceOnDocWriteFailure(t *testing.T) { assert.Equal(t, timeoutDoc, changes[0].ID) // write doc that will have a conflict error, we should expect the document sequence to be released - enable = true + forceDocConflict = true _, _, err = collection.Put(ctx, conflictDoc, Body{"test": "doc"}) require.Error(t, err)