Skip to content

Commit

Permalink
Registry performance (#4261)
Browse files Browse the repository at this point in the history
* Improve gauge performance

* Improve counter performance

* Drop externalLabels from metric interface after all implementations are updated
  • Loading branch information
zalegrala authored Nov 1, 2024
1 parent 013a4ee commit 8cb9728
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 109 deletions.
50 changes: 25 additions & 25 deletions modules/generator/registry/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type counter struct {

onAddSeries func(count uint32) bool
onRemoveSeries func(count uint32)

externalLabels map[string]string
}

type counterSeries struct {
Expand All @@ -31,6 +33,9 @@ type counterSeries struct {
// to the desired value. This avoids Prometheus throwing away the first
// value in the series, due to the transition from null -> x.
firstSeries *atomic.Bool

lb *labels.Builder
baseLabels labels.Labels
}

var (
Expand All @@ -46,7 +51,7 @@ func (co *counterSeries) registerSeenSeries() {
co.firstSeries.Store(false)
}

func newCounter(name string, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32)) *counter {
func newCounter(name string, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), externalLabels map[string]string) *counter {
if onAddSeries == nil {
onAddSeries = func(uint32) bool {
return true
Expand All @@ -61,6 +66,7 @@ func newCounter(name string, onAddSeries func(uint32) bool, onRemoveSeries func(
series: make(map[uint64]*counterSeries),
onAddSeries: onAddSeries,
onRemoveSeries: onRemoveSeries,
externalLabels: externalLabels,
}
}

Expand Down Expand Up @@ -98,11 +104,24 @@ func (c *counter) Inc(labelValueCombo *LabelValueCombo, value float64) {
}

func (c *counter) newSeries(labelValueCombo *LabelValueCombo, value float64) *counterSeries {
// base labels
baseLabels := make(labels.Labels, 0, 1+len(c.externalLabels))

// add external labels
for name, value := range c.externalLabels {
baseLabels = append(baseLabels, labels.Label{Name: name, Value: value})
}

// add metric name
baseLabels = append(baseLabels, labels.Label{Name: labels.MetricName, Value: c.metricName})

return &counterSeries{
labels: labelValueCombo.getLabelPair(),
value: atomic.NewFloat64(value),
lastUpdated: atomic.NewInt64(time.Now().UnixMilli()),
firstSeries: atomic.NewBool(true),
lb: labels.NewBuilder(baseLabels),
baseLabels: baseLabels,
}
}

Expand All @@ -115,37 +134,18 @@ func (c *counter) name() string {
return c.metricName
}

func (c *counter) collectMetrics(appender storage.Appender, timeMs int64, externalLabels map[string]string) (activeSeries int, err error) {
func (c *counter) collectMetrics(appender storage.Appender, timeMs int64) (activeSeries int, err error) {
c.seriesMtx.RLock()
defer c.seriesMtx.RUnlock()

activeSeries = len(c.series)

labelsCount := 0
if activeSeries > 0 && c.series[0] != nil {
labelsCount = len(c.series[0].labels.names)
}

// base labels
baseLabels := make(labels.Labels, 0, 1+len(externalLabels)+labelsCount)

// add external labels
for name, value := range externalLabels {
baseLabels = append(baseLabels, labels.Label{Name: name, Value: value})
}

// add metric name
baseLabels = append(baseLabels, labels.Label{Name: labels.MetricName, Value: c.metricName})

// TODO: avoid allocation on each collection
lb := labels.NewBuilder(baseLabels)

for _, s := range c.series {
lb.Reset(baseLabels)
s.lb.Reset(s.baseLabels)

// set series-specific labels
for i, name := range s.labels.names {
lb.Set(name, s.labels.values[i])
s.lb.Set(name, s.labels.values[i])
}

// If we are about to call Append for the first time on a series, we need
Expand All @@ -155,14 +155,14 @@ func (c *counter) collectMetrics(appender storage.Appender, timeMs int64, extern
// We set the timestamp of the init serie at the end of the previous minute, that way we ensure it ends in a
// different aggregation interval to avoid be downsampled.
endOfLastMinuteMs := getEndOfLastMinuteMs(timeMs)
_, err = appender.Append(0, lb.Labels(), endOfLastMinuteMs, 0)
_, err = appender.Append(0, s.lb.Labels(), endOfLastMinuteMs, 0)
if err != nil {
return
}
s.registerSeenSeries()
}

_, err = appender.Append(0, lb.Labels(), timeMs, s.value.Load())
_, err = appender.Append(0, s.lb.Labels(), timeMs, s.value.Load())
if err != nil {
return
}
Expand Down
38 changes: 19 additions & 19 deletions modules/generator/registry/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func Test_counter(t *testing.T) {
return true
}

c := newCounter("my_counter", onAdd, nil)
c := newCounter("my_counter", onAdd, nil, nil)

c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-1"}), 1.0)
c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 2.0)
Expand All @@ -33,7 +33,7 @@ func Test_counter(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)
collectMetricAndAssert(t, c, collectionTimeMs, 2, expectedSamples, nil)

c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 2.0)
c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-3"}), 3.0)
Expand All @@ -49,7 +49,7 @@ func Test_counter(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-3"}, collectionTimeMs, 3),
}

collectMetricAndAssert(t, c, collectionTimeMs, nil, 3, expectedSamples, nil)
collectMetricAndAssert(t, c, collectionTimeMs, 3, expectedSamples, nil)
}

func TestCounterDifferentLabels(t *testing.T) {
Expand All @@ -59,7 +59,7 @@ func TestCounterDifferentLabels(t *testing.T) {
return true
}

c := newCounter("my_counter", onAdd, nil)
c := newCounter("my_counter", onAdd, nil, nil)

c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-1"}), 1.0)
c.Inc(newLabelValueCombo([]string{"another_label"}, []string{"another_value"}), 2.0)
Expand All @@ -74,7 +74,7 @@ func TestCounterDifferentLabels(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "another_label": "another_value"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "another_label": "another_value"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)
collectMetricAndAssert(t, c, collectionTimeMs, 2, expectedSamples, nil)
}

func Test_counter_cantAdd(t *testing.T) {
Expand All @@ -84,7 +84,7 @@ func Test_counter_cantAdd(t *testing.T) {
return canAdd
}

c := newCounter("my_counter", onAdd, nil)
c := newCounter("my_counter", onAdd, nil, nil)

// allow adding new series
canAdd = true
Expand All @@ -100,7 +100,7 @@ func Test_counter_cantAdd(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)
collectMetricAndAssert(t, c, collectionTimeMs, 2, expectedSamples, nil)

// block new series - existing series can still be updated
canAdd = false
Expand All @@ -113,7 +113,7 @@ func Test_counter_cantAdd(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 4),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)
collectMetricAndAssert(t, c, collectionTimeMs, 2, expectedSamples, nil)
}

func Test_counter_removeStaleSeries(t *testing.T) {
Expand All @@ -123,7 +123,7 @@ func Test_counter_removeStaleSeries(t *testing.T) {
removedSeries++
}

c := newCounter("my_counter", nil, onRemove)
c := newCounter("my_counter", nil, onRemove, nil)

timeMs := time.Now().UnixMilli()
c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-1"}), 1.0)
Expand All @@ -141,7 +141,7 @@ func Test_counter_removeStaleSeries(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)
collectMetricAndAssert(t, c, collectionTimeMs, 2, expectedSamples, nil)

time.Sleep(10 * time.Millisecond)
timeMs = time.Now().UnixMilli()
Expand All @@ -157,11 +157,11 @@ func Test_counter_removeStaleSeries(t *testing.T) {
expectedSamples = []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 4),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 1, expectedSamples, nil)
collectMetricAndAssert(t, c, collectionTimeMs, 1, expectedSamples, nil)
}

func Test_counter_externalLabels(t *testing.T) {
c := newCounter("my_counter", nil, nil)
c := newCounter("my_counter", nil, nil, map[string]string{"external_label": "external_value"})

c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-1"}), 1.0)
c.Inc(newLabelValueCombo([]string{"label"}, []string{"value-2"}), 2.0)
Expand All @@ -174,11 +174,11 @@ func Test_counter_externalLabels(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-2", "external_label": "external_value"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2", "external_label": "external_value"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, map[string]string{"external_label": "external_value"}, 2, expectedSamples, nil)
collectMetricAndAssert(t, c, collectionTimeMs, 2, expectedSamples, nil)
}

func Test_counter_concurrencyDataRace(t *testing.T) {
c := newCounter("my_counter", nil, nil)
c := newCounter("my_counter", nil, nil, nil)

end := make(chan struct{})

Expand Down Expand Up @@ -211,7 +211,7 @@ func Test_counter_concurrencyDataRace(t *testing.T) {
})

go accessor(func() {
_, err := c.collectMetrics(&noopAppender{}, 0, nil)
_, err := c.collectMetrics(&noopAppender{}, 0)
assert.NoError(t, err)
})

Expand All @@ -224,7 +224,7 @@ func Test_counter_concurrencyDataRace(t *testing.T) {
}

func Test_counter_concurrencyCorrectness(t *testing.T) {
c := newCounter("my_counter", nil, nil)
c := newCounter("my_counter", nil, nil, nil)

var wg sync.WaitGroup
end := make(chan struct{})
Expand Down Expand Up @@ -258,13 +258,13 @@ func Test_counter_concurrencyCorrectness(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, endOfLastMinuteMs, 0),
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, totalCount.Load()),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 1, expectedSamples, nil)
collectMetricAndAssert(t, c, collectionTimeMs, 1, expectedSamples, nil)
}

func collectMetricAndAssert(t *testing.T, m metric, collectionTimeMs int64, externalLabels map[string]string, expectedActiveSeries int, expectedSamples []sample, expectedExemplars []exemplarSample) {
func collectMetricAndAssert(t *testing.T, m metric, collectionTimeMs int64, expectedActiveSeries int, expectedSamples []sample, expectedExemplars []exemplarSample) {
appender := &capturingAppender{}

activeSeries, err := m.collectMetrics(appender, collectionTimeMs, externalLabels)
activeSeries, err := m.collectMetrics(appender, collectionTimeMs)
assert.NoError(t, err)
assert.Equal(t, expectedActiveSeries, activeSeries)

Expand Down
47 changes: 23 additions & 24 deletions modules/generator/registry/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ type gauge struct {

onAddSeries func(count uint32) bool
onRemoveSeries func(count uint32)

externalLabels map[string]string
}

type gaugeSeries struct {
// labelValueCombo should not be modified after creation
labels LabelPair
value *atomic.Float64
lastUpdated *atomic.Int64
lb *labels.Builder
baseLabels labels.Labels
}

var (
Expand All @@ -43,7 +47,7 @@ const (
set = "set"
)

func newGauge(name string, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32)) *gauge {
func newGauge(name string, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), externalLabels map[string]string) *gauge {
if onAddSeries == nil {
onAddSeries = func(uint32) bool {
return true
Expand All @@ -58,6 +62,7 @@ func newGauge(name string, onAddSeries func(uint32) bool, onRemoveSeries func(co
series: make(map[uint64]*gaugeSeries),
onAddSeries: onAddSeries,
onRemoveSeries: onRemoveSeries,
externalLabels: externalLabels,
}
}

Expand Down Expand Up @@ -107,10 +112,23 @@ func (g *gauge) updateSeries(labelValueCombo *LabelValueCombo, value float64, op
}

func (g *gauge) newSeries(labelValueCombo *LabelValueCombo, value float64) *gaugeSeries {
// base labels
baseLabels := make(labels.Labels, 1+len(g.externalLabels))

// add metric name
baseLabels = append(baseLabels, labels.Label{Name: labels.MetricName, Value: g.metricName})

// add external labels
for name, value := range g.externalLabels {
baseLabels = append(baseLabels, labels.Label{Name: name, Value: value})
}

return &gaugeSeries{
labels: labelValueCombo.getLabelPair(),
value: atomic.NewFloat64(value),
lastUpdated: atomic.NewInt64(time.Now().UnixMilli()),
lb: labels.NewBuilder(baseLabels),
baseLabels: baseLabels,
}
}

Expand All @@ -127,43 +145,24 @@ func (g *gauge) name() string {
return g.metricName
}

func (g *gauge) collectMetrics(appender storage.Appender, timeMs int64, externalLabels map[string]string) (activeSeries int, err error) {
func (g *gauge) collectMetrics(appender storage.Appender, timeMs int64) (activeSeries int, err error) {
g.seriesMtx.RLock()
defer g.seriesMtx.RUnlock()

activeSeries = len(g.series)

labelsCount := 0
if activeSeries > 0 && g.series[0] != nil {
labelsCount = len(g.series[0].labels.names)
}

// base labels
baseLabels := make(labels.Labels, 1+len(externalLabels)+labelsCount)

// add metric name
baseLabels = append(baseLabels, labels.Label{Name: labels.MetricName, Value: g.metricName})

// add external labels
for name, value := range externalLabels {
baseLabels = append(baseLabels, labels.Label{Name: name, Value: value})
}

// TODO: avoid allocation on each collection
lb := labels.NewBuilder(baseLabels)

for _, s := range g.series {
t := time.UnixMilli(timeMs)

// reset labels for every series
lb.Reset(baseLabels)
s.lb.Reset(s.baseLabels)

// set series-specific labels
for i, name := range s.labels.names {
lb.Set(name, s.labels.values[i])
s.lb.Set(name, s.labels.values[i])
}

_, err = appender.Append(0, lb.Labels(), t.UnixMilli(), s.value.Load())
_, err = appender.Append(0, s.lb.Labels(), t.UnixMilli(), s.value.Load())
if err != nil {
return
}
Expand Down
Loading

0 comments on commit 8cb9728

Please sign in to comment.