Skip to content

Commit

Permalink
Merge pull request #289 from iksaif/corentin.chary/reduce-lock-conten…
Browse files Browse the repository at this point in the history
…tion

buffered_metrics: reduce a bit lock contention
  • Loading branch information
vickenty authored Nov 15, 2023
2 parents e612112 + db7e98b commit 54ec306
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 18 deletions.
20 changes: 15 additions & 5 deletions statsd/buffered_metric_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type bufferedMetricContexts struct {
nbContext uint64
mutex sync.RWMutex
values bufferedMetricMap
newMetric func(string, float64, string) *bufferedMetric
newMetric func(string, float64, string, float64) *bufferedMetric

// Each bufferedMetricContexts uses its own random source and random
// lock to prevent goroutines from contending for the lock on the
Expand All @@ -25,11 +25,11 @@ type bufferedMetricContexts struct {
randomLock sync.Mutex
}

func newBufferedContexts(newMetric func(string, float64, string, int64) *bufferedMetric, maxSamples int64) bufferedMetricContexts {
func newBufferedContexts(newMetric func(string, float64, string, int64, float64) *bufferedMetric, maxSamples int64) bufferedMetricContexts {
return bufferedMetricContexts{
values: bufferedMetricMap{},
newMetric: func(name string, value float64, stringTags string) *bufferedMetric {
return newMetric(name, value, stringTags, maxSamples)
newMetric: func(name string, value float64, stringTags string, rate float64) *bufferedMetric {
return newMetric(name, value, stringTags, maxSamples, rate)
},
// Note that calling "time.Now().UnixNano()" repeatedly quickly may return
// very similar values. That's fine for seeding the worker-specific random
Expand Down Expand Up @@ -57,6 +57,16 @@ func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error {
keepingSample := shouldSample(rate, bc.random, &bc.randomLock)

// If we don't keep the sample, return early. If we do keep the sample
// we end up storing the *first* observed sampling rate in the metric.
// This is the *wrong* behavior but it's the one we had before and the alternative would increase lock contention too
// much with the current code.
// TODO: change this behavior in the future, probably by introducing thread-local storage and lockless structures.
// If this code is removed, also remove the observed sampling rate in the metric and fix `bufferedMetric.flushUnsafe()`
if !keepingSample {
return nil
}

context, stringTags := getContextAndTags(name, tags)
var v *bufferedMetric = nil

Expand All @@ -71,7 +81,7 @@ func (bc *bufferedMetricContexts) sample(name string, value float64, tags []stri
v, _ = bc.values[context]
if v == nil {
// We might keep a sample that we should have skipped, but that should not drastically affect performance.
bc.values[context] = bc.newMetric(name, value, stringTags)
bc.values[context] = bc.newMetric(name, value, stringTags, rate)
// We added a new value, we need to unlock the mutex and quit
bc.mutex.Unlock()
return nil
Expand Down
27 changes: 23 additions & 4 deletions statsd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ type bufferedMetric struct {

// maxSamples is the maximum number of samples we keep in memory
maxSamples int64

// The first observed user-specified sample rate. When specified
// it is used because we don't know better.
specifiedRate float64
}

func (s *bufferedMetric) sample(v float64) {
Expand Down Expand Up @@ -184,18 +188,30 @@ func (s *bufferedMetric) skipSample() {
}

func (s *bufferedMetric) flushUnsafe() metric {
totalSamples := atomic.LoadInt64(&s.totalSamples)
var rate float64

// If the user specified a sample rate, send it because we don't know better.
// This code should be removed once we can also remove the early return at the top of
// `bufferedMetricContexts.sample`
if s.specifiedRate != 1.0 {
rate = s.specifiedRate
} else {
rate = float64(s.storedSamples) / float64(totalSamples)
}

return metric{
metricType: s.mtype,
name: s.name,
stags: s.tags,
rate: float64(s.storedSamples) / float64(atomic.LoadInt64(&s.totalSamples)),
rate: rate,
fvalues: s.data[:s.storedSamples],
}
}

type histogramMetric = bufferedMetric

func newHistogramMetric(name string, value float64, stringTags string, maxSamples int64) *histogramMetric {
func newHistogramMetric(name string, value float64, stringTags string, maxSamples int64, rate float64) *histogramMetric {
return &histogramMetric{
data: newData(value, maxSamples),
totalSamples: 1,
Expand All @@ -204,12 +220,13 @@ func newHistogramMetric(name string, value float64, stringTags string, maxSample
tags: stringTags,
mtype: histogramAggregated,
maxSamples: maxSamples,
specifiedRate: rate,
}
}

type distributionMetric = bufferedMetric

func newDistributionMetric(name string, value float64, stringTags string, maxSamples int64) *distributionMetric {
func newDistributionMetric(name string, value float64, stringTags string, maxSamples int64, rate float64) *distributionMetric {
return &distributionMetric{
data: newData(value, maxSamples),
totalSamples: 1,
Expand All @@ -218,12 +235,13 @@ func newDistributionMetric(name string, value float64, stringTags string, maxSam
tags: stringTags,
mtype: distributionAggregated,
maxSamples: maxSamples,
specifiedRate: rate,
}
}

type timingMetric = bufferedMetric

func newTimingMetric(name string, value float64, stringTags string, maxSamples int64) *timingMetric {
func newTimingMetric(name string, value float64, stringTags string, maxSamples int64, rate float64) *timingMetric {
return &timingMetric{
data: newData(value, maxSamples),
totalSamples: 1,
Expand All @@ -232,6 +250,7 @@ func newTimingMetric(name string, value float64, stringTags string, maxSamples i
tags: stringTags,
mtype: timingAggregated,
maxSamples: maxSamples,
specifiedRate: rate,
}
}

Expand Down
18 changes: 9 additions & 9 deletions statsd/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) {
}

func TestNewHistogramMetric(t *testing.T) {
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0)
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0, 1.0)
assert.Equal(t, s.data, []float64{1.0})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, "tag1,tag2")
assert.Equal(t, s.mtype, histogramAggregated)
}

func TestHistogramMetricSample(t *testing.T) {
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0)
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0, 1.0)
s.sample(123.45)
assert.Equal(t, s.data, []float64{1.0, 123.45})
assert.Equal(t, s.name, "test")
Expand All @@ -149,7 +149,7 @@ func TestHistogramMetricSample(t *testing.T) {
}

func TestFlushUnsafeHistogramMetricSample(t *testing.T) {
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0)
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0, 1.0)
m := s.flushUnsafe()

assert.Equal(t, m.metricType, histogramAggregated)
Expand All @@ -170,15 +170,15 @@ func TestFlushUnsafeHistogramMetricSample(t *testing.T) {
}

func TestNewDistributionMetric(t *testing.T) {
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0)
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0, 1.0)
assert.Equal(t, s.data, []float64{1.0})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, "tag1,tag2")
assert.Equal(t, s.mtype, distributionAggregated)
}

func TestDistributionMetricSample(t *testing.T) {
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0)
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0, 1.0)
s.sample(123.45)
assert.Equal(t, s.data, []float64{1.0, 123.45})
assert.Equal(t, s.name, "test")
Expand All @@ -187,7 +187,7 @@ func TestDistributionMetricSample(t *testing.T) {
}

func TestFlushUnsafeDistributionMetricSample(t *testing.T) {
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0)
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0, 1.0)
m := s.flushUnsafe()

assert.Equal(t, m.metricType, distributionAggregated)
Expand All @@ -208,15 +208,15 @@ func TestFlushUnsafeDistributionMetricSample(t *testing.T) {
}

func TestNewTimingMetric(t *testing.T) {
s := newTimingMetric("test", 1.0, "tag1,tag2", 0)
s := newTimingMetric("test", 1.0, "tag1,tag2", 0, 1.0)
assert.Equal(t, s.data, []float64{1.0})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, "tag1,tag2")
assert.Equal(t, s.mtype, timingAggregated)
}

func TestTimingMetricSample(t *testing.T) {
s := newTimingMetric("test", 1.0, "tag1,tag2", 0)
s := newTimingMetric("test", 1.0, "tag1,tag2", 0, 1.0)
s.sample(123.45)
assert.Equal(t, s.data, []float64{1.0, 123.45})
assert.Equal(t, s.name, "test")
Expand All @@ -225,7 +225,7 @@ func TestTimingMetricSample(t *testing.T) {
}

func TestFlushUnsafeTimingMetricSample(t *testing.T) {
s := newTimingMetric("test", 1.0, "tag1,tag2", 0)
s := newTimingMetric("test", 1.0, "tag1,tag2", 0, 1.0)
m := s.flushUnsafe()

assert.Equal(t, m.metricType, timingAggregated)
Expand Down
2 changes: 2 additions & 0 deletions statsd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ func WithoutClientSideAggregation() Option {

// WithExtendedClientSideAggregation enables client side aggregation for all types. This feature is only compatible with
// Agent's version >=6.25.0 && <7.0.0 or Agent's versions >=7.25.0.
// When enabled, the use of `rate` with distribution is discouraged and `WithMaxSamplesPerContext()` should be used.
// If `rate` is used with different values across calls, the resulting rate is not guaranteed to be correct.
func WithExtendedClientSideAggregation() Option {
return func(o *Options) error {
o.aggregation = true
Expand Down

0 comments on commit 54ec306

Please sign in to comment.