Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffered_metrics: slightly reduce lock contention #289

Merged
merged 5 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions statsd/buffered_metric_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type bufferedMetricContexts struct {
nbContext uint64
mutex sync.RWMutex
values bufferedMetricMap
newMetric func(string, float64, string) *bufferedMetric
newMetric func(string, float64, string, float64) *bufferedMetric

// Each bufferedMetricContexts uses its own random source and random
// lock to prevent goroutines from contending for the lock on the
Expand All @@ -25,11 +25,11 @@ type bufferedMetricContexts struct {
randomLock sync.Mutex
}

func newBufferedContexts(newMetric func(string, float64, string, int64) *bufferedMetric, maxSamples int64) bufferedMetricContexts {
func newBufferedContexts(newMetric func(string, float64, string, int64, float64) *bufferedMetric, maxSamples int64) bufferedMetricContexts {
return bufferedMetricContexts{
values: bufferedMetricMap{},
newMetric: func(name string, value float64, stringTags string) *bufferedMetric {
return newMetric(name, value, stringTags, maxSamples)
newMetric: func(name string, value float64, stringTags string, rate float64) *bufferedMetric {
return newMetric(name, value, stringTags, maxSamples, rate)
},
// Note that calling "time.Now().UnixNano()" repeatedly quickly may return
// very similar values. That's fine for seeding the worker-specific random
Expand Down Expand Up @@ -57,6 +57,16 @@ func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error {
keepingSample := shouldSample(rate, bc.random, &bc.randomLock)

// If we don't keep the sample, return early. If we do keep the sample,
// we end up storing the *first* observed sampling rate in the metric.
// This is the *wrong* behavior, but it's the one we had before, and the alternative
// would increase lock contention too much with the current code.
// TODO: change this behavior in the future, probably by introducing thread-local storage and lockless structures.
// If this code is removed, also remove the observed sampling rate in the metric and fix `bufferedMetric.flushUnsafe()`.
if !keepingSample {
return nil
}

context, stringTags := getContextAndTags(name, tags)
var v *bufferedMetric = nil

Expand All @@ -71,7 +81,7 @@ func (bc *bufferedMetricContexts) sample(name string, value float64, tags []stri
v, _ = bc.values[context]
if v == nil {
// We might keep a sample that we should have skipped, but that should not drastically affect performance.
bc.values[context] = bc.newMetric(name, value, stringTags)
bc.values[context] = bc.newMetric(name, value, stringTags, rate)
// We added a new value, we need to unlock the mutex and quit
bc.mutex.Unlock()
return nil
Expand Down
27 changes: 23 additions & 4 deletions statsd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ type bufferedMetric struct {

// maxSamples is the maximum number of samples we keep in memory
maxSamples int64

// The first observed user-specified sample rate. When specified,
// it is used because we don't know better.
specifiedRate float64
}

func (s *bufferedMetric) sample(v float64) {
Expand Down Expand Up @@ -184,18 +188,30 @@ func (s *bufferedMetric) skipSample() {
}

func (s *bufferedMetric) flushUnsafe() metric {
totalSamples := atomic.LoadInt64(&s.totalSamples)
var rate float64

// If the user specified a rate, send it because we don't know better.
// This code should be removed once we can also remove the early return at the top of
// `bufferedMetricContexts.sample`.
if s.specifiedRate != 1.0 {
rate = s.specifiedRate
Comment on lines +197 to +198
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't match comment on specifiedRate. Should we be checking totalSamples in the condition above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update the comment, originally I planned to try to do something smarter, but I think it would require more thinking .. so reverting to the previous behavior is a better first step

} else {
rate = float64(s.storedSamples) / float64(totalSamples)
}

return metric{
metricType: s.mtype,
name: s.name,
stags: s.tags,
rate: float64(s.storedSamples) / float64(atomic.LoadInt64(&s.totalSamples)),
rate: rate,
fvalues: s.data[:s.storedSamples],
}
}

type histogramMetric = bufferedMetric

func newHistogramMetric(name string, value float64, stringTags string, maxSamples int64) *histogramMetric {
func newHistogramMetric(name string, value float64, stringTags string, maxSamples int64, rate float64) *histogramMetric {
return &histogramMetric{
data: newData(value, maxSamples),
totalSamples: 1,
Expand All @@ -204,12 +220,13 @@ func newHistogramMetric(name string, value float64, stringTags string, maxSample
tags: stringTags,
mtype: histogramAggregated,
maxSamples: maxSamples,
specifiedRate: rate,
}
}

type distributionMetric = bufferedMetric

func newDistributionMetric(name string, value float64, stringTags string, maxSamples int64) *distributionMetric {
func newDistributionMetric(name string, value float64, stringTags string, maxSamples int64, rate float64) *distributionMetric {
return &distributionMetric{
data: newData(value, maxSamples),
totalSamples: 1,
Expand All @@ -218,12 +235,13 @@ func newDistributionMetric(name string, value float64, stringTags string, maxSam
tags: stringTags,
mtype: distributionAggregated,
maxSamples: maxSamples,
specifiedRate: rate,
}
}

type timingMetric = bufferedMetric

func newTimingMetric(name string, value float64, stringTags string, maxSamples int64) *timingMetric {
func newTimingMetric(name string, value float64, stringTags string, maxSamples int64, rate float64) *timingMetric {
return &timingMetric{
data: newData(value, maxSamples),
totalSamples: 1,
Expand All @@ -232,6 +250,7 @@ func newTimingMetric(name string, value float64, stringTags string, maxSamples i
tags: stringTags,
mtype: timingAggregated,
maxSamples: maxSamples,
specifiedRate: rate,
}
}

Expand Down
18 changes: 9 additions & 9 deletions statsd/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) {
}

func TestNewHistogramMetric(t *testing.T) {
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0)
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0, 1.0)
assert.Equal(t, s.data, []float64{1.0})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, "tag1,tag2")
assert.Equal(t, s.mtype, histogramAggregated)
}

func TestHistogramMetricSample(t *testing.T) {
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0)
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0, 1.0)
s.sample(123.45)
assert.Equal(t, s.data, []float64{1.0, 123.45})
assert.Equal(t, s.name, "test")
Expand All @@ -149,7 +149,7 @@ func TestHistogramMetricSample(t *testing.T) {
}

func TestFlushUnsafeHistogramMetricSample(t *testing.T) {
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0)
s := newHistogramMetric("test", 1.0, "tag1,tag2", 0, 1.0)
m := s.flushUnsafe()

assert.Equal(t, m.metricType, histogramAggregated)
Expand All @@ -170,15 +170,15 @@ func TestFlushUnsafeHistogramMetricSample(t *testing.T) {
}

func TestNewDistributionMetric(t *testing.T) {
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0)
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0, 1.0)
assert.Equal(t, s.data, []float64{1.0})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, "tag1,tag2")
assert.Equal(t, s.mtype, distributionAggregated)
}

func TestDistributionMetricSample(t *testing.T) {
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0)
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0, 1.0)
s.sample(123.45)
assert.Equal(t, s.data, []float64{1.0, 123.45})
assert.Equal(t, s.name, "test")
Expand All @@ -187,7 +187,7 @@ func TestDistributionMetricSample(t *testing.T) {
}

func TestFlushUnsafeDistributionMetricSample(t *testing.T) {
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0)
s := newDistributionMetric("test", 1.0, "tag1,tag2", 0, 1.0)
m := s.flushUnsafe()

assert.Equal(t, m.metricType, distributionAggregated)
Expand All @@ -208,15 +208,15 @@ func TestFlushUnsafeDistributionMetricSample(t *testing.T) {
}

func TestNewTimingMetric(t *testing.T) {
s := newTimingMetric("test", 1.0, "tag1,tag2", 0)
s := newTimingMetric("test", 1.0, "tag1,tag2", 0, 1.0)
assert.Equal(t, s.data, []float64{1.0})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, "tag1,tag2")
assert.Equal(t, s.mtype, timingAggregated)
}

func TestTimingMetricSample(t *testing.T) {
s := newTimingMetric("test", 1.0, "tag1,tag2", 0)
s := newTimingMetric("test", 1.0, "tag1,tag2", 0, 1.0)
s.sample(123.45)
assert.Equal(t, s.data, []float64{1.0, 123.45})
assert.Equal(t, s.name, "test")
Expand All @@ -225,7 +225,7 @@ func TestTimingMetricSample(t *testing.T) {
}

func TestFlushUnsafeTimingMetricSample(t *testing.T) {
s := newTimingMetric("test", 1.0, "tag1,tag2", 0)
s := newTimingMetric("test", 1.0, "tag1,tag2", 0, 1.0)
m := s.flushUnsafe()

assert.Equal(t, m.metricType, timingAggregated)
Expand Down
2 changes: 2 additions & 0 deletions statsd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ func WithoutClientSideAggregation() Option {

// WithExtendedClientSideAggregation enables client side aggregation for all types. This feature is only compatible with
// Agent's version >=6.25.0 && <7.0.0 or Agent's versions >=7.25.0.
// When enabled, the use of `rate` with distribution is discouraged and `WithMaxSamplesPerContext()` should be used.
// If `rate` is used with different values for the same metric, the resulting rate is not guaranteed to be correct.
func WithExtendedClientSideAggregation() Option {
return func(o *Options) error {
o.aggregation = true
Expand Down