feat: Update in-memory trace cache to use LRU instead of ring buffer #1359

Open · wants to merge 15 commits into main
126 changes: 47 additions & 79 deletions collect/cache/cache.go
@@ -1,8 +1,10 @@
 package cache

 import (
+	"math"
 	"time"

+	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/honeycombio/refinery/generics"
 	"github.com/honeycombio/refinery/logger"
 	"github.com/honeycombio/refinery/metrics"
@@ -21,6 +23,9 @@ type Cache interface {
 	// GetCacheCapacity returns the number of traces that can be stored in the cache
 	GetCacheCapacity() int

+	// GetCacheEntryCount returns the number of traces currently stored in the cache
+	GetCacheEntryCount() int
+
 	// Retrieve and remove all traces which are past their SendBy date.
 	// Does not check whether they've been sent.
 	TakeExpiredTraces(now time.Time) []*types.Trace
@@ -39,12 +44,8 @@ type DefaultInMemCache struct {
 	Metrics metrics.Metrics
 	Logger  logger.Logger

-	cache map[string]*types.Trace
-
-	// traceBuffer is a circular buffer of currently stored traces
-	traceBuffer []*types.Trace
-	// currentIndex is the current location in the circle.
-	currentIndex int
+	cache    *lru.Cache[string, *types.Trace]
+	capacity int
 }

 const DefaultInMemCacheCapacity = 10000
@@ -67,127 +68,94 @@ func NewInMemCache(
 		met.Register(metadata)
 	}

-	if capacity == 0 {
-		capacity = DefaultInMemCacheCapacity
+	// if using the default capacity, allow the cache to grow really large by using math.MaxInt32 (2147483647)
+	if capacity == DefaultInMemCacheCapacity {
+		capacity = math.MaxInt32
 	}

[Review comment · Contributor] I'm not sure if we still want to allow customers to configure CacheCapacity anymore. If we do, we need to also support the trace ejection case from the cache.

+	cache, err := lru.New[string, *types.Trace](capacity)
+	if err != nil {
+		logger.Error().Logf("Failed to create LRU cache: %s", err)
+		return nil
+	}

 	return &DefaultInMemCache{
-		Metrics:     met,
-		Logger:      logger,
-		cache:       make(map[string]*types.Trace, capacity),
-		traceBuffer: make([]*types.Trace, capacity),
+		Metrics:  met,
+		Logger:   logger,
+		cache:    cache,
+		capacity: capacity,
 	}

 }
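For reviewers unfamiliar with golang-lru/v2: here is a minimal standalone sketch (not part of this diff) of the constructor's failure mode and the eviction behavior the new code relies on. As far as I know, lru.New returns an error only for a non-positive size, so with the default capacity mapped to math.MaxInt32 the nil return above should only be reachable for a zero or negative configured capacity.

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// lru.New rejects non-positive sizes; this appears to be its only
	// error path, which NewInMemCache surfaces by returning nil.
	if _, err := lru.New[string, int](0); err != nil {
		fmt.Println("rejected:", err)
	}

	// With a positive size, Add evicts the least recently used entry
	// once the cache is full.
	cache, _ := lru.New[string, int](2)
	cache.Add("a", 1)
	cache.Add("b", 2)
	evicted := cache.Add("c", 3) // evicts "a"
	fmt.Println(evicted, cache.Keys()) // true [b c]
}
```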

 func (d *DefaultInMemCache) GetCacheCapacity() int {
-	return len(d.traceBuffer)
+	return d.capacity
 }

-// looks for an insertion point by trying the next N slots in the circular buffer
-// returns the index of the first empty slot it finds, or the first slot that
-// has a trace that has already been sent. If it doesn't find anything, it
-// returns the index of the last slot it looked at.
-func (d *DefaultInMemCache) findNextInsertionPoint(maxtries int) int {
-	ip := d.currentIndex
-	for i := 0; i < maxtries; i++ {
-		ip++
-		if ip >= len(d.traceBuffer) {
-			ip = 0
-		}
-		oldTrace := d.traceBuffer[ip]
-		if oldTrace == nil || oldTrace.Sent {
-			break
-		}
-	}
-	// we didn't find anything we can overwrite, so we have to kick one out
-	return ip
+func (d *DefaultInMemCache) GetCacheEntryCount() int {
+	return d.cache.Len()
 }

-// Set adds the trace to the ring. When the ring wraps around and hits a trace
-// that has not been sent, it will try up to 5 times to skip that entry and find
-// a slot that is available. If it is unable to do so, it will kick out the
-// trace it is overwriting and return that trace. Otherwise returns nil.
 func (d *DefaultInMemCache) Set(trace *types.Trace) *types.Trace {

-	// set retTrace to a trace if it is getting kicked out without having been
-	// sent. Leave it nil if we're not kicking out an unsent trace.
-	var retTrace *types.Trace
-
 	// we need to dereference the trace ID so skip bad inserts to avoid panic
 	if trace == nil {
 		return nil
 	}

-	// store the trace
-	d.cache[trace.TraceID] = trace
-
-	// figure out where to put it; try 5 times to find an empty slot
-	ip := d.findNextInsertionPoint(5)
-	// make sure we will record the trace in the right place
-	defer func() { d.currentIndex = ip }()
-	// expunge the trace at this point in the insertion ring, if necessary
-	oldTrace := d.traceBuffer[ip]
-	if oldTrace != nil {
-		delete(d.cache, oldTrace.TraceID)
-		if !oldTrace.Sent {
-			// if it hasn't already been sent,
-			// record that we're overrunning the buffer
+	// set retTrace to a trace if it is getting kicked out without having been
+	// sent. Leave it nil if we're not kicking out an unsent trace.
+	var retTrace *types.Trace
+	if d.cache.Len() >= d.capacity {
+		_, retTrace, _ = d.cache.RemoveOldest()
+		// if it hasn't already been sent,
+		// record that we're overrunning the buffer
+		if !retTrace.Sent {
 			d.Metrics.Increment("collect_cache_buffer_overrun")
-			// and return the trace so it can be sent.
-			retTrace = oldTrace
 		}
 	}
-	// record the trace in the insertion ring
-	d.traceBuffer[ip] = trace
-
+
+	d.cache.Add(trace.TraceID, trace)
 	return retTrace
 }
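Why RemoveOldest rather than letting Add evict: in golang-lru/v2, Add only reports whether an eviction happened, while Set needs the evicted trace itself so an unsent trace can be returned to the caller. A standalone sketch of the same pattern (the trace struct here is a stand-in for this example, not types.Trace):

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

// toy stand-in for types.Trace, just for this sketch
type trace struct {
	ID   string
	Sent bool
}

func main() {
	capacity := 2
	cache, _ := lru.New[string, *trace](capacity)
	cache.Add("t1", &trace{ID: "t1"})
	cache.Add("t2", &trace{ID: "t2"})

	// Mirror of the check in Set: evict manually with RemoveOldest so the
	// evicted trace can be inspected (and returned) before adding the new one.
	var evicted *trace
	if cache.Len() >= capacity {
		_, evicted, _ = cache.RemoveOldest()
		if !evicted.Sent {
			fmt.Printf("buffer overrun: %s evicted before being sent\n", evicted.ID)
		}
	}
	cache.Add("t3", &trace{ID: "t3"})
	fmt.Println(cache.Keys(), evicted.ID) // [t2 t3] t1
}
```

An alternative would be lru.NewWithEvict with an eviction callback, but the explicit RemoveOldest keeps the evicted value on the Set call path.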

 func (d *DefaultInMemCache) Get(traceID string) *types.Trace {
-	return d.cache[traceID]
+	trace, _ := d.cache.Get(traceID)
+	return trace
 }
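One behavioral difference worth flagging: unlike the old map lookup, cache.Get marks the entry as recently used, so frequently fetched traces now survive eviction longer. A small sketch of the side effect; if read-only semantics were wanted, Peek avoids the promotion:

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	cache, _ := lru.New[string, int](2)
	cache.Add("a", 1)
	cache.Add("b", 2)

	cache.Get("a") // "a" becomes the most recently used entry

	key, _, _ := cache.RemoveOldest()
	fmt.Println(key) // "b": the Get changed the eviction order
}
```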

 // GetAll is not thread safe and should only be used when that's ok
 // Returns all non-nil trace entries.
 func (d *DefaultInMemCache) GetAll() []*types.Trace {
-	tmp := make([]*types.Trace, 0, len(d.traceBuffer))
-	for _, t := range d.traceBuffer {
-		if t != nil {
-			tmp = append(tmp, t)
-		}
-	}
-	return tmp
+	return d.cache.Values()
 }

 // TakeExpiredTraces should be called to decide which traces are past their expiration time;
 // It removes and returns them.
 func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time) []*types.Trace {
-	d.Metrics.Gauge("collect_cache_capacity", float64(len(d.traceBuffer)))
-	d.Metrics.Histogram("collect_cache_entries", float64(len(d.cache)))
+	d.Metrics.Gauge("collect_cache_capacity", float64(d.capacity))
+	d.Metrics.Histogram("collect_cache_entries", float64(d.cache.Len()))

 	var res []*types.Trace
-	for i, t := range d.traceBuffer {
-		if t != nil && now.After(t.SendBy) {
+	for _, t := range d.cache.Values() {
+		if now.After(t.SendBy) {
 			res = append(res, t)
-			d.traceBuffer[i] = nil
-			delete(d.cache, t.TraceID)
+			d.cache.Remove(t.TraceID)
+			continue
 		}
+		break
 	}
 	return res
 }
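The early break is what keeps this loop cheap, and it leans on Values returning entries in least-to-most recently used order; since traces are added roughly in SendBy order, expired traces cluster at the front. Note that a Get on an old trace promotes it, which could leave an expired trace behind until a later pass. A standalone sketch of the pattern (trace here is a stand-in type, not types.Trace):

```go
package main

import (
	"fmt"
	"time"

	lru "github.com/hashicorp/golang-lru/v2"
)

type trace struct {
	ID     string
	SendBy time.Time
}

func main() {
	now := time.Now()
	cache, _ := lru.New[string, *trace](3)
	cache.Add("t1", &trace{ID: "t1", SendBy: now.Add(-2 * time.Minute)}) // expired
	cache.Add("t2", &trace{ID: "t2", SendBy: now.Add(-1 * time.Minute)}) // expired
	cache.Add("t3", &trace{ID: "t3", SendBy: now.Add(time.Minute)})      // not yet due

	// Values is ordered least to most recently used, so scanning stops
	// at the first trace that is not yet past its SendBy time.
	var expired []*trace
	for _, t := range cache.Values() {
		if now.After(t.SendBy) {
			expired = append(expired, t)
			cache.Remove(t.ID)
			continue
		}
		break
	}
	fmt.Println(len(expired), cache.Len()) // 2 1
}
```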

 // RemoveTraces accepts a set of trace IDs and removes any matching ones from
 // the insertion list. This is used in the case of a cache overrun.
 func (d *DefaultInMemCache) RemoveTraces(toDelete generics.Set[string]) {
-	d.Metrics.Gauge("collect_cache_capacity", float64(len(d.traceBuffer)))
-	d.Metrics.Histogram("collect_cache_entries", float64(len(d.cache)))
-
-	for i, t := range d.traceBuffer {
-		if t != nil {
-			if toDelete.Contains(t.TraceID) {
-				d.traceBuffer[i] = nil
-				delete(d.cache, t.TraceID)
-			}
-		}
+	d.Metrics.Gauge("collect_cache_capacity", float64(d.capacity))
+	d.Metrics.Histogram("collect_cache_entries", float64(d.cache.Len()))
+
+	for _, traceID := range toDelete.Members() {
+		d.cache.Remove(traceID)
+	}
 }