Skip to content

Commit

Permalink
feat: generics-based map pre-fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
p0mvn committed Aug 3, 2024
1 parent 5fe177e commit 6943417
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 0 deletions.
55 changes: 55 additions & 0 deletions sqsutil/datafetchers/map_prefetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package datafetchers

import (
"fmt"
"time"
)

// MapFetcher is an interface that defines a fetchers that is represented
// by mapped data internally with keys and values.
type MapFetcher[K comparable, V any] interface {
Fetcher[map[K]V]

// GetByKey returns the value for the given key.
// Returns the value, the last fetch time, a boolean indicating if the data is stale, and an error.
// The value is considered stale if 2x more time passed since last update.
GetByKey(key K) (V, time.Time, bool, error)
}

type MapIntervalFetcher[K comparable, V any] struct {
*IntervalFetcher[map[K]V]
}

var _ MapFetcher[uint64, uint64] = (*MapIntervalFetcher[uint64, uint64])(nil)

// NewMapFetcher returns a new MapIntervalFetcher.
func NewMapFetcher[K comparable, V any](updateFn func() map[K]V, interval time.Duration) *MapIntervalFetcher[K, V] {
return &MapIntervalFetcher[K, V]{
IntervalFetcher: NewIntervalFetcher(updateFn, interval),
}
}

// GetByKey returns the value for the given key.
func (p *MapIntervalFetcher[K, V]) GetByKey(key K) (V, time.Time, bool, error) {
isStale := false

// If 2x more time passed since last update, set stale flag
timeDiff := time.Since(p.lastRetrievedTime)
if timeDiff > 2*p.interval {
isStale = true
}

resultMap, _, err := p.IntervalFetcher.Get()
if err != nil {
var zeroValue V
return zeroValue, p.lastRetrievedTime, isStale, err
}

value, ok := resultMap[key]
if !ok {
var zeroValue V
return zeroValue, p.lastRetrievedTime, isStale, fmt.Errorf("key not found: %v", key)
}

return value, p.lastRetrievedTime, isStale, nil
}
64 changes: 64 additions & 0 deletions sqsutil/datafetchers/map_prefetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package datafetchers_test

import (
"sync/atomic"
"testing"
"time"

"github.com/osmosis-labs/sqs/sqsutil/datafetchers"
"github.com/stretchr/testify/assert"
)

func TestMapIntervalFetcher_GetByKey(t *testing.T) {
// Run the test in parallel with other tests
t.Parallel()

didFetchOnce := atomic.Bool{}

// Define the update function
updateFn := func() map[int]string {
if didFetchOnce.Load() {
// Intentionally block the update function to simulate a slow update
time.Sleep(10 * time.Second)
}

didFetchOnce.Store(true)

return map[int]string{
1: "one",
2: "two",
3: "three",
}
}

// Create a new MapIntervalFetcher with a short interval
interval := 50 * time.Millisecond
fetcher := datafetchers.NewMapFetcher(updateFn, interval)

// Wait until the first result is fetched
fetcher.WaitUntilFirstResult()

// Test getting a valid key
value, lastFetch, isStale, err := fetcher.GetByKey(1)
assert.NoError(t, err)
assert.Equal(t, "one", value)
assert.False(t, isStale)
assert.NotZero(t, lastFetch)

// Test getting an invalid key
value, lastFetch, isStale, err = fetcher.GetByKey(99)
assert.Error(t, err)
assert.Equal(t, "", value)
assert.False(t, isStale)
assert.NotZero(t, lastFetch)

// Wait for more than 2x the interval to ensure data becomes stale
time.Sleep(200 * time.Millisecond)

// Test getting a key after data should be stale
value, lastFetch, isStale, err = fetcher.GetByKey(2)
assert.NoError(t, err)
assert.Equal(t, "two", value)
assert.True(t, isStale)
assert.NotZero(t, lastFetch)
}
6 changes: 6 additions & 0 deletions sqsutil/datafetchers/prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
// Fetcher is an interface that provides a method to get a value.
type Fetcher[T any] interface {
Get() (T, time.Time, error)
GetRefetchInterval() time.Duration
WaitUntilFirstResult()
}

// IntervalFetcher is a struct that prefetches a value at a given interval
Expand Down Expand Up @@ -100,3 +102,7 @@ func (p *IntervalFetcher[T]) Close() {
p.hasClosed = true
p.timer.Stop()
}

func (p *IntervalFetcher[T]) GetRefetchInterval() time.Duration {
return p.interval
}

0 comments on commit 6943417

Please sign in to comment.