Skip to content

Commit

Permalink
Add DroppedAttributeCount field to Otel/STEF schema
Browse files Browse the repository at this point in the history
Resolves #8

Otel/STEF schema changes:

- Renamed "Attributes" multimap to "AttributesList".
- Made "Attributes" a struct, containing 2 fields: a multimap
  of key/values and "DroppedAttributeCount".
- Use the new "Attributes" struct in the "Resource", "Scope", "Span", etc structs.

The schema changes reveal a serialization bug that I had to fix otherwise
the tests were failing.

The bug was in struct's Encode() function, where if a struct was encoded
by reference the struct's modified bits were not correctly reset. The reset
is now done recursively, which is necessary so that the next Encode() correctly
picks up the field modifications.

Also updated testdata files (because the schema is now different) and renamed
the incorrect ".tefz" to ".stefz" extension.
  • Loading branch information
tigrannajaryan committed Jan 24, 2025
1 parent 46aac76 commit 27cba70
Show file tree
Hide file tree
Showing 53 changed files with 1,388 additions and 431 deletions.
8 changes: 4 additions & 4 deletions benchmarks/encodings/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (d *Encoding) FromOTLP(data pmetric.Metrics) (encodings.InMemoryData, error
err := byResource.Iter(
func(scope *oteltef.Scope, byScope *sortedbymetric.ByScope) error {
err := byScope.Iter(
func(attrs *oteltef.Attributes, points *sortedbymetric.Points) error {
func(attrs *oteltef.AttributesList, points *sortedbymetric.Points) error {
for _, value := range *points {
// TODO: histogram support
datums = append(
Expand All @@ -73,11 +73,11 @@ func (d *Encoding) FromOTLP(data pmetric.Metrics) (encodings.InMemoryData, error
Type: uint(metric.Type()),
Flags: uint(metric.AggregationTemporality()),
Resource: Resource{
Attributes: convertAttrs(resource.Attributes()),
Attributes: convertAttrs(resource.Attributes().List()),
SchemaURL: resource.SchemaURL(),
},
Scope: Scope{
Attributes: convertAttrs(scope.Attributes()),
Attributes: convertAttrs(scope.Attributes().List()),
SchemaURL: scope.SchemaURL(),
Name: scope.Name(),
Version: scope.Version(),
Expand Down Expand Up @@ -125,7 +125,7 @@ func (d *Encoding) Encode(data encodings.InMemoryData) ([]byte, error) {
return buf.Bytes(), nil
}

func convertAttrs(attrs *oteltef.Attributes) (r []Attribute) {
func convertAttrs(attrs *oteltef.AttributesList) (r []Attribute) {
for i := 0; i < attrs.Len(); i++ {
attr := attrs.At(i)
r = append(
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/encodings/parquet/parquetz.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (d *EncodingZ) FromOTLP(data pmetric.Metrics) (encodings.InMemoryData, erro
err := byResource.Iter(
func(scope *oteltef.Scope, byScope *sortedbymetric.ByScope) error {
err := byScope.Iter(
func(attrs *oteltef.Attributes, points *sortedbymetric.Points) error {
func(attrs *oteltef.AttributesList, points *sortedbymetric.Points) error {
for _, value := range *points {
datums = append(
datums, DatumZ{
Expand All @@ -195,11 +195,11 @@ func (d *EncodingZ) FromOTLP(data pmetric.Metrics) (encodings.InMemoryData, erro
Type: uint(metric.Type()),
Flags: uint(metric.AggregationTemporality()),
Resource: ResourceZ{
Attributes: convertAttrsZ(resource.Attributes()),
Attributes: convertAttrsZ(resource.Attributes().List()),
SchemaURL: resource.SchemaURL(),
},
Scope: ScopeZ{
Attributes: convertAttrsZ(scope.Attributes()),
Attributes: convertAttrsZ(scope.Attributes().List()),
SchemaURL: scope.SchemaURL(),
Name: scope.Name(),
Version: scope.Version(),
Expand Down Expand Up @@ -246,7 +246,7 @@ func (d *EncodingZ) Encode(data encodings.InMemoryData) ([]byte, error) {
return buf.Bytes(), nil
}

func convertAttrsZ(attrs *oteltef.Attributes) (r []AttributeZ) {
func convertAttrsZ(attrs *oteltef.AttributesList) (r []AttributeZ) {
for i := 0; i < attrs.Len(); i++ {
attr := attrs.At(i)
r = append(
Expand Down
14 changes: 7 additions & 7 deletions benchmarks/readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (

func TestCopy(t *testing.T) {
files := []string{
"oteldemo-with-histogram.tefz",
"hipstershop.tefz",
"hostandcollectormetrics.tefz",
"astronomyshop.tefz",
"oteldemo-with-histogram.stefz",
"hipstershop.stefz",
"hostandcollectormetrics.stefz",
"astronomyshop.stefz",
}

fmt.Printf(
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestCopy(t *testing.T) {
}

func BenchmarkReadSTEF(b *testing.B) {
tefBytes, err := os.ReadFile("testdata/hipstershop.tefz")
tefBytes, err := os.ReadFile("testdata/hipstershop.stefz")
require.NoError(b, err)

tefSrc, err := oteltef.NewMetricsReader(bytes.NewBuffer(tefBytes))
Expand Down Expand Up @@ -136,7 +136,7 @@ func BenchmarkReadSTEF(b *testing.B) {
}

func BenchmarkReadSTEFZ(b *testing.B) {
tefBytes, err := os.ReadFile("testdata/hipstershop.tefz")
tefBytes, err := os.ReadFile("testdata/hipstershop.stefz")
require.NoError(b, err)

recCount := 0
Expand Down Expand Up @@ -164,7 +164,7 @@ func BenchmarkReadSTEFZ(b *testing.B) {
}

func BenchmarkReadSTEFZWriteSTEF(b *testing.B) {
tefBytes, err := os.ReadFile("testdata/hipstershop.tefz")
tefBytes, err := os.ReadFile("testdata/hipstershop.stefz")
require.NoError(b, err)

recCount := 0
Expand Down
22 changes: 18 additions & 4 deletions benchmarks/size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"log"
"os"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -89,6 +91,14 @@ func TestTracesMultipart(t *testing.T) {
}
}

func replaceExt(fname string, ext string) string {
idx := strings.Index(fname, ".")
if idx > 0 {
return fname[:idx] + ext
}
return fname
}

func TestMetricsSize(t *testing.T) {

dataVariations := []struct {
Expand Down Expand Up @@ -184,10 +194,14 @@ func TestMetricsSize(t *testing.T) {
log.Fatal(err)
}

//if enc, ok := encoding.(*stef.STEFEncoding); ok {
// fname := "testdata/" + dataVariation.generator.GetName() + "." + strings.ToLower(enc.Name())
// os.WriteFile(fname, bodyBytes, 0644)
//}
if enc, ok := encoding.(*stef.STEFEncoding); ok {
// Write STEF file if it does not exist
fname := "testdata/" + replaceExt(dataVariation.generator.GetName(), "."+strings.ToLower(enc.Name()))
_, err = os.Stat(fname)
if err != nil {
os.WriteFile(fname, bodyBytes, 0644)
}
}

zstdedBytes := testutils.CompressZstd(bodyBytes)

Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
4 changes: 2 additions & 2 deletions go/otel/manual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestWriterFrameLimit(t *testing.T) {
assert.EqualValues(t, 4, buf.chunkCount)
}

func mapToTef(m map[string]any, out *oteltef.Attributes) {
func mapToTef(m map[string]any, out *oteltef.AttributesList) {
out.EnsureLen(len(m))
i := 0
for k, v := range m {
Expand Down Expand Up @@ -168,7 +168,7 @@ func valueToTef(v any, into *oteltef.AnyValue) {
}
}

func tefToMap(in *oteltef.Attributes) map[string]any {
func tefToMap(in *oteltef.AttributesList) map[string]any {
out := map[string]any{}

for i := 0; i < in.Len(); i++ {
Expand Down
33 changes: 23 additions & 10 deletions go/otel/oteltef.wire.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@
}
]
},
"Attributes": {
"name": "Attributes",
"fields": [
{
"multimap": "AttributesList",
"name": "List"
},
{
"name": "DroppedAttributeCount",
"primitive": 1
}
]
},
"Exemplar": {
"name": "Exemplar",
"fields": [
Expand All @@ -71,7 +84,7 @@
"name": "TraceID"
},
{
"multimap": "Attributes",
"multimap": "AttributesList",
"name": "FilteredAttributes"
}
]
Expand Down Expand Up @@ -144,7 +157,7 @@
"name": "Type"
},
{
"multimap": "Attributes",
"multimap": "AttributesList",
"name": "Metadata"
},
{
Expand Down Expand Up @@ -225,7 +238,7 @@
"name": "Scope"
},
{
"multimap": "Attributes",
"multimap": "AttributesList",
"name": "Attributes"
},
{
Expand All @@ -244,7 +257,7 @@
"name": "SchemaURL"
},
{
"multimap": "Attributes",
"struct": "Attributes",
"name": "Attributes"
}
]
Expand All @@ -269,7 +282,7 @@
"name": "SchemaURL"
},
{
"multimap": "Attributes",
"struct": "Attributes",
"name": "Attributes"
}
]
Expand Down Expand Up @@ -316,7 +329,7 @@
"name": "EndTimeUnixNano"
},
{
"multimap": "Attributes",
"struct": "Attributes",
"name": "Attributes"
},
{
Expand Down Expand Up @@ -358,7 +371,7 @@
"name": "Flags"
},
{
"multimap": "Attributes",
"struct": "Attributes",
"name": "Attributes"
}
]
Expand All @@ -377,7 +390,7 @@
"name": "TimeUnixNano"
},
{
"multimap": "Attributes",
"struct": "Attributes",
"name": "Attributes"
}
]
Expand Down Expand Up @@ -421,8 +434,8 @@
}
},
"multimaps": {
"Attributes": {
"name": "Attributes",
"AttributesList": {
"name": "AttributesList",
"key": {
"type": {
"primitive": 4,
Expand Down
26 changes: 26 additions & 0 deletions go/otel/oteltef/anyvalue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions go/otel/oteltef/anyvaluearray.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 27cba70

Please sign in to comment.