Skip to content

Commit

Permalink
CBG-3947 Do not remove loaded database on transient fetch error
Browse files Browse the repository at this point in the history
For transient fetch errors (recoverable read errors), leave the database intact to avoid database unload/reload churn during transient connectivity issues with server.

Required changing the pre-delete validation check on the config  from _fetchDatabase (which scans all buckets) to GetConfig (which targets the known bucket), as _fetchDatabase can't bubble up the transient error for a particular bucket.

Added a test to ensure the actual delete behaviour still works as expected (TestConfigPollingRemoveDatabase), and performed some manual testing to simulate the transient error (see Force timeouts notes in that test) - this will be required until we implement "leaky bootstrap" test support.
  • Loading branch information
adamcfraser committed Aug 2, 2024
1 parent 560c65e commit 77ab7c5
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 50 deletions.
19 changes: 19 additions & 0 deletions base/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,25 @@ func IsXattrNotFoundError(err error) bool {
return false
}

func IsRecoverableReadError(err error) bool {

if err == nil {
return false
}
// SG timeout error
if errors.Is(err, ErrTimeout) {
return true
}
// gocb timeouts or transient errors
if errors.Is(err, gocb.ErrTemporaryFailure) ||
errors.Is(err, gocb.ErrOverload) ||
errors.Is(err, gocb.ErrTimeout) ||
errors.Is(err, gocb.ErrCircuitBreakerOpen) {
return true
}
return false
}

// MultiError manages a set of errors. Callers must use ErrorOrNil when returning MultiError to callers
// in order to properly handle nil checks on the returned MultiError
// Sample usage:
Expand Down
85 changes: 85 additions & 0 deletions rest/adminapitest/admin_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1776,6 +1776,91 @@ func TestMultipleBucketWithBadDbConfigScenario3(t *testing.T) {

}

// TestConfigPollingRemoveDatabase:
//
// Validates db is removed when polling detects that the config is not found
func TestConfigPollingRemoveDatabase(t *testing.T) {

base.SetUpTestLogging(t, base.LevelInfo, base.KeyConfig)
testCases := []struct {
useXattrConfig bool
}{
{
useXattrConfig: false,
},
{
useXattrConfig: true,
},
}
for _, testCase := range testCases {
t.Run(fmt.Sprintf("xattrConfig_%v", testCase.useXattrConfig), func(t *testing.T) {

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: base.GetTestBucket(t),
PersistentConfig: true,
MutateStartupConfig: func(config *rest.StartupConfig) {
// configure the interval time to pick up new configs from the bucket to every 50 milliseconds
config.Bootstrap.ConfigUpdateFrequency = base.NewConfigDuration(50 * time.Millisecond)
},
DatabaseConfig: nil,
UseXattrConfig: testCase.useXattrConfig,
})
defer rt.Close()

ctx := base.TestCtx(t)
// create a new db
dbName := "db1"
dbConfig := rt.NewDbConfig()
dbConfig.Name = dbName
dbConfig.BucketConfig.Bucket = base.StringPtr(rt.CustomTestBucket.GetName())
resp := rt.CreateDatabase(dbName, dbConfig)
rest.RequireStatus(t, resp, http.StatusCreated)

// Validate that db is loaded
_, err := rt.ServerContext().GetDatabase(ctx, dbName)
require.NoError(t, err)

// Force timeouts - dev-time only test enhancement to validate CBG-3947, requires manual "leaky bootstrap" handling
// To enable:
// - Add "var ForceTimeouts bool" to bootstrap.go
// - In CouchbaseCluster.GetMetadataDocument, add the following after loadConfig:
// if ForceTimeouts {
// return 0, gocb.ErrTimeout
// }
// - enable the code block below
/*
base.ForceTimeouts = true
// Wait to ensure database doesn't disappear
err = rt.WaitForConditionWithOptions(func() bool {
_, err := rt.ServerContext().GetActiveDatabase(dbName)
return errors.Is(err, base.ErrNotFound)
}, 200, 50)
require.Error(t, err)
base.ForceTimeouts = false
*/

// Delete the config directly
rt.RemoveDbConfigFromBucket("db1", rt.CustomTestBucket.GetName())

// assert that the database is unloaded
err = rt.WaitForConditionWithOptions(func() bool {
_, err := rt.ServerContext().GetActiveDatabase(dbName)
return errors.Is(err, base.ErrNotFound)

}, 200, 1000)
require.NoError(t, err)

// assert that a request to the database fails with correct error message
resp = rt.SendAdminRequest(http.MethodGet, "/db1/_config", "")
rest.RequireStatus(t, resp, http.StatusNotFound)
assert.Contains(t, resp.Body.String(), "no such database")
})
}
}

