Skip to content

Commit

Permalink
Added exponential retry to the domain cache (#6676)
Browse files Browse the repository at this point in the history
What changed?
Added a proper retry policy to the domain cache so it does not just retry every second

Why?
The load from retrying every second can cause too much load on the database.

How did you test it?
Unit test

Potential risks
Might cause domain data to be out of date, but we are already in that situation when it's failing.

Release notes

Documentation Changes
  • Loading branch information
jakobht authored Feb 20, 2025
1 parent f717213 commit 622fdce
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 18 deletions.
29 changes: 16 additions & 13 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/errors"
Expand Down Expand Up @@ -60,10 +61,7 @@ const (
domainCacheMinRefreshInterval = 1 * time.Second
// DomainCacheRefreshInterval domain cache refresh interval
DomainCacheRefreshInterval = 10 * time.Second
// DomainCacheRefreshFailureRetryInterval is the wait time
// if refreshment encounters error
DomainCacheRefreshFailureRetryInterval = 1 * time.Second
domainCacheRefreshPageSize = 200
domainCacheRefreshPageSize = 200

domainCachePersistenceTimeout = 3 * time.Second

Expand Down Expand Up @@ -119,6 +117,8 @@ type (
callbackLock sync.Mutex
prepareCallbacks map[int]PrepareCallbackFn
callbacks map[int]CallbackFn

throttleRetry *backoff.ThrottleRetry
}

// DomainCacheEntries is DomainCacheEntry slice
Expand Down Expand Up @@ -160,6 +160,11 @@ func NewDomainCache(
opts ...DomainCacheOption,
) *DefaultDomainCache {

throttleRetry := backoff.NewThrottleRetry(
backoff.WithRetryPolicy(common.CreateDomainCacheRetryPolicy()),
backoff.WithRetryableError(common.IsServiceTransientError),
)

cache := &DefaultDomainCache{
status: domainCacheInitialized,
shutdownChan: make(chan struct{}),
Expand All @@ -172,6 +177,7 @@ func NewDomainCache(
logger: logger,
prepareCallbacks: make(map[int]PrepareCallbackFn),
callbacks: make(map[int]CallbackFn),
throttleRetry: throttleRetry,
}
cache.cacheNameToID.Store(newDomainCache())
cache.cacheByID.Store(newDomainCache())
Expand Down Expand Up @@ -411,15 +417,12 @@ func (c *DefaultDomainCache) refreshLoop() {
case <-c.shutdownChan:
return
case <-timer.Chan():
for err := c.refreshDomains(); err != nil; err = c.refreshDomains() {
select {
case <-c.shutdownChan:
return
default:
c.logger.Error("Error refreshing domain cache", tag.Error(err))
c.timeSource.Sleep(DomainCacheRefreshFailureRetryInterval)
}
err := c.refreshDomains()
if err != nil {
c.logger.Error("Error refreshing domain cache", tag.Error(err))
continue
}

c.logger.Debug("Domain cache refreshed")
}
}
Expand All @@ -428,7 +431,7 @@ func (c *DefaultDomainCache) refreshLoop() {
func (c *DefaultDomainCache) refreshDomains() error {
c.refreshLock.Lock()
defer c.refreshLock.Unlock()
return c.refreshDomainsLocked()
return c.throttleRetry.Do(context.Background(), c.refreshDomainsLocked)
}

// this function only refresh the domains in the v2 table
Expand Down
28 changes: 23 additions & 5 deletions common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,13 +897,11 @@ func (s *domainCacheSuite) Test_refreshLoop_domainCacheRefreshedError() {

s.domainCache.timeSource = mockedTimeSource

s.metadataMgr.On("GetMetadata", mock.Anything).Return(nil, assert.AnError).Twice()
s.metadataMgr.On("GetMetadata", mock.Anything).Return(nil, assert.AnError).Once()

go func() {
mockedTimeSource.BlockUntil(1)
mockedTimeSource.Advance(DomainCacheRefreshInterval)
mockedTimeSource.BlockUntil(2)
mockedTimeSource.Advance(DomainCacheRefreshFailureRetryInterval)
s.domainCache.shutdownChan <- struct{}{}
}()

Expand All @@ -921,14 +919,34 @@ func (s *domainCacheSuite) Test_refreshDomainsLocked_IntervalTooShort() {
s.NoError(err)
}

func (s *domainCacheSuite) Test_refreshDomainsLocked_ListDomainsError() {
func (s *domainCacheSuite) Test_refreshDomains_ListDomainsNonRetryableError() {
s.metadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{NotificationVersion: 0}, nil).Once()
s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()

err := s.domainCache.refreshDomainsLocked()
err := s.domainCache.refreshDomains()
s.ErrorIs(err, assert.AnError)
}

func (s *domainCacheSuite) Test_refreshDomains_ListDomainsRetryableError() {
retryableError := &types.ServiceBusyError{
Message: "Service is busy",
}

// We expect the metadataMgr to be called twice, once for the initial attempt and once for the retry
s.metadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{NotificationVersion: 0}, nil).Times(2)

// First time return retryable error
s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, retryableError).Once()

// Second time return non-retryable error
s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()

err := s.domainCache.refreshDomains()

// We expect the error to be the first error
s.ErrorIs(err, retryableError)
}

func (s *domainCacheSuite) TestDomainCacheEntry_Getters() {
gen := testdatagen.New(s.T())

Expand Down
13 changes: 13 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ const (
taskCompleterMaxInterval = 10 * time.Second
taskCompleterExpirationInterval = 5 * time.Minute

domainCacheInitialInterval = 1 * time.Second
domainCacheMaxInterval = 5 * time.Second
domainCacheExpirationInterval = 2 * time.Minute

contextExpireThreshold = 10 * time.Millisecond

// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
Expand Down Expand Up @@ -228,6 +232,15 @@ func CreateTaskCompleterRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreateDomainCacheRetryPolicy creates a retry policy to handle domain cache refresh failures
func CreateDomainCacheRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(domainCacheInitialInterval)
policy.SetMaximumInterval(domainCacheMaxInterval)
policy.SetExpirationInterval(domainCacheExpirationInterval)

return policy
}

// IsValidIDLength checks if id is valid according to its length
func IsValidIDLength(
id string,
Expand Down

0 comments on commit 622fdce

Please sign in to comment.