From 64f975f6265bb6c6df946628827d528d6a86b6f9 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Tue, 18 Feb 2025 15:17:30 +0100 Subject: [PATCH 1/4] Added exponential retry to the domain cache --- common/cache/domainCache.go | 24 +++++++++++++++--------- common/cache/domainCache_test.go | 24 ++++++++++++++++++++++-- common/util.go | 13 +++++++++++++ 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index 0856fbe63f6..e2b02ff62fe 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -33,6 +33,7 @@ import ( "time" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/errors" @@ -119,6 +120,8 @@ type ( callbackLock sync.Mutex prepareCallbacks map[int]PrepareCallbackFn callbacks map[int]CallbackFn + + throttleRetry *backoff.ThrottleRetry } // DomainCacheEntries is DomainCacheEntry slice @@ -160,6 +163,11 @@ func NewDomainCache( opts ...DomainCacheOption, ) *DefaultDomainCache { + throttleRetry := backoff.NewThrottleRetry( + backoff.WithRetryPolicy(common.CreateDomainCacheRetryPolicy()), + backoff.WithRetryableError(common.IsServiceTransientError), + ) + cache := &DefaultDomainCache{ status: domainCacheInitialized, shutdownChan: make(chan struct{}), @@ -172,6 +180,7 @@ func NewDomainCache( logger: logger, prepareCallbacks: make(map[int]PrepareCallbackFn), callbacks: make(map[int]CallbackFn), + throttleRetry: throttleRetry, } cache.cacheNameToID.Store(newDomainCache()) cache.cacheByID.Store(newDomainCache()) @@ -411,15 +420,12 @@ func (c *DefaultDomainCache) refreshLoop() { case <-c.shutdownChan: return case <-timer.Chan(): - for err := c.refreshDomains(); err != nil; err = c.refreshDomains() { - select { - case <-c.shutdownChan: - return - default: - c.logger.Error("Error refreshing domain cache", tag.Error(err)) - c.timeSource.Sleep(DomainCacheRefreshFailureRetryInterval) - } + err := c.refreshDomains() + if err != nil { + c.logger.Error("Error refreshing domain cache", tag.Error(err)) + continue } + c.logger.Debug("Domain cache refreshed") } } @@ -428,7 +434,7 @@ func (c *DefaultDomainCache) refreshLoop() { func (c *DefaultDomainCache) refreshDomains() error { c.refreshLock.Lock() defer c.refreshLock.Unlock() - return c.refreshDomainsLocked() + return c.throttleRetry.Do(context.Background(), c.refreshDomainsLocked) } // this function only refresh the domains in the v2 table diff --git a/common/cache/domainCache_test.go b/common/cache/domainCache_test.go index dd1cb1fa99b..faf588b91e4 100644 --- a/common/cache/domainCache_test.go +++ b/common/cache/domainCache_test.go @@ -921,14 +921,34 @@ func (s *domainCacheSuite) Test_refreshDomainsLocked_IntervalTooShort() { s.NoError(err) } -func (s *domainCacheSuite) Test_refreshDomainsLocked_ListDomainsError() { +func (s *domainCacheSuite) Test_refreshDomains_ListDomainsNonRetryableError() { s.metadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{NotificationVersion: 0}, nil).Once() s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, assert.AnError).Once() - err := s.domainCache.refreshDomainsLocked() + err := s.domainCache.refreshDomains() s.ErrorIs(err, assert.AnError) } +func (s *domainCacheSuite) Test_refreshDomains_ListDomainsRetryableError() { + retryableError := &types.ServiceBusyError{ + Message: "Service is busy", + } + + // We expect the metadataMgr to be called twice, once for the initial attempt and once for the retry + s.metadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{NotificationVersion: 0}, nil).Times(2) + + // First time return retryable error + s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, retryableError).Once() + + // Second time return non-retryable error + s.metadataMgr.On("ListDomains", mock.Anything, mock.Anything).Return(nil, assert.AnError).Once() + + err := s.domainCache.refreshDomains() + + // We expect the error to be the first error + s.ErrorIs(err, retryableError) +} + func (s *domainCacheSuite) TestDomainCacheEntry_Getters() { gen := testdatagen.New(s.T()) diff --git a/common/util.go b/common/util.go index 247be39191b..1918e9c6785 100644 --- a/common/util.go +++ b/common/util.go @@ -88,6 +88,10 @@ const ( taskCompleterMaxInterval = 10 * time.Second taskCompleterExpirationInterval = 5 * time.Minute + domainCacheInitialInterval = 1 * time.Second + domainCacheMaxInterval = 5 * time.Second + domainCacheExpirationInterval = 2 * time.Minute + contextExpireThreshold = 10 * time.Millisecond // FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit @@ -228,6 +232,15 @@ func CreateTaskCompleterRetryPolicy() backoff.RetryPolicy { return policy } +// CreateDomainCacheRetryPolicy creates a retry policy to handle domain cache refresh failures +func CreateDomainCacheRetryPolicy() backoff.RetryPolicy { + policy := backoff.NewExponentialRetryPolicy(domainCacheInitialInterval) + policy.SetMaximumInterval(domainCacheMaxInterval) + policy.SetExpirationInterval(domainCacheExpirationInterval) + + return policy +} + // IsValidIDLength checks if id is valid according to its length func IsValidIDLength( id string, From 9a3a297365ab7ea66d1e500a7d3839ec6167b1e0 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Thu, 20 Feb 2025 07:56:04 +0100 Subject: [PATCH 2/4] Updated test that was testing the manual retry loop --- common/cache/domainCache_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/common/cache/domainCache_test.go b/common/cache/domainCache_test.go index faf588b91e4..b5f7cdeb1aa 100644 --- a/common/cache/domainCache_test.go +++ b/common/cache/domainCache_test.go @@ -897,13 +897,11 @@ func (s *domainCacheSuite) Test_refreshLoop_domainCacheRefreshedError() { s.domainCache.timeSource = mockedTimeSource - s.metadataMgr.On("GetMetadata", mock.Anything).Return(nil, assert.AnError).Twice() + s.metadataMgr.On("GetMetadata", mock.Anything).Return(nil, assert.AnError).Once() go func() { mockedTimeSource.BlockUntil(1) mockedTimeSource.Advance(DomainCacheRefreshInterval) - mockedTimeSource.BlockUntil(2) - mockedTimeSource.Advance(DomainCacheRefreshFailureRetryInterval) s.domainCache.shutdownChan <- struct{}{} }() From fb5586be127f6b4537b6d7719716c2506bde3238 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Thu, 20 Feb 2025 07:56:53 +0100 Subject: [PATCH 3/4] Removed unused constant --- common/cache/domainCache.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index e2b02ff62fe..cfa5f65f9ad 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -61,10 +61,7 @@ const ( domainCacheMinRefreshInterval = 1 * time.Second // DomainCacheRefreshInterval domain cache refresh interval DomainCacheRefreshInterval = 10 * time.Second - // DomainCacheRefreshFailureRetryInterval is the wait time - // if refreshment encounters error - DomainCacheRefreshFailureRetryInterval = 1 * time.Second - domainCacheRefreshPageSize = 200 + domainCacheRefreshPageSize = 200 domainCachePersistenceTimeout = 3 * time.Second From ade9c4d374dbf1662e15798c15449b0ca98453b9 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Thu, 20 Feb 2025 09:21:55 +0100 Subject: [PATCH 4/4] retrigger CD