From 967c03c47d294bde64a3e77f911a2ef4168c32a5 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Tue, 14 Jan 2025 15:32:08 +0100 Subject: [PATCH] [CRE-45] Check for duplicated cache modules (#15901) * fix: check for duplicated cache modules * chore: return error and log duplicated module id as error level * chore: log with warn level --- core/capabilities/compute/cache.go | 9 +++- core/capabilities/compute/cache_test.go | 61 +++++++++++++++++++++-- core/capabilities/compute/compute.go | 5 +- core/capabilities/compute/compute_test.go | 4 +- 4 files changed, 71 insertions(+), 8 deletions(-) diff --git a/core/capabilities/compute/cache.go b/core/capabilities/compute/cache.go index dbcc42c1606..e15678a6e6d 100644 --- a/core/capabilities/compute/cache.go +++ b/core/capabilities/compute/cache.go @@ -1,6 +1,7 @@ package compute import ( + "fmt" "sync" "time" @@ -82,12 +83,18 @@ func (mc *moduleCache) reapLoop() { } } -func (mc *moduleCache) add(id string, mod *module) { +func (mc *moduleCache) add(id string, mod *module) error { mc.mu.Lock() defer mc.mu.Unlock() + + if mc.m[id] != nil { + return fmt.Errorf("module with id %q already exists in cache", id) + } + mod.lastFetchedAt = mc.clock.Now() mc.m[id] = mod moduleCacheAddition.Inc() + return nil } func (mc *moduleCache) get(id string) (*module, bool) { diff --git a/core/capabilities/compute/cache_test.go b/core/capabilities/compute/cache_test.go index ad075f493b5..fb55eba285b 100644 --- a/core/capabilities/compute/cache_test.go +++ b/core/capabilities/compute/cache_test.go @@ -1,6 +1,7 @@ package compute import ( + "fmt" "testing" "time" @@ -16,8 +17,8 @@ import ( ) const ( - binaryLocation = "test/simple/cmd/testmodule.wasm" - binaryCmd = "core/capabilities/compute/test/simple/cmd" + simpleBinaryLocation = "test/simple/cmd/testmodule.wasm" + simpleBinaryCmd = "core/capabilities/compute/test/simple/cmd" ) // Verify that cache evicts an expired module. @@ -34,7 +35,7 @@ func TestCache(t *testing.T) { cache.start() defer cache.close() - binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, false, t) + binary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, false, t) hmod, err := host.NewModule(&host.ModuleConfig{ Logger: logger.TestLogger(t), IsUncompressed: true, @@ -74,7 +75,7 @@ func TestCache_EvictAfterSize(t *testing.T) { cache.start() defer cache.close() - binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, false, t) + binary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, false, t) hmod, err := host.NewModule(&host.ModuleConfig{ Logger: logger.TestLogger(t), IsUncompressed: true, @@ -103,3 +104,55 @@ func TestCache_EvictAfterSize(t *testing.T) { _, ok = cache.get(id) assert.True(t, ok) } + +func TestCache_AddDuplicatedModule(t *testing.T) { + t.Parallel() + clock := clockwork.NewFakeClock() + tick := 1 * time.Second + timeout := 1 * time.Second + reapTicker := make(chan time.Time) + + cache := newModuleCache(clock, tick, timeout, 0) + cache.onReaper = make(chan struct{}, 1) + cache.reapTicker = reapTicker + cache.start() + defer cache.close() + + simpleBinary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, false, t) + shmod, err := host.NewModule(&host.ModuleConfig{ + Logger: logger.TestLogger(t), + IsUncompressed: true, + }, simpleBinary) + require.NoError(t, err) + + // we will use the same id for both modules, but should only be associated to the simple module + duplicatedID := uuid.New().String() + smod := &module{ + module: shmod, + } + err = cache.add(duplicatedID, smod) + require.NoError(t, err) + + got, ok := cache.get(duplicatedID) + assert.True(t, ok) + assert.Equal(t, got, smod) + + // Adding a different module but with the same id should not overwrite the existing module + fetchBinary := wasmtest.CreateTestBinary(fetchBinaryCmd, fetchBinaryLocation, false, t) + fhmod, err := host.NewModule(&host.ModuleConfig{ + Logger: logger.TestLogger(t), + IsUncompressed: true, + }, fetchBinary) + require.NoError(t, err) + + fmod := &module{ + module: fhmod, + } + err = cache.add(duplicatedID, fmod) + require.ErrorContains(t, err, fmt.Sprintf("module with id %q already exists in cache", duplicatedID)) + + // validate that the module is still the same + got, ok = cache.get(duplicatedID) + assert.True(t, ok) + assert.Equal(t, got, smod) +} diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index 156c5154c99..4508d47534e 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -197,7 +197,10 @@ func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, r computeWASMInit.WithLabelValues(requestMetadata.WorkflowID, requestMetadata.ReferenceID).Observe(float64(initDuration)) m := &module{module: mod} - c.modules.add(id, m) + err = c.modules.add(id, m) + if err != nil { + c.log.Warnf("failed to add module to cache: %s", err.Error()) + } return m, nil } diff --git a/core/capabilities/compute/compute_test.go b/core/capabilities/compute/compute_test.go index 3e5f501fa61..98c8223409d 100644 --- a/core/capabilities/compute/compute_test.go +++ b/core/capabilities/compute/compute_test.go @@ -91,7 +91,7 @@ func TestComputeExecuteMissingConfig(t *testing.T) { th := setup(t, defaultConfig) require.NoError(t, th.compute.Start(tests.Context(t))) - binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) + binary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, true, t) config, err := values.WrapMap(map[string]any{ "binary": binary, @@ -134,7 +134,7 @@ func TestComputeExecute(t *testing.T) { require.NoError(t, th.compute.Start(tests.Context(t))) - binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) + binary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, true, t) config, err := values.WrapMap(map[string]any{ "config": []byte(""),