Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added exponential retry to the domain cache #6676

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/errors"
Expand Down Expand Up @@ -60,10 +61,7 @@ const (
domainCacheMinRefreshInterval = 1 * time.Second
// DomainCacheRefreshInterval domain cache refresh interval
DomainCacheRefreshInterval = 10 * time.Second
// DomainCacheRefreshFailureRetryInterval is the wait time
// if refreshment encounters error
DomainCacheRefreshFailureRetryInterval = 1 * time.Second
domainCacheRefreshPageSize = 200
domainCacheRefreshPageSize = 200

domainCachePersistenceTimeout = 3 * time.Second

Expand Down Expand Up @@ -119,6 +117,8 @@ type (
callbackLock sync.Mutex
prepareCallbacks map[int]PrepareCallbackFn
callbacks map[int]CallbackFn

throttleRetry *backoff.ThrottleRetry
}

// DomainCacheEntries is DomainCacheEntry slice
Expand Down Expand Up @@ -160,6 +160,11 @@ func NewDomainCache(
opts ...DomainCacheOption,
) *DefaultDomainCache {

throttleRetry := backoff.NewThrottleRetry(
backoff.WithRetryPolicy(common.CreateDomainCacheRetryPolicy()),
backoff.WithRetryableError(common.IsServiceTransientError),
)

cache := &DefaultDomainCache{
status: domainCacheInitialized,
shutdownChan: make(chan struct{}),
Expand All @@ -172,6 +177,7 @@ func NewDomainCache(
logger: logger,
prepareCallbacks: make(map[int]PrepareCallbackFn),
callbacks: make(map[int]CallbackFn),
throttleRetry: throttleRetry,
}
cache.cacheNameToID.Store(newDomainCache())
cache.cacheByID.Store(newDomainCache())
Expand Down Expand Up @@ -411,15 +417,12 @@ func (c *DefaultDomainCache) refreshLoop() {
case <-c.shutdownChan:
return
case <-timer.Chan():
for err := c.refreshDomains(); err != nil; err = c.refreshDomains() {
select {
case <-c.shutdownChan:
return
default:
c.logger.Error("Error refreshing domain cache", tag.Error(err))
c.timeSource.Sleep(DomainCacheRefreshFailureRetryInterval)
}
err := c.refreshDomains()
if err != nil {
c.logger.Error("Error refreshing domain cache", tag.Error(err))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider logging fatal here? By this point we have already retried quite a lot and still failed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave it like this. It may have failed due to persistence RPS limits, and in those cases we don't want to get into a crash loop. Ideally, fetching domains shouldn't be subject to the same limits. @davidporter-id-au will look into the granular rate limits area.

continue
}

c.logger.Debug("Domain cache refreshed")
}
}
Expand All @@ -428,7 +431,7 @@ func (c *DefaultDomainCache) refreshLoop() {
func (c *DefaultDomainCache) refreshDomains() error {
c.refreshLock.Lock()
defer c.refreshLock.Unlock()
return c.refreshDomainsLocked()
return c.throttleRetry.Do(context.Background(), c.refreshDomainsLocked)
}

// this function only refresh the domains in the v2 table
Expand Down
28 changes: 23 additions & 5 deletions common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,13 +897,11 @@ func (s *domainCacheSuite) Test_refreshLoop_domainCacheRefreshedError() {

s.domainCache.timeSource = mockedTimeSource

s.metadataMgr.On("GetMetadata", mock.Anything).Return(nil, assert.AnError).Twice()
s.metadataMgr.On("GetMetadata", mock.Anything).Return(nil, assert.AnError).Once()

go func() {
mockedTimeSource.BlockUntil(1)
mockedTimeSource.Advance(DomainCacheRefreshInterval)
mockedTimeSource.BlockUntil(2)
mockedTimeSource.Advance(DomainCacheRefreshFailureRetryInterval)
s.domainCache.shutdownChan <- struct{}{}
}()

Expand All @@ -921,14 +919,34 @@ func (s *domainCacheSuite) Test_refreshDomainsLocked_IntervalTooShort() {
s.NoError(err)
}

func (s *domainCacheSuite) Test_refreshDomainsLocked_ListDomainsError() {
func (s *domainCacheSuite) Test_refreshDomains_ListDomainsNonRetryableError() {
s.metadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{NotificationVersion: 0}, nil).Once()
s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()

err := s.domainCache.refreshDomainsLocked()
err := s.domainCache.refreshDomains()
s.ErrorIs(err, assert.AnError)
}

// Test_refreshDomains_ListDomainsRetryableError checks that refreshDomains makes a
// second attempt after ListDomains fails with a retryable error, and that when the
// second attempt fails with a non-retryable error the caller receives the first
// (retryable) error.
func (s *domainCacheSuite) Test_refreshDomains_ListDomainsRetryableError() {
	busyErr := &types.ServiceBusyError{Message: "Service is busy"}
	metadataResp := &persistence.GetMetadataResponse{NotificationVersion: 0}

	// GetMetadata is expected twice: once for the initial attempt, once for the retry.
	s.metadataMgr.On("GetMetadata", mock.Anything).Return(metadataResp, nil).Times(2)

	// First ListDomains call fails with the retryable error.
	s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, busyErr).Once()

	// Second ListDomains call fails with a non-retryable error.
	s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()

	// The surfaced error must be the first one, not the later non-retryable one.
	s.ErrorIs(s.domainCache.refreshDomains(), busyErr)
}

func (s *domainCacheSuite) TestDomainCacheEntry_Getters() {
gen := testdatagen.New(s.T())

Expand Down
13 changes: 13 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ const (
taskCompleterMaxInterval = 10 * time.Second
taskCompleterExpirationInterval = 5 * time.Minute

domainCacheInitialInterval = 1 * time.Second
domainCacheMaxInterval = 5 * time.Second
domainCacheExpirationInterval = 2 * time.Minute

contextExpireThreshold = 10 * time.Millisecond

// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
Expand Down Expand Up @@ -228,6 +232,15 @@ func CreateTaskCompleterRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreateDomainCacheRetryPolicy returns the retry policy used when a domain cache
// refresh fails. It is an exponential policy seeded with domainCacheInitialInterval,
// with its maximum interval set to domainCacheMaxInterval and its expiration
// interval set to domainCacheExpirationInterval.
func CreateDomainCacheRetryPolicy() backoff.RetryPolicy {
	retryPolicy := backoff.NewExponentialRetryPolicy(domainCacheInitialInterval)
	retryPolicy.SetMaximumInterval(domainCacheMaxInterval)
	retryPolicy.SetExpirationInterval(domainCacheExpirationInterval)
	return retryPolicy
}

// IsValidIDLength checks if id is valid according to its length
func IsValidIDLength(
id string,
Expand Down
Loading