diff --git a/flytestdlib/cache/in_memory_auto_refresh.go b/flytestdlib/cache/in_memory_auto_refresh.go
index a566c7928c..c3072f5b26 100644
--- a/flytestdlib/cache/in_memory_auto_refresh.go
+++ b/flytestdlib/cache/in_memory_auto_refresh.go
@@ -51,6 +51,7 @@ func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value
 type Options struct {
 	clock           clock.WithTicker
 	createBatchesCb CreateBatchesFunc
+	syncOnCreate    bool
 }
 
 // WithClock configures the clock to use for time related operations. Mainly used for unit testing.
@@ -67,10 +68,20 @@ func WithCreateBatchesFunc(createBatchesCb CreateBatchesFunc) Option {
 	}
 }
 
+// WithSyncOnCreate configures whether the cache will attempt to sync items upon creation or wait until the next
+// sync interval. Disabling this can be useful when the cache is under high load and is synchronizing both
+// frequently and in large batches. Defaults to true.
+func WithSyncOnCreate(syncOnCreate bool) Option {
+	return func(mo *Options) {
+		mo.syncOnCreate = syncOnCreate
+	}
+}
+
 func defaultOptions() *Options {
 	opts := &Options{}
 	WithClock(clock.RealClock{})(opts)
 	WithCreateBatchesFunc(SingleItemBatches)(opts)
+	WithSyncOnCreate(true)(opts)
 	return opts
 }
 
@@ -102,6 +113,7 @@ type InMemoryAutoRefresh struct {
 	syncCount          atomic.Int32 // internal sync counter for unit testing
 	enqueueCount       atomic.Int32 // internal enqueue counter for unit testing
 	enqueueLoopRunning atomic.Bool  // internal bool to ensure goroutines are running
+	syncOnCreate       bool
 }
 
 // NewInMemoryAutoRefresh creates a new InMemoryAutoRefresh
@@ -145,6 +157,7 @@ func NewInMemoryAutoRefresh(
 		syncCount:          atomic.NewInt32(0),
 		enqueueCount:       atomic.NewInt32(0),
 		enqueueLoopRunning: atomic.NewBool(false),
+		syncOnCreate:       opts.syncOnCreate,
 	}
 
 	return cache, nil
@@ -228,10 +241,12 @@ func (w *InMemoryAutoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {
 
 	// It fixes cold start issue in the AutoRefreshCache by adding the item to the workqueue when it is created.
 	// This way, the item will be processed without waiting for the next sync cycle (30s by default).
-	batch := make([]ItemWrapper, 0, 1)
-	batch = append(batch, itemWrapper{id: id, item: item})
-	w.workqueue.AddRateLimited(&batch)
-	w.processing.Store(id, w.clock.Now())
+	if w.syncOnCreate {
+		batch := make([]ItemWrapper, 0, 1)
+		batch = append(batch, itemWrapper{id: id, item: item})
+		w.workqueue.AddRateLimited(&batch)
+		w.processing.Store(id, w.clock.Now())
+	}
 
 	return item, nil
 }
diff --git a/flytestdlib/cache/in_memory_auto_refresh_test.go b/flytestdlib/cache/in_memory_auto_refresh_test.go
index 229664c720..869c79fd8e 100644
--- a/flytestdlib/cache/in_memory_auto_refresh_test.go
+++ b/flytestdlib/cache/in_memory_auto_refresh_test.go
@@ -95,6 +95,40 @@ func TestCacheFour(t *testing.T) {
 			assert.NoError(t, err)
 		}
 
+		// Cache should be processing
+		assert.NotEmpty(t, cache.processing)
+
+		assert.EventuallyWithT(t, func(c *assert.CollectT) {
+			// trigger periodic sync
+			fakeClock.Step(testResyncPeriod)
+
+			for i := 1; i <= 10; i++ {
+				item, err := cache.Get(fmt.Sprintf("%d", i))
+				assert.NoError(c, err)
+				assert.Equal(c, 10, item.(fakeCacheItem).val)
+			}
+		}, 3*time.Second, time.Millisecond)
+		cancel()
+	})
+
+	t.Run("disable sync on create", func(t *testing.T) {
+		cache, err := NewInMemoryAutoRefresh("fake1", syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope(), WithClock(fakeClock), WithSyncOnCreate(false))
+		assert.NoError(t, err)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		assert.NoError(t, cache.Start(ctx))
+
+		// Create ten items in the cache
+		for i := 1; i <= 10; i++ {
+			_, err := cache.GetOrCreate(fmt.Sprintf("%d", i), fakeCacheItem{
+				val: 0,
+			})
+			assert.NoError(t, err)
+		}
+
+		// Validate that none are processing since they aren't synced on creation
+		assert.Empty(t, cache.processing)
+
 		assert.EventuallyWithT(t, func(c *assert.CollectT) {
 			// trigger periodic sync
 			fakeClock.Step(testResyncPeriod)
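
For downstream callers, wiring the new option looks roughly like the sketch below. It mirrors the constructor call used in the test above; `newQuietCache`, the `"example"` cache name, the concrete argument values, and the assumption that `cache.SyncFunc` is the exported sync-callback type (and that these packages live under the `github.com/flyteorg/flyte/flytestdlib` module) are illustrative, not part of this change.

```go
package example

import (
	"context"
	"time"

	"github.com/flyteorg/flyte/flytestdlib/cache"
	"github.com/flyteorg/flyte/flytestdlib/promutils"
	"k8s.io/client-go/util/workqueue"
)

// newQuietCache builds an auto-refresh cache that skips the immediate sync on
// GetOrCreate and relies solely on the periodic resync. All argument values
// below are placeholders; syncCb is the caller's own sync callback.
func newQuietCache(ctx context.Context, syncCb cache.SyncFunc) (*cache.InMemoryAutoRefresh, error) {
	c, err := cache.NewInMemoryAutoRefresh(
		"example",                                // cache name
		syncCb,                                   // sync callback invoked for each batch
		workqueue.DefaultControllerRateLimiter(), // rate limiter for the work queue
		30*time.Second,                           // resync period
		10,                                       // sync parallelism (assumed meaning, mirrors the test's value)
		1000,                                     // max cache size (assumed meaning)
		promutils.NewScope("example_cache"),      // metrics scope
		cache.WithSyncOnCreate(false),            // new items wait for the next periodic sync
	)
	if err != nil {
		return nil, err
	}

	// Start launches the background enqueue/sync goroutines.
	if err := c.Start(ctx); err != nil {
		return nil, err
	}
	return c, nil
}
```

With `WithSyncOnCreate(false)`, a burst of `GetOrCreate` calls no longer enqueues one rate-limited workqueue batch per created item; newly created items are simply picked up by the next periodic sync cycle.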