func TestResyncStopUsingDCPStream(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
// This test requires a gocb bucket
Expand Down
108 changes: 60 additions & 48 deletions rest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,17 +1629,26 @@ func (sc *ServerContext) fetchAndLoadConfigs(ctx context.Context, isInitialStart
sc.lock.Lock()
defer sc.lock.Unlock()
for _, dbName := range deletedDatabases {
dbc, ok := sc.databases_[dbName]
if !ok {
base.DebugfCtx(ctx, base.KeyConfig, "Database %q already removed from server context after acquiring write lock - do not need to remove not removing database", base.MD(dbName))
continue
}
// It's possible that the "deleted" database was not written to the server until after sc.FetchConfigs had returned...
// we'll need to pay for the cost of getting the config again now that we've got the write lock to double-check this db is definitely ok to remove...
found, _, err := sc._fetchDatabase(ctx, dbName)
if err != nil {
base.InfofCtx(ctx, base.KeyConfig, "Error fetching config for database %q to check whether we need to remove it: %v", dbName, err)
found, _, getConfigErr := sc._fetchDatabaseFromBucket(ctx, dbc.Bucket.GetName(), dbName)
if found && getConfigErr == nil {
base.DebugfCtx(ctx, base.KeyConfig, "Found config for database %q after acquiring write lock - not removing database", base.MD(dbName))
continue
}
if base.IsRecoverableReadError(getConfigErr) {
base.InfofCtx(ctx, base.KeyConfig, "Transient error fetching config for database %q to check whether we need to remove it, will not be removed: %v", base.MD(dbName), getConfigErr)
continue
}

if !found {
base.InfofCtx(ctx, base.KeyConfig, "Database %q was running on this node, but config was not found on the server - removing database", base.MD(dbName))
sc._removeDatabase(ctx, dbName)
} else {
base.DebugfCtx(ctx, base.KeyConfig, "Found config for database %q after acquiring write lock - not removing database", base.MD(dbName))
}
}

Expand Down Expand Up @@ -1747,56 +1756,59 @@ func (sc *ServerContext) fetchDatabase(ctx context.Context, dbName string) (foun
return sc._fetchDatabase(ctx, dbName)
}

func (sc *ServerContext) _fetchDatabase(ctx context.Context, dbName string) (found bool, dbConfig *DatabaseConfig, err error) {
// loop code moved to foreachDbConfig
var cnf DatabaseConfig
callback := func(bucket string) (exit bool, err error) {
cas, err := sc.BootstrapContext.GetConfig(ctx, bucket, sc.Config.Bootstrap.ConfigGroupID, dbName, &cnf)
if err == base.ErrNotFound {
base.DebugfCtx(ctx, base.KeyConfig, "%q did not contain config in group %q", bucket, sc.Config.Bootstrap.ConfigGroupID)
return false, err
}
if err != nil {
base.DebugfCtx(ctx, base.KeyConfig, "unable to fetch config in group %q from bucket %q: %v", sc.Config.Bootstrap.ConfigGroupID, bucket, err)
return false, err
}
func (sc *ServerContext) _fetchDatabaseFromBucket(ctx context.Context, bucket string, dbName string) (found bool, cnf DatabaseConfig, err error) {

if cnf.Name == "" {
cnf.Name = bucket
}
cas, err := sc.BootstrapContext.GetConfig(ctx, bucket, sc.Config.Bootstrap.ConfigGroupID, dbName, &cnf)
if err == base.ErrNotFound {
base.DebugfCtx(ctx, base.KeyConfig, "%q did not contain config in group %q", bucket, sc.Config.Bootstrap.ConfigGroupID)
return false, cnf, err
}
if err != nil {
base.DebugfCtx(ctx, base.KeyConfig, "unable to fetch config in group %q from bucket %q: %v", sc.Config.Bootstrap.ConfigGroupID, bucket, err)
return false, cnf, err
}

if cnf.Name != dbName {
base.TracefCtx(ctx, base.KeyConfig, "%q did not contain config in group %q for db %q", bucket, sc.Config.Bootstrap.ConfigGroupID, dbName)
return false, err
}
if cnf.Name == "" {
cnf.Name = bucket
}

cnf.cfgCas = cas
if cnf.Name != dbName {
base.TracefCtx(ctx, base.KeyConfig, "%q did not contain config in group %q for db %q", bucket, sc.Config.Bootstrap.ConfigGroupID, dbName)
return false, cnf, err
}

// TODO: This code is mostly copied from FetchConfigs, move into shared function with DbConfig REST API work?
cnf.cfgCas = cas

// inherit properties the bootstrap config
cnf.CACertPath = sc.Config.Bootstrap.CACertPath
// inherit properties the bootstrap config
cnf.CACertPath = sc.Config.Bootstrap.CACertPath

// We need to check for corruption in the database config (CC. CBG-3292). If the fetched config doesn't match the
// bucket name we got the config from we need to maker this db context as corrupt. Then remove the context and
// in memory representation on the server context.
if bucket != cnf.GetBucketName() {
sc._handleInvalidDatabaseConfig(ctx, bucket, cnf)
return true, fmt.Errorf("mismatch in persisted database bucket name %q vs the actual bucket name %q. Please correct db %q's config, groupID %q.", base.MD(cnf.Bucket), base.MD(bucket), base.MD(cnf.Name), base.MD(sc.Config.Bootstrap.ConfigGroupID))
}
bucketCopy := bucket
// no corruption detected carry on as usual
cnf.Bucket = &bucketCopy
// We need to check for corruption in the database config (CC. CBG-3292). If the fetched config doesn't match the
// bucket name we got the config from we need to maker this db context as corrupt. Then remove the context and
// in memory representation on the server context.
if bucket != cnf.GetBucketName() {
sc._handleInvalidDatabaseConfig(ctx, bucket, cnf)
return true, cnf, fmt.Errorf("mismatch in persisted database bucket name %q vs the actual bucket name %q. Please correct db %q's config, groupID %q.", base.MD(cnf.Bucket), base.MD(bucket), base.MD(cnf.Name), base.MD(sc.Config.Bootstrap.ConfigGroupID))
}
bucketCopy := bucket
// no corruption detected carry on as usual
cnf.Bucket = &bucketCopy

// any authentication fields defined on the dbconfig take precedence over any in the bootstrap config
if cnf.Username == "" && cnf.Password == "" && cnf.CertPath == "" && cnf.KeyPath == "" {
cnf.Username = sc.Config.Bootstrap.Username
cnf.Password = sc.Config.Bootstrap.Password
cnf.CertPath = sc.Config.Bootstrap.X509CertPath
cnf.KeyPath = sc.Config.Bootstrap.X509KeyPath
}
base.TracefCtx(ctx, base.KeyConfig, "Got database config %s for bucket %q with cas %d and groupID %q", base.MD(dbName), base.MD(bucket), cas, base.MD(sc.Config.Bootstrap.ConfigGroupID))
return true, nil
// any authentication fields defined on the dbconfig take precedence over any in the bootstrap config
if cnf.Username == "" && cnf.Password == "" && cnf.CertPath == "" && cnf.KeyPath == "" {
cnf.Username = sc.Config.Bootstrap.Username
cnf.Password = sc.Config.Bootstrap.Password
cnf.CertPath = sc.Config.Bootstrap.X509CertPath
cnf.KeyPath = sc.Config.Bootstrap.X509KeyPath
}
base.TracefCtx(ctx, base.KeyConfig, "Got database config %s for bucket %q with cas %d and groupID %q", base.MD(dbName), base.MD(bucket), cas, base.MD(sc.Config.Bootstrap.ConfigGroupID))
return true, cnf, nil
}

func (sc *ServerContext) _fetchDatabase(ctx context.Context, dbName string) (found bool, dbConfig *DatabaseConfig, err error) {
var cnf DatabaseConfig
callback := func(bucket string) (exit bool, err error) {
found, cnf, err = sc._fetchDatabaseFromBucket(ctx, bucket, dbName)
return found, err
}

err = sc.findBucketWithCallback(callback)
Expand Down
2 changes: 1 addition & 1 deletion rest/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2981,7 +2981,7 @@ func TestInvalidDbConfigNoLongerPresentInBucket(t *testing.T) {
}, time.Second*10, time.Millisecond*100)

// remove the invalid config from the bucket
rt.DeleteDbConfigInBucket(dbName, realBucketName)
rt.RemoveDbConfigFromBucket(dbName, realBucketName)

// force reload of configs from bucket
rt.ServerContext().ForceDbConfigsReload(t, ctx)
Expand Down
2 changes: 2 additions & 0 deletions rest/utilities_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type RestTesterConfig struct {
syncGatewayVersion *base.ComparableBuildVersion // alternate version of Sync Gateway to use on startup
allowDbConfigEnvVars *bool
maxConcurrentRevs *int
UseXattrConfig bool
}

type collectionConfiguration uint8
Expand Down Expand Up @@ -234,6 +235,7 @@ func (rt *RestTester) Bucket() base.Bucket {
sc.Bootstrap.ServerTLSSkipVerify = base.BoolPtr(base.TestTLSSkipVerify())
sc.Unsupported.Serverless.Enabled = &rt.serverless
sc.Unsupported.AllowDbConfigEnvVars = rt.RestTesterConfig.allowDbConfigEnvVars
sc.Unsupported.UseXattrConfig = &rt.UseXattrConfig
sc.Replicator.MaxConcurrentRevs = rt.RestTesterConfig.maxConcurrentRevs
if rt.serverless {
if !rt.PersistentConfig {
Expand Down
2 changes: 1 addition & 1 deletion rest/utilities_testing_resttester.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (rt *RestTester) InsertDbConfigToBucket(config *DatabaseConfig, bucketName
require.NoError(rt.TB(), insertErr)
}

func (rt *RestTester) DeleteDbConfigInBucket(dbName, bucketName string) {
func (rt *RestTester) RemoveDbConfigFromBucket(dbName string, bucketName string) {
deleteErr := rt.ServerContext().BootstrapContext.DeleteConfig(base.TestCtx(rt.TB()), bucketName, rt.ServerContext().Config.Bootstrap.ConfigGroupID, dbName)
require.NoError(rt.TB(), deleteErr)
}
Expand Down

0 comments on commit 77ab7c5

Please sign in to comment.