Commit

address todos
maciuszek committed Jan 13, 2025
1 parent b1a2def commit 8eb942d
Showing 1 changed file with 25 additions and 38 deletions.
63 changes: 25 additions & 38 deletions stats.go
@@ -214,7 +214,10 @@ type StatGenerator interface {
// NewStore returns an Empty store that flushes to Sink passed as an argument.
// Note: the export argument is unused.
func NewStore(sink Sink, _ bool) Store {
return &statStore{sink: sink}
return &statStore{
sink: sink,
conf: GetSettings(), // todo: right now the environment is being loaded in multiple places and this is inefficient
}
}
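The new `conf` field caches the result of `GetSettings()` when the store is constructed, so the environment is only read once per store rather than on every flush or timer creation. Another way to address the todo above is to load the settings once process-wide with `sync.Once`; the sketch below assumes a hypothetical `settings` struct and `GOSTATS_TIMER_RESERVOIR_SIZE` environment variable rather than the library's real ones.

```go
// Sketch only: caching settings process-wide so repeated lookups do not
// re-read the environment. The settings shape and the environment variable
// name are assumptions for illustration, not the library's actual API.
package stats

import (
	"os"
	"strconv"
	"sync"
)

type settings struct {
	TimerReservoirSize int
}

var (
	settingsOnce   sync.Once
	cachedSettings settings
)

// loadSettings parses the environment exactly once and returns the cached copy.
func loadSettings() settings {
	settingsOnce.Do(func() {
		size, err := strconv.Atoi(os.Getenv("GOSTATS_TIMER_RESERVOIR_SIZE"))
		if err != nil {
			size = 0 // reservoirs stay disabled unless a positive size is configured
		}
		cachedSettings = settings{TimerReservoirSize: size}
	})
	return cachedSettings
}
```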

// NewDefaultStore returns a Store with a TCP statsd sink, and a running flush timer.
@@ -330,19 +333,18 @@ func (t *standardTimer) AllocateSpan() Timespan {
}

func (t *standardTimer) CollectedValue() []float64 {
return nil
return nil // since we flush right away nothing will be collected
}

func (t *standardTimer) SampleRate() float64 {
return 0.0 // todo: using zero value of float64. the correct value would be 1.0 given 1 stat, however that 1 stat is never stored, just flushed right away
return 1.0 // metrics which are not sampled have an implicit sample rate 1.0
}

type reservoirTimer struct {
base time.Duration
name string
capacity int
values []float64
fill int // todo: the only purpose of this is to be faster than calculating len(values), is it worth it?
count int
mu sync.Mutex
}
@@ -359,16 +361,12 @@ func (t *reservoirTimer) AddValue(value float64) {
t.mu.Lock()
defer t.mu.Unlock()

// todo: consider edge cases for <
if t.fill < t.capacity {
if t.count < t.capacity {
t.values = append(t.values, value)
} else {
// todo: discarding the oldest value when the reference is full, this can probably be smarter
t.values = append(t.values[1:], value)
t.fill--
t.values = append(t.values[1:], value) // discard the oldest value when the reservoir is full, this can probably be smarter
}

t.fill++
t.count++
}

@@ -380,14 +378,14 @@ func (t *reservoirTimer) CollectedValue() []float64 {
t.mu.Lock()
defer t.mu.Unlock()

// todo: Return a copy of the values slice to avoid data races
valuesCopy := make([]float64, len(t.values))
copy(valuesCopy, t.values)
return valuesCopy
// return a copy of the values slice to avoid data races
values := make([]float64, len(t.values))
copy(values, t.values)
return values
}

func (t *reservoirTimer) SampleRate() float64 {
return float64(t.fill) / float64(t.count) // todo: is it faster to store these values as float64 instead of converting here
return float64(len(t.values)) / float64(t.count)
}
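Taken together, `AddValue`, `CollectedValue`, and `SampleRate` make the reservoir behave as a sliding window: the slice keeps only the most recent `capacity` observations while `count` tracks every call, so the reported sample rate is the fraction of observations that survived. A minimal standalone sketch of that arithmetic, mirroring the logic above rather than importing the package:

```go
// Minimal sketch of the reservoir semantics visible in the diff: the slice
// keeps at most `capacity` of the most recent observations while `count`
// records every AddValue call, so the sample rate is len(values)/count.
package main

import "fmt"

type miniReservoir struct {
	capacity int
	values   []float64
	count    int
}

func (r *miniReservoir) AddValue(v float64) {
	if r.count < r.capacity {
		r.values = append(r.values, v)
	} else {
		r.values = append(r.values[1:], v) // drop the oldest observation
	}
	r.count++
}

func (r *miniReservoir) SampleRate() float64 {
	return float64(len(r.values)) / float64(r.count)
}

func main() {
	r := &miniReservoir{capacity: 100, values: make([]float64, 0, 100)}
	for i := 0; i < 250; i++ {
		r.AddValue(float64(i))
	}
	// 100 retained values out of 250 observed gives a sample rate of 0.4
	fmt.Println(len(r.values), r.count, r.SampleRate()) // 100 250 0.4
}
```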

type timespan struct {
@@ -408,12 +406,14 @@ func (ts *timespan) CompleteWithDuration(value time.Duration) {
type statStore struct {
counters sync.Map
gauges sync.Map
timers sync.Map // todo: should be control this count, especially for reservoirs we will be storing a lot of these in memory before flushing them
timers sync.Map // todo: no idea how memory was managed here before, did we just expect these maps to be replaced once they're filled?

mu sync.RWMutex
statGenerators []StatGenerator

sink Sink

conf Settings
}

var ReservedTagWords = map[string]bool{"asg": true, "az": true, "backend": true, "canary": true, "host": true, "period": true, "region": true, "shard": true, "window": true, "source": true, "project": true, "facet": true, "envoyservice": true}
@@ -463,22 +463,15 @@ func (s *statStore) Flush() {
return true
})

settings := GetSettings() // todo: move this to some shared memory
// todo: i'm not sure if we need a condition here or there's another way to assume this implicitly, but to my understanding s.timers
// will retain/store data even if it's unused in the case of standardTimer. in any case this should provide some optimization
if settings.isTimerReservoirEnabled() {
s.timers.Range(func(key, v interface{}) bool {
timer := v.(timer)
s.timers.Range(func(key, v interface{}) bool {
if timer, ok := v.(*reservoirTimer); ok {
sampleRate := timer.SampleRate()
// CollectedValue() should be nil unless reservoirTimer
for _, value := range timer.CollectedValue() {
s.sink.FlushAggregatedTimer(key.(string), value, sampleRate)
}

s.timers.Delete(key) // todo: not sure if this cleanup is necessary
return true
})
}
}
return true
})

flushableSink, ok := s.sink.(FlushableSink)
if ok {
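The timers loop above only aggregates entries that type-assert to `*reservoirTimer`; each retained value is sent to the sink together with the reservoir's sample rate. The sketch below shows the shape of those `FlushAggregatedTimer` calls against a toy sink; the `printSink` type and the metric name are illustrative assumptions, not part of the library.

```go
// Sketch of what the reservoir flush emits, using a toy sink. The real Sink
// interface and FlushAggregatedTimer behavior belong to the library; this
// printSink is an assumption for illustration only.
package main

import (
	"fmt"
	"sync"
)

type printSink struct{}

func (printSink) FlushAggregatedTimer(name string, value, sampleRate float64) {
	fmt.Printf("timer %s value=%v sample_rate=%v\n", name, value, sampleRate)
}

func main() {
	var timers sync.Map
	timers.Store("rpc.latency_ms", []float64{1.2, 3.4, 5.6}) // stand-in for a reservoir's collected values

	sink := printSink{}
	timers.Range(func(key, v interface{}) bool {
		values := v.([]float64)
		sampleRate := 1.0 // a full reservoir would report len(values)/count here
		for _, value := range values {
			sink.FlushAggregatedTimer(key.(string), value, sampleRate)
		}
		return true
	})
}
```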
@@ -583,18 +576,12 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
}

var t timer
settings := GetSettings() // todo: move this to some shared memory
if settings.isTimerReservoirEnabled() {
// todo: if s.timers gets to a certain size, we can flush all timers and delete them from the map
// todo: no idea how memory was managed here before, did we just expect the map of s.timers to just be replaced after it's filled?

// todo: have defaults defined in a shared location
if s.conf.isTimerReservoirEnabled() {
t = &reservoirTimer{
name: serializedName,
base: base,
capacity: 100,
values: make([]float64, 0, 100),
fill: 0,
capacity: s.conf.TimerReservoirSize,
values: make([]float64, 0, s.conf.TimerReservoirSize),
count: 0,
}
} else {
@@ -605,7 +592,7 @@ func (s *statStore) newTimer(serializedName string, base time.Duration) timer {
}
}

// todo: why would the timer ever be replaced, will this hurt reservoirs or benefit them? or is it just redundant since we load above?
// todo: do we need special rules to not lose active reservoirs
if v, loaded := s.timers.LoadOrStore(serializedName, t); loaded {
return v.(timer)
}
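`LoadOrStore` also speaks to the todo above: if another goroutine registered a timer under the same name first, the freshly built one is discarded and the existing one, along with any values it has already collected, is returned, so an active reservoir is not clobbered. A small sketch of that `sync.Map` behavior, using placeholder string values instead of real timers:

```go
// Sketch of the sync.Map.LoadOrStore semantics the newTimer path relies on:
// the first stored value for a key wins, and later callers get that existing
// value back (loaded == true) instead of replacing it.
package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map

	first, loaded := m.LoadOrStore("rpc.latency_ms", "reservoir A")
	fmt.Println(first, loaded) // reservoir A false: nothing existed yet, so A was stored

	second, loaded := m.LoadOrStore("rpc.latency_ms", "reservoir B")
	fmt.Println(second, loaded) // reservoir A true: the existing value is returned, B is discarded
}
```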
