Skip to content

Commit

Permalink
[receiver/prometheusremotewrite] Parse labels into resource attributes
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens committed Oct 7, 2024
1 parent ac68ba0 commit cb43ca6
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 3 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prwreceiver-parselabels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Parse labels from Prometheus Remote Write requests into Resource Attributes

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: []

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Warning - The HTTP Server still pass metrics to the next consumer. It's just a placeholder for now.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api, user]
2 changes: 2 additions & 0 deletions receiver/prometheusremotewritereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.0
require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0
github.com/prometheus/prometheus v0.54.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.111.0
Expand Down Expand Up @@ -46,6 +47,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions receiver/prometheusremotewritereceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 34 additions & 2 deletions receiver/prometheusremotewritereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/gogo/protobuf/proto"
promConfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
promRemote "github.com/prometheus/prometheus/storage/remote"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -153,6 +154,37 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promCo
// translateV2 translates a v2 remote-write request into OTLP metrics.
// For now translateV2 is not implemented and returns an empty metrics.
// nolint
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promRemote.WriteResponseStats, int, error) {
return pmetric.NewMetrics(), promRemote.WriteResponseStats{}, 0, nil
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promRemote.WriteResponseStats, int, error) {
var (
badRequestErrors []error
otelMetrics = pmetric.NewMetrics()
b = labels.NewScratchBuilder(0)
stats = promRemote.WriteResponseStats{}
)

resourceMetrics := otelMetrics.ResourceMetrics().AppendEmpty()

for _, ts := range req.Timeseries {
ls := ts.ToLabels(&b, req.Symbols)

if !ls.Has(labels.MetricName) {
badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels"))
continue
} else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate {
badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel))
continue
}

ls = ls.DropMetricName()
for _, label := range ls {
resourceMetrics.Resource().Attributes().PutStr(label.Name, label.Value)
}

}

var statusCode int
if len(badRequestErrors) > 0 {
statusCode = http.StatusBadRequest
}
return otelMetrics, stats, statusCode, errors.Join(badRequestErrors...)
}
129 changes: 128 additions & 1 deletion receiver/prometheusremotewritereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,63 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
promConfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage/remote"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/receivertest"
)

func setupServer(t *testing.T) {
var (
testHistogram = histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 0,
Sum: 20,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{-1},
}

writeV2RequestFixture = &writev2.Request{
Symbols: []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE, // writeV2RequestSeries1Metadata.Type.

HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
},
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first.
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries2Metadata.Type.

HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help.
// No unit.
},
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
},
},
}
)

func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
t.Helper()

factory := NewFactory()
Expand All @@ -30,6 +78,13 @@ func setupServer(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")

return prwReceiver.(*prometheusRemoteWriteReceiver)
}

func setupServer(t *testing.T) {
t.Helper()

prwReceiver := setupMetricsReceiver(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

Expand Down Expand Up @@ -98,3 +153,75 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
})
}
}

func TestTranslateV2(t *testing.T) {
prwReceiver := setupMetricsReceiver(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

for _, tc := range []struct {
name string
request *writev2.Request
expectError string
expectedMetrics pmetric.Metrics
expectedStats remote.WriteResponseStats
expectedCode int
}{
{
name: "missing metric name",
request: &writev2.Request{
Symbols: []string{"", "foo", "bar"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
},
},
expectError: "missing metric name in labels",
expectedCode: http.StatusBadRequest,
},
{
name: "duplicate label",
request: &writev2.Request{
Symbols: []string{"", "__name__", "test"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 1, 2},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
},
},
expectError: `duplicate label "__name__" in labels`,
expectedCode: http.StatusBadRequest,
},
{
name: "valid request",
request: writeV2RequestFixture,
expectedMetrics: func() pmetric.Metrics {
expected := pmetric.NewMetrics()
rmAttributes := expected.ResourceMetrics().AppendEmpty().Resource().Attributes()
rmAttributes.PutStr("b", "c")
rmAttributes.PutStr("baz", "qux")
rmAttributes.PutStr("d", "e")
rmAttributes.PutStr("foo", "bar")
return expected
}(),
expectedStats: remote.WriteResponseStats{},
},
} {
t.Run(tc.name, func(t *testing.T) {
metrics, stats, code, err := prwReceiver.translateV2(ctx, tc.request)
if tc.expectError != "" {
assert.ErrorContains(t, err, tc.expectError)
assert.Equal(t, tc.expectedCode, code)
return
}

assert.NoError(t, err)
assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics))
assert.Equal(t, tc.expectedStats, stats)
assert.Equal(t, tc.expectedCode, code)
})
}
}

0 comments on commit cb43ca6

Please sign in to comment.