diff --git a/go.mod b/go.mod index 723f4c8a79..0487064585 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( dario.cat/mergo v1.0.0 github.com/KimMachineGun/automemlimit v0.7.0 github.com/coreos/go-oidc/v3 v3.12.0 - github.com/couchbase/cbgt v1.3.9 + github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d github.com/couchbase/clog v0.1.0 github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd github.com/couchbase/gocb/v2 v2.9.1 diff --git a/go.sum b/go.sum index 46a3ab3770..1c26d492aa 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,8 @@ github.com/couchbase/blance v0.1.5 h1:kNSAwhb8FXSJpicJ8R8Kk7+0V1+MyTcY1MOHIDbU79 github.com/couchbase/blance v0.1.5/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU= github.com/couchbase/cbauth v0.1.11 h1:LLyGiVnsKxyHp9wbOQk87oF9eDUSh1in2vh/l6vaezg= github.com/couchbase/cbauth v0.1.11/go.mod h1:W7zkNXa0B2cTDg90YmmuTSbu+PlYOvMqzQvmNlNH/Mg= -github.com/couchbase/cbgt v1.3.9 h1:MAT3FwD1ctekxuFe0yau0H1BCTvgLXvh1ipbZ3nZhBE= -github.com/couchbase/cbgt v1.3.9/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE= +github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d h1:X80jy41uF1ivq513eSm+k+Vih+eSMHZKjQJ5JawMuRs= +github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE= github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY= github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4= github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd h1:ERQXaXuX1eix3NUqrxQ5VY0hqHH90vcfrWdbEWKzlEY= diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index b8a9329c62..e0d287619d 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -2316,6 +2316,127 @@ func TestImportRollback(t *testing.T) { } } +// TestImportRollbackMultiplePartitions: +// - Test is much like TestImportRollback, but with multiple partitions and multiple vBuckets rolling back +// - Test case rollbackWithoutFailover will only rollback one partition +func TestImportRollbackMultiplePartitions(t *testing.T) { + if !base.IsEnterpriseEdition() { + t.Skip("This test only works against EE") + } + + if base.UnitTestUrlIsWalrus() { + t.Skip("This test only works against Couchbase Server - needs cbgt and import checkpointing") + } + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyImport, base.KeyDCP, base.KeyCluster) + ctx := base.TestCtx(t) + bucket := base.GetTestBucket(t) + defer bucket.Close(ctx) + + rt := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: bucket.NoCloseClone(), + PersistentConfig: false, + DatabaseConfig: &rest.DatabaseConfig{ + DbConfig: rest.DbConfig{ + ImportPartitions: base.Uint16Ptr(2), + }, + }, + }) + + // create doc id's for vb0 and vb800 + vb0DocIDs := []string{"abbacomes", "abdicate", "accrescent", "aconitum", "acrux", "adduction", "affrication", "algraphy", "allantoinuria", "altiloquent"} + vb800DocIDs := []string{"abrook", "accept", "accompaniment", "acoemeti", "adiposeness", "alkyd", "alnage", "ambulance", "anasazi", "anhydroxime"} + + for _, v := range vb0DocIDs { + added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.True(t, added) + require.NoError(t, err) + } + for _, v := range vb800DocIDs { + added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.True(t, added) + require.NoError(t, err) + } + + // wait for docs to be imported + changes, err := rt.WaitForChanges(20, "/{{.keyspace}}/_changes?since=0", "", true) + require.NoError(t, err) + lastSeq := changes.Last_Seq.String() + + // Close db while we alter checkpoints to force rollback + db := rt.GetDatabase() + checkpointPrefix := rt.GetDatabase().MetadataKeys.DCPVersionedCheckpointPrefix(db.Options.GroupID, db.Options.ImportVersion) + rt.Close() + + metaStore := bucket.GetMetadataStore() + // fetch the checkpoint for the vBucket 0 and 800, modify the checkpoint values to a higher sequence to + // trigger rollback upon stream open request + checkpointKey := fmt.Sprintf("%s%d", checkpointPrefix, 0) + var checkpointData base.ShardedImportDCPMetadata + checkpointBytes, _, err := metaStore.GetRaw(checkpointKey) + require.NoError(t, err) + require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData)) + checkpointData.SnapStart = 3000 + checkpointData.SnapStart + checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd + checkpointData.SeqStart = 3000 + checkpointData.SeqStart + checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd + existingVbUUID := checkpointData.FailOverLog[0][0] + checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}} + + updatedBytes, err := base.JSONMarshal(checkpointData) + require.NoError(t, err) + err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes) + require.NoError(t, err) + + // vBucket 800 + checkpointKey = fmt.Sprintf("%s%d", checkpointPrefix, 800) + checkpointData = base.ShardedImportDCPMetadata{} + checkpointBytes, _, err = metaStore.GetRaw(checkpointKey) + require.NoError(t, err) + require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData)) + checkpointData.SnapStart = 3000 + checkpointData.SnapStart + checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd + checkpointData.SeqStart = 3000 + checkpointData.SeqStart + checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd + existingVbUUID = checkpointData.FailOverLog[0][0] + checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}} + + updatedBytes, err = base.JSONMarshal(checkpointData) + require.NoError(t, err) + err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes) + require.NoError(t, err) + + // Reopen the db, expect DCP rollback + rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: bucket.NoCloseClone(), + PersistentConfig: false, + DatabaseConfig: &rest.DatabaseConfig{ + DbConfig: rest.DbConfig{ + ImportPartitions: base.Uint16Ptr(2), + }, + }, + }) + defer rt2.Close() + + for _, v := range vb0DocIDs { + err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.NoError(t, err) + } + for _, v := range vb800DocIDs { + err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.NoError(t, err) + } + + // Add doc to non rolled back vBucket (392) and assert its imported + added, err := rt2.GetSingleDataStore().AddRaw("someKey", 0, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.NoError(t, err) + require.True(t, added) + + // wait for doc update to be imported + _, err = rt2.WaitForChanges(21, "/{{.keyspace}}/_changes?since="+lastSeq, "", true) + require.NoError(t, err) +} + func TestImportUpdateExpiry(t *testing.T) { testCases := []struct { name string