Skip to content

Commit

Permalink
CBG-4309: update cbgt to address lack of janitor kick during rollback (
Browse files Browse the repository at this point in the history
…#7295)

* CBG-4309: add rollback hook for cbgt to call during rollback of pIndex

* remove temp test

* update cbgt to include rollback hook

* CBG-4309 add Rollback to to PIndexImplType

* Update cbgt fix to trigger janitor kick

---------

Co-authored-by: Tor Colvin <[email protected]>
  • Loading branch information
gregns1 and torcolvin authored Jan 29, 2025
1 parent be4c2de commit 7e89160
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
dario.cat/mergo v1.0.0
github.com/KimMachineGun/automemlimit v0.7.0
github.com/coreos/go-oidc/v3 v3.12.0
github.com/couchbase/cbgt v1.3.9
github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d
github.com/couchbase/clog v0.1.0
github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd
github.com/couchbase/gocb/v2 v2.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ github.com/couchbase/blance v0.1.5 h1:kNSAwhb8FXSJpicJ8R8Kk7+0V1+MyTcY1MOHIDbU79
github.com/couchbase/blance v0.1.5/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU=
github.com/couchbase/cbauth v0.1.11 h1:LLyGiVnsKxyHp9wbOQk87oF9eDUSh1in2vh/l6vaezg=
github.com/couchbase/cbauth v0.1.11/go.mod h1:W7zkNXa0B2cTDg90YmmuTSbu+PlYOvMqzQvmNlNH/Mg=
github.com/couchbase/cbgt v1.3.9 h1:MAT3FwD1ctekxuFe0yau0H1BCTvgLXvh1ipbZ3nZhBE=
github.com/couchbase/cbgt v1.3.9/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE=
github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d h1:X80jy41uF1ivq513eSm+k+Vih+eSMHZKjQJ5JawMuRs=
github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE=
github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY=
github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4=
github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd h1:ERQXaXuX1eix3NUqrxQ5VY0hqHH90vcfrWdbEWKzlEY=
Expand Down
121 changes: 121 additions & 0 deletions rest/importtest/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2316,6 +2316,127 @@ func TestImportRollback(t *testing.T) {
}
}

// TestImportRollbackMultiplePartitions:
// - Test is much like TestImportRollback, but with multiple partitions and multiple vBuckets rolling back
// - Test case rollbackWithoutFailover will only rollback one partition
func TestImportRollbackMultiplePartitions(t *testing.T) {
if !base.IsEnterpriseEdition() {
t.Skip("This test only works against EE")
}

if base.UnitTestUrlIsWalrus() {
t.Skip("This test only works against Couchbase Server - needs cbgt and import checkpointing")
}

base.SetUpTestLogging(t, base.LevelDebug, base.KeyImport, base.KeyDCP, base.KeyCluster)
ctx := base.TestCtx(t)
bucket := base.GetTestBucket(t)
defer bucket.Close(ctx)

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: bucket.NoCloseClone(),
PersistentConfig: false,
DatabaseConfig: &rest.DatabaseConfig{
DbConfig: rest.DbConfig{
ImportPartitions: base.Uint16Ptr(2),
},
},
})

// create doc id's for vb0 and vb800
vb0DocIDs := []string{"abbacomes", "abdicate", "accrescent", "aconitum", "acrux", "adduction", "affrication", "algraphy", "allantoinuria", "altiloquent"}
vb800DocIDs := []string{"abrook", "accept", "accompaniment", "acoemeti", "adiposeness", "alkyd", "alnage", "ambulance", "anasazi", "anhydroxime"}

for _, v := range vb0DocIDs {
added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.True(t, added)
require.NoError(t, err)
}
for _, v := range vb800DocIDs {
added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.True(t, added)
require.NoError(t, err)
}

// wait for docs to be imported
changes, err := rt.WaitForChanges(20, "/{{.keyspace}}/_changes?since=0", "", true)
require.NoError(t, err)
lastSeq := changes.Last_Seq.String()

// Close db while we alter checkpoints to force rollback
db := rt.GetDatabase()
checkpointPrefix := rt.GetDatabase().MetadataKeys.DCPVersionedCheckpointPrefix(db.Options.GroupID, db.Options.ImportVersion)
rt.Close()

metaStore := bucket.GetMetadataStore()
// fetch the checkpoint for the vBucket 0 and 800, modify the checkpoint values to a higher sequence to
// trigger rollback upon stream open request
checkpointKey := fmt.Sprintf("%s%d", checkpointPrefix, 0)
var checkpointData base.ShardedImportDCPMetadata
checkpointBytes, _, err := metaStore.GetRaw(checkpointKey)
require.NoError(t, err)
require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData))
checkpointData.SnapStart = 3000 + checkpointData.SnapStart
checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd
checkpointData.SeqStart = 3000 + checkpointData.SeqStart
checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd
existingVbUUID := checkpointData.FailOverLog[0][0]
checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}}

updatedBytes, err := base.JSONMarshal(checkpointData)
require.NoError(t, err)
err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes)
require.NoError(t, err)

// vBucket 800
checkpointKey = fmt.Sprintf("%s%d", checkpointPrefix, 800)
checkpointData = base.ShardedImportDCPMetadata{}
checkpointBytes, _, err = metaStore.GetRaw(checkpointKey)
require.NoError(t, err)
require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData))
checkpointData.SnapStart = 3000 + checkpointData.SnapStart
checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd
checkpointData.SeqStart = 3000 + checkpointData.SeqStart
checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd
existingVbUUID = checkpointData.FailOverLog[0][0]
checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}}

updatedBytes, err = base.JSONMarshal(checkpointData)
require.NoError(t, err)
err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes)
require.NoError(t, err)

// Reopen the db, expect DCP rollback
rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: bucket.NoCloseClone(),
PersistentConfig: false,
DatabaseConfig: &rest.DatabaseConfig{
DbConfig: rest.DbConfig{
ImportPartitions: base.Uint16Ptr(2),
},
},
})
defer rt2.Close()

for _, v := range vb0DocIDs {
err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.NoError(t, err)
}
for _, v := range vb800DocIDs {
err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.NoError(t, err)
}

// Add doc to non rolled back vBucket (392) and assert its imported
added, err := rt2.GetSingleDataStore().AddRaw("someKey", 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.NoError(t, err)
require.True(t, added)

// wait for doc update to be imported
_, err = rt2.WaitForChanges(21, "/{{.keyspace}}/_changes?since="+lastSeq, "", true)
require.NoError(t, err)
}

func TestImportUpdateExpiry(t *testing.T) {
testCases := []struct {
name string
Expand Down

0 comments on commit 7e89160

Please sign in to comment.