diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index 0856fbe63f6..cfa5f65f9ad 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -33,6 +33,7 @@ import ( "time" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/errors" @@ -60,10 +61,7 @@ const ( domainCacheMinRefreshInterval = 1 * time.Second // DomainCacheRefreshInterval domain cache refresh interval DomainCacheRefreshInterval = 10 * time.Second - // DomainCacheRefreshFailureRetryInterval is the wait time - // if refreshment encounters error - DomainCacheRefreshFailureRetryInterval = 1 * time.Second - domainCacheRefreshPageSize = 200 + domainCacheRefreshPageSize = 200 domainCachePersistenceTimeout = 3 * time.Second @@ -119,6 +117,8 @@ type ( callbackLock sync.Mutex prepareCallbacks map[int]PrepareCallbackFn callbacks map[int]CallbackFn + + throttleRetry *backoff.ThrottleRetry } // DomainCacheEntries is DomainCacheEntry slice @@ -160,6 +160,11 @@ func NewDomainCache( opts ...DomainCacheOption, ) *DefaultDomainCache { + throttleRetry := backoff.NewThrottleRetry( + backoff.WithRetryPolicy(common.CreateDomainCacheRetryPolicy()), + backoff.WithRetryableError(common.IsServiceTransientError), + ) + cache := &DefaultDomainCache{ status: domainCacheInitialized, shutdownChan: make(chan struct{}), @@ -172,6 +177,7 @@ func NewDomainCache( logger: logger, prepareCallbacks: make(map[int]PrepareCallbackFn), callbacks: make(map[int]CallbackFn), + throttleRetry: throttleRetry, } cache.cacheNameToID.Store(newDomainCache()) cache.cacheByID.Store(newDomainCache()) @@ -411,15 +417,12 @@ func (c *DefaultDomainCache) refreshLoop() { case <-c.shutdownChan: return case <-timer.Chan(): - for err := c.refreshDomains(); err != nil; err = c.refreshDomains() { - select { - case <-c.shutdownChan: - return - default: - c.logger.Error("Error refreshing domain cache", tag.Error(err)) - c.timeSource.Sleep(DomainCacheRefreshFailureRetryInterval) - } + err := c.refreshDomains() + if err != nil { + c.logger.Error("Error refreshing domain cache", tag.Error(err)) + continue } + c.logger.Debug("Domain cache refreshed") } } @@ -428,7 +431,7 @@ func (c *DefaultDomainCache) refreshLoop() { func (c *DefaultDomainCache) refreshDomains() error { c.refreshLock.Lock() defer c.refreshLock.Unlock() - return c.refreshDomainsLocked() + return c.throttleRetry.Do(context.Background(), c.refreshDomainsLocked) } // this function only refresh the domains in the v2 table diff --git a/common/cache/domainCache_test.go b/common/cache/domainCache_test.go index dd1cb1fa99b..b5f7cdeb1aa 100644 --- a/common/cache/domainCache_test.go +++ b/common/cache/domainCache_test.go @@ -897,13 +897,11 @@ func (s *domainCacheSuite) Test_refreshLoop_domainCacheRefreshedError() { s.domainCache.timeSource = mockedTimeSource - s.metadataMgr.On("GetMetadata", mock.Anything).Return(nil, assert.AnError).Twice() + s.metadataMgr.On("GetMetadata", mock.Anything).Return(nil, assert.AnError).Once() go func() { mockedTimeSource.BlockUntil(1) mockedTimeSource.Advance(DomainCacheRefreshInterval) - mockedTimeSource.BlockUntil(2) - mockedTimeSource.Advance(DomainCacheRefreshFailureRetryInterval) s.domainCache.shutdownChan <- struct{}{} }() @@ -921,14 +919,34 @@ func (s *domainCacheSuite) Test_refreshDomainsLocked_IntervalTooShort() { s.NoError(err) } -func (s *domainCacheSuite) Test_refreshDomainsLocked_ListDomainsError() { +func (s *domainCacheSuite) Test_refreshDomains_ListDomainsNonRetryableError() { s.metadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{NotificationVersion: 0}, nil).Once() s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, assert.AnError).Once() - err := s.domainCache.refreshDomainsLocked() + err := s.domainCache.refreshDomains() s.ErrorIs(err, assert.AnError) } +func (s *domainCacheSuite) Test_refreshDomains_ListDomainsRetryableError() { + retryableError := &types.ServiceBusyError{ + Message: "Service is busy", + } + + // We expect the metadataMgr to be called twice, once for the initial attempt and once for the retry + s.metadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{NotificationVersion: 0}, nil).Times(2) + + // First time return retryable error + s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, retryableError).Once() + + // Second time return non-retryable error + s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, assert.AnError).Once() + + err := s.domainCache.refreshDomains() + + // We expect the error to be the first error + s.ErrorIs(err, retryableError) +} + func (s *domainCacheSuite) TestDomainCacheEntry_Getters() { gen := testdatagen.New(s.T()) diff --git a/common/util.go b/common/util.go index 247be39191b..1918e9c6785 100644 --- a/common/util.go +++ b/common/util.go @@ -88,6 +88,10 @@ const ( taskCompleterMaxInterval = 10 * time.Second taskCompleterExpirationInterval = 5 * time.Minute + domainCacheInitialInterval = 1 * time.Second + domainCacheMaxInterval = 5 * time.Second + domainCacheExpirationInterval = 2 * time.Minute + contextExpireThreshold = 10 * time.Millisecond // FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit @@ -228,6 +232,15 @@ func CreateTaskCompleterRetryPolicy() backoff.RetryPolicy { return policy } +// CreateDomainCacheRetryPolicy creates a retry policy to handle domain cache refresh failures +func CreateDomainCacheRetryPolicy() backoff.RetryPolicy { + policy := backoff.NewExponentialRetryPolicy(domainCacheInitialInterval) + policy.SetMaximumInterval(domainCacheMaxInterval) + policy.SetExpirationInterval(domainCacheExpirationInterval) + + return policy +} + // IsValidIDLength checks if id is valid according to its length func IsValidIDLength( id string,