Skip to content

Commit

Permalink
[CRE-45] Check for duplicated cache modules (#15901)
Browse files Browse the repository at this point in the history
* fix: check for duplicated cache modules

* chore: return error and log duplicated module id as error level

* chore: log with warn level
  • Loading branch information
agparadiso authored Jan 14, 2025
1 parent 6860107 commit 967c03c
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 8 deletions.
9 changes: 8 additions & 1 deletion core/capabilities/compute/cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package compute

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -82,12 +83,18 @@ func (mc *moduleCache) reapLoop() {
}
}

func (mc *moduleCache) add(id string, mod *module) {
func (mc *moduleCache) add(id string, mod *module) error {
mc.mu.Lock()
defer mc.mu.Unlock()

if mc.m[id] != nil {
return fmt.Errorf("module with id %q already exists in cache", id)
}

mod.lastFetchedAt = mc.clock.Now()
mc.m[id] = mod
moduleCacheAddition.Inc()
return nil
}

func (mc *moduleCache) get(id string) (*module, bool) {
Expand Down
61 changes: 57 additions & 4 deletions core/capabilities/compute/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package compute

import (
"fmt"
"testing"
"time"

Expand All @@ -16,8 +17,8 @@ import (
)

const (
binaryLocation = "test/simple/cmd/testmodule.wasm"
binaryCmd = "core/capabilities/compute/test/simple/cmd"
simpleBinaryLocation = "test/simple/cmd/testmodule.wasm"
simpleBinaryCmd = "core/capabilities/compute/test/simple/cmd"
)

// Verify that cache evicts an expired module.
Expand All @@ -34,7 +35,7 @@ func TestCache(t *testing.T) {
cache.start()
defer cache.close()

binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, false, t)
binary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, false, t)
hmod, err := host.NewModule(&host.ModuleConfig{
Logger: logger.TestLogger(t),
IsUncompressed: true,
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestCache_EvictAfterSize(t *testing.T) {
cache.start()
defer cache.close()

binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, false, t)
binary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, false, t)
hmod, err := host.NewModule(&host.ModuleConfig{
Logger: logger.TestLogger(t),
IsUncompressed: true,
Expand Down Expand Up @@ -103,3 +104,55 @@ func TestCache_EvictAfterSize(t *testing.T) {
_, ok = cache.get(id)
assert.True(t, ok)
}

func TestCache_AddDuplicatedModule(t *testing.T) {
t.Parallel()
clock := clockwork.NewFakeClock()
tick := 1 * time.Second
timeout := 1 * time.Second
reapTicker := make(chan time.Time)

cache := newModuleCache(clock, tick, timeout, 0)
cache.onReaper = make(chan struct{}, 1)
cache.reapTicker = reapTicker
cache.start()
defer cache.close()

simpleBinary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, false, t)
shmod, err := host.NewModule(&host.ModuleConfig{
Logger: logger.TestLogger(t),
IsUncompressed: true,
}, simpleBinary)
require.NoError(t, err)

// we will use the same id for both modules, but should only be associated to the simple module
duplicatedID := uuid.New().String()
smod := &module{
module: shmod,
}
err = cache.add(duplicatedID, smod)
require.NoError(t, err)

got, ok := cache.get(duplicatedID)
assert.True(t, ok)
assert.Equal(t, got, smod)

// Adding a different module but with the same id should not overwrite the existing module
fetchBinary := wasmtest.CreateTestBinary(fetchBinaryCmd, fetchBinaryLocation, false, t)
fhmod, err := host.NewModule(&host.ModuleConfig{
Logger: logger.TestLogger(t),
IsUncompressed: true,
}, fetchBinary)
require.NoError(t, err)

fmod := &module{
module: fhmod,
}
err = cache.add(duplicatedID, fmod)
require.ErrorContains(t, err, fmt.Sprintf("module with id %q already exists in cache", duplicatedID))

// validate that the module is still the same
got, ok = cache.get(duplicatedID)
assert.True(t, ok)
assert.Equal(t, got, smod)
}
5 changes: 4 additions & 1 deletion core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, r
computeWASMInit.WithLabelValues(requestMetadata.WorkflowID, requestMetadata.ReferenceID).Observe(float64(initDuration))

m := &module{module: mod}
c.modules.add(id, m)
err = c.modules.add(id, m)
if err != nil {
c.log.Warnf("failed to add module to cache: %s", err.Error())
}
return m, nil
}

Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestComputeExecuteMissingConfig(t *testing.T) {
th := setup(t, defaultConfig)
require.NoError(t, th.compute.Start(tests.Context(t)))

binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)
binary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, true, t)

config, err := values.WrapMap(map[string]any{
"binary": binary,
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestComputeExecute(t *testing.T) {

require.NoError(t, th.compute.Start(tests.Context(t)))

binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)
binary := wasmtest.CreateTestBinary(simpleBinaryCmd, simpleBinaryLocation, true, t)

config, err := values.WrapMap(map[string]any{
"config": []byte(""),
Expand Down

0 comments on commit 967c03c

Please sign in to comment.