From 052ec8b7b2c0873b13e2d9c37c9dc73e1bdeea08 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 17 Mar 2023 09:17:01 -0700 Subject: [PATCH] Add cardinality limits to the Lightstep metrics SDK (#385) --- CHANGELOG.md | 9 +- VERSION | 2 +- go.mod | 6 +- launcher/version.go | 2 +- lightstep/sdk/metric/README.md | 24 + lightstep/sdk/metric/aggregator/aggregator.go | 20 +- lightstep/sdk/metric/config.go | 13 +- lightstep/sdk/metric/example/go.mod | 2 +- .../sdk/metric/internal/asyncstate/async.go | 7 +- .../metric/internal/asyncstate/async_test.go | 8 +- .../sdk/metric/internal/pipeline/overflow.go | 26 ++ .../sdk/metric/internal/syncstate/sync.go | 48 +- .../metric/internal/syncstate/sync_test.go | 258 ++++++++++- lightstep/sdk/metric/internal/test/test.go | 2 +- .../internal/viewstate/base_instrument.go | 39 ++ .../metric/internal/viewstate/collectors.go | 17 +- .../metric/internal/viewstate/errors_test.go | 3 + .../metric/internal/viewstate/viewstate.go | 59 +-- .../internal/viewstate/viewstate_test.go | 428 +++++++++++++++++- lightstep/sdk/metric/provider.go | 16 +- .../sdk/metric/sdkinstrument/performance.go | 25 + lightstep/sdk/metric/view/standard.go | 15 +- lightstep/sdk/metric/view/view.go | 8 +- lightstep/sdk/metric/view/views.go | 35 +- lightstep/sdk/metric/view/views_test.go | 25 +- pipelines/go.mod | 4 +- 26 files changed, 977 insertions(+), 124 deletions(-) create mode 100644 lightstep/sdk/metric/internal/pipeline/overflow.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ed61c76..c2aa73a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,14 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Unreleased -## [1.13.4](https://github.com/lightstep/otel-launcher-go/releases/tag/v1.13.3) - 2023-03-01) +## [1.14.0](https://github.com/lightstep/otel-launcher-go/releases/tag/v1.14.0) - 2023-03-17) + +- Support cardinality limits. Synchronous instruments support an + instrument-level cardinality limit; Synchronous and Asynchronous + aggregators support per-view cardinality limits. Performance settings + determine the view-configured defaults. [#385](https://github.com/lightstep/otel-launcher-go/pull/385) + +## [1.13.4](https://github.com/lightstep/otel-launcher-go/releases/tag/v1.13.3) - 2023-03-02) - Minor performance improvement, one less allocation under the lock when fingerprint collisions are being checked. [#407](https://github.com/lightstep/otel-launcher-go/pull/407) diff --git a/VERSION b/VERSION index 80138e71..850e7424 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.13.4 +1.14.0 diff --git a/go.mod b/go.mod index 65f2f44e..05a127ee 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/lightstep/otel-launcher-go go 1.18 require ( - github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.13.4 - github.com/lightstep/otel-launcher-go/pipelines v1.13.4 + github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.14.0 + github.com/lightstep/otel-launcher-go/pipelines v1.14.0 github.com/sethvargo/go-envconfig v0.8.3 github.com/stretchr/testify v1.8.2 go.opentelemetry.io/otel v1.14.0 @@ -23,7 +23,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/lightstep/go-expohisto v1.0.0 // indirect - github.com/lightstep/otel-launcher-go/lightstep/instrumentation v1.13.4 // indirect + github.com/lightstep/otel-launcher-go/lightstep/instrumentation v1.14.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/launcher/version.go b/launcher/version.go index 6ef3228d..3489d2f8 100644 --- a/launcher/version.go +++ b/launcher/version.go @@ -14,4 +14,4 @@ package launcher -const version = "1.13.4" +const version = "1.14.0" diff --git a/lightstep/sdk/metric/README.md b/lightstep/sdk/metric/README.md index 6ff71d96..1fe6759e 100644 --- a/lightstep/sdk/metric/README.md +++ b/lightstep/sdk/metric/README.md @@ -147,3 +147,27 @@ Setting this field to 1 means records will be removed from memory after one inactive collection cycle. Setting this field to 0 causes the default value 10 to be used. + +#### InstrumentCardinalityLimit + +Synchronous instruments are implemented using a map of intermediate +state. When this map grows to `InstrumentCardinalityLimit`, new +attribute sets will be replaced by the overflow attribute set, which +is `{ otel.metric.overflow=true }`. This limit is applied to all +instruments regardless of view configuration before attribute filters +are applied. + +For instruments configured with Delta temporality, where it is +possible for the map to shrink, note that the size of this map +includes records maintained due to `InactiveCollectionPeriods`. The +inactivity period should be taken into account when setting +`InstrumentCardinalityLimit` to avoid overflow. + +#### AggregatorCardinalityLimit + +All views maintain a configurable cardinality limit, calculated after +attribute filters are applied. + +When the aggregator's output grows to `AggregatorCardinalityLimit`, +new attribute sets will be replaced by the overflow attribute set, +which is `{ otel.metric.overflow=true }`. diff --git a/lightstep/sdk/metric/aggregator/aggregator.go b/lightstep/sdk/metric/aggregator/aggregator.go index abc56f65..9df98e29 100644 --- a/lightstep/sdk/metric/aggregator/aggregator.go +++ b/lightstep/sdk/metric/aggregator/aggregator.go @@ -75,19 +75,18 @@ type JSONHistogramConfig struct { // JSONConfig supports the configuration for all aggregators in a single struct. type JSONConfig struct { - Histogram JSONHistogramConfig `json:"histogram"` -} - -// ToConfig returns a Config from the fixed-JSON represented. -func (jc JSONConfig) ToConfig() Config { - return Config{ - Histogram: histostruct.NewConfig(histostruct.WithMaxSize(jc.Histogram.MaxSize)), - } + Histogram JSONHistogramConfig `json:"histogram"` + CardinalityLimit uint32 `json:"cardinality_limit"` } // Config supports the configuration for all aggregators in a single struct. type Config struct { + // Histogram configuration, specifically. Histogram histostruct.Config + + // CardinalityLimit limits the number of instances of this + // aggregator in a given view. + CardinalityLimit uint32 } // Valid returns true for valid configurations. @@ -101,6 +100,11 @@ func (c Config) Valid() bool { func (c Config) Validate() (Config, error) { var err error c.Histogram, err = c.Histogram.Validate() + + if c.CardinalityLimit == 0 { + c.CardinalityLimit = sdkinstrument.DefaultAggregatorCardinalityLimit + } + return c, err } diff --git a/lightstep/sdk/metric/config.go b/lightstep/sdk/metric/config.go index 34eaa327..9126c861 100644 --- a/lightstep/sdk/metric/config.go +++ b/lightstep/sdk/metric/config.go @@ -17,7 +17,6 @@ package metric // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/me import ( "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/resource" ) @@ -30,9 +29,9 @@ type config struct { // the i'th reader uses the i'th entry in views. readers []Reader - // views is a slice of *Views instances corresponding with readers. - // the i'th views applies to the i'th reader. - views []*view.Views + // vopts is a slice of []view.Option corresponding with readers. + // the i'th view option list applies to the i'th reader. + vopts [][]view.Option // performance settings performance sdkinstrument.Performance @@ -63,12 +62,8 @@ func WithResource(res *resource.Resource) Option { // a new MeterProvider func WithReader(r Reader, opts ...view.Option) Option { return optionFunction(func(cfg config) config { - v, err := view.Validate(view.New(r.String(), opts...)) - if err != nil { - otel.Handle(err) - } cfg.readers = append(cfg.readers, r) - cfg.views = append(cfg.views, v) + cfg.vopts = append(cfg.vopts, opts) return cfg }) } diff --git a/lightstep/sdk/metric/example/go.mod b/lightstep/sdk/metric/example/go.mod index fa32e681..107c5051 100644 --- a/lightstep/sdk/metric/example/go.mod +++ b/lightstep/sdk/metric/example/go.mod @@ -3,7 +3,7 @@ module github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/example go 1.18 require ( - github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.13.4 + github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.14.0 github.com/lightstep/otel-launcher-go/pipelines v1.8.0 go.opentelemetry.io/proto/otlp v0.19.0 ) diff --git a/lightstep/sdk/metric/internal/asyncstate/async.go b/lightstep/sdk/metric/internal/asyncstate/async.go index 498422e4..4c0ae525 100644 --- a/lightstep/sdk/metric/internal/asyncstate/async.go +++ b/lightstep/sdk/metric/internal/asyncstate/async.go @@ -86,9 +86,10 @@ func New(desc sdkinstrument.Descriptor, _ sdkinstrument.Performance, opaque inte // disabled the instrument. This ensures that certain error // checks still work (wrong meter, wrong callback, etc). // - // Note: performance settings are not used because async - // instruments do not use fingerprinting so IgnoreCollisions is - // meaningless. + // Note: performance settings are not used. + // 1. There is no fingerprinting, so IgnoreCollisions is meaningless. + // 2. InstrumentCardinalityLimit is not enforceable, because of duplicate + // suppression -- better left to the aggregator. return &Observer{ opaque: opaque, descriptor: desc, diff --git a/lightstep/sdk/metric/internal/asyncstate/async_test.go b/lightstep/sdk/metric/internal/asyncstate/async_test.go index 630cdfc1..8aa64f01 100644 --- a/lightstep/sdk/metric/internal/asyncstate/async_test.go +++ b/lightstep/sdk/metric/internal/asyncstate/async_test.go @@ -79,8 +79,8 @@ func (tsdk *testSDK) compile(desc sdkinstrument.Descriptor) pipeline.Register[vi func testAsync(name string, opts ...view.Option) *testSDK { return &testSDK{ compilers: []*viewstate.Compiler{ - viewstate.New(testLibrary, view.New(name, opts...)), - viewstate.New(testLibrary, view.New(name, opts...)), + viewstate.New(testLibrary, view.New(name, ignorePerf, opts...)), + viewstate.New(testLibrary, view.New(name, ignorePerf, opts...)), }, } } @@ -88,8 +88,8 @@ func testAsync(name string, opts ...view.Option) *testSDK { func testAsync2(name string, opts1, opts2 []view.Option) *testSDK { return &testSDK{ compilers: []*viewstate.Compiler{ - viewstate.New(testLibrary, view.New(name, opts1...)), - viewstate.New(testLibrary, view.New(name, opts2...)), + viewstate.New(testLibrary, view.New(name, ignorePerf, opts1...)), + viewstate.New(testLibrary, view.New(name, ignorePerf, opts2...)), }, } } diff --git a/lightstep/sdk/metric/internal/pipeline/overflow.go b/lightstep/sdk/metric/internal/pipeline/overflow.go new file mode 100644 index 00000000..c5a7161b --- /dev/null +++ b/lightstep/sdk/metric/internal/pipeline/overflow.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" + +import "go.opentelemetry.io/otel/attribute" + +// OverflowAttributes is the specified list of attributes to use when +// configured mechanisms overflow a cardinality limit. +var OverflowAttributes = []attribute.KeyValue{ + attribute.Bool("otel.metric.overflow", true), +} + +// OverflowAttributeSet is the set corresponding with OverflowAttributes. +var OverflowAttributeSet = attribute.NewSet(OverflowAttributes...) diff --git a/lightstep/sdk/metric/internal/syncstate/sync.go b/lightstep/sdk/metric/internal/syncstate/sync.go index 59762ede..b26f1658 100644 --- a/lightstep/sdk/metric/internal/syncstate/sync.go +++ b/lightstep/sdk/metric/internal/syncstate/sync.go @@ -30,6 +30,8 @@ import ( "go.opentelemetry.io/otel/metric/instrument" ) +var overflowAttributesFingerprint = fingerprintAttributes(pipeline.OverflowAttributes) + // Instrument maintains a mapping from attribute.Set to an internal // record type for a single API-level instrument. This type is // organized so that a single attribute.Set lookup is performed @@ -129,7 +131,7 @@ func (inst *Observer) SnapshotAndProcess() { } } - // When records are kept, delete the map entry. + // When no records are kept, delete the map entry. if head == nil { delete(inst.current, key) continue @@ -429,11 +431,31 @@ func attributesEqual(a, b []attribute.KeyValue) bool { return true } -// acquireRead acquires the read lock and searches for a `*record`. -func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) *record { +// acquireRead acquires the lock and searches for a `*record`. +// This returns the overflow attributes and fingerprint in case the +// the cardinality limit is reached. The caller should exchange their +// fp and attrs for the ones returned by this call. +func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) (uint64, []attribute.KeyValue, *record) { inst.lock.RLock() defer inst.lock.RUnlock() + overflow := false + fp, attrs, rec := acquireReadLocked(inst, fp, attrs, &overflow) + + if rec != nil { + return fp, attrs, rec + } + // The overflow signal indicates another call is needed w/ the + // same logic but updated fp and attrs. + if !overflow { + // Otherwise, this is the first appearance of an overflow. + return fp, attrs, nil + } + // In which case fp and attrs are now the overflow attributes. + return acquireReadLocked(inst, fp, attrs, &overflow) +} + +func acquireReadLocked(inst *Observer, fp uint64, attrs []attribute.KeyValue, overflow *bool) (uint64, []attribute.KeyValue, *record) { rec := inst.current[fp] // Potentially test for hash collisions. @@ -447,10 +469,21 @@ func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) *record if rec != nil && rec.refMapped.ref() { // At this moment it is guaranteed that the // record is in the map and will not be removed. - return rec + return fp, attrs, rec + } + + // Check for overflow after checking for the original + // attribute set. Note this means we are performing + // two map lookups for overflowing attributes and only + // one lookup if the attribute set was preexisting. + if !*overflow && uint32(len(inst.current)) >= inst.performance.InstrumentCardinalityLimit-1 { + // Use the overflow attributes, repeat. + attrs = pipeline.OverflowAttributes + fp = overflowAttributesFingerprint + *overflow = true } - return nil + return fp, attrs, nil } // acquireUninitialized gets or creates a `*record` corresponding to @@ -459,7 +492,10 @@ func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) *record func acquireUninitialized[N number.Any](inst *Observer, attrs []attribute.KeyValue) *record { fp := fingerprintAttributes(attrs) - if rec := acquireRead(inst, fp, attrs); rec != nil { + // acquireRead may replace fp and attrs when there is overflow. + var rec *record + fp, attrs, rec = acquireRead(inst, fp, attrs) + if rec != nil { return rec } diff --git a/lightstep/sdk/metric/internal/syncstate/sync_test.go b/lightstep/sdk/metric/internal/syncstate/sync_test.go index 2c44d187..a67f5b8a 100644 --- a/lightstep/sdk/metric/internal/syncstate/sync_test.go +++ b/lightstep/sdk/metric/internal/syncstate/sync_test.go @@ -144,7 +144,7 @@ func testSyncStateConcurrencyWithPerf[N number.Any, Traits number.Traits[N]](t * } vcs := make([]*viewstate.Compiler, numReaders) for vci := range vcs { - vcs[vci] = viewstate.New(lib, view.New("test", vopts...)) + vcs[vci] = viewstate.New(lib, view.New("test", safePerf, vopts...)) } attrs := make([]attribute.KeyValue, numAttrs) for i := range attrs { @@ -245,8 +245,8 @@ func TestSyncStatePartialNoopInstrument(t *testing.T) { Name: "testlib", } vcs := make([]*viewstate.Compiler, 2) - vcs[0] = viewstate.New(lib, view.New("dropper", vopts...)) - vcs[1] = viewstate.New(lib, view.New("keeper")) + vcs[0] = viewstate.New(lib, view.New("dropper", safePerf, vopts...)) + vcs[1] = viewstate.New(lib, view.New("keeper", safePerf)) desc := test.Descriptor("dropme", sdkinstrument.SyncHistogram, number.Float64Kind) @@ -313,8 +313,8 @@ func TestSyncStateFullNoopInstrument(t *testing.T) { Name: "testlib", } vcs := make([]*viewstate.Compiler, 2) - vcs[0] = viewstate.New(lib, view.New("dropper", vopts...)) - vcs[1] = viewstate.New(lib, view.New("keeper", vopts...)) + vcs[0] = viewstate.New(lib, view.New("dropper", safePerf, vopts...)) + vcs[1] = viewstate.New(lib, view.New("keeper", safePerf, vopts...)) desc := test.Descriptor("dropme", sdkinstrument.SyncHistogram, number.Float64Kind) @@ -353,7 +353,7 @@ func TestOutOfRangeValues(t *testing.T) { Name: "testlib", } vcs := make([]*viewstate.Compiler, 1) - vcs[0] = viewstate.New(lib, view.New("test")) + vcs[0] = viewstate.New(lib, view.New("test", safePerf)) pipes := make(pipeline.Register[viewstate.Instrument], 1) pipes[0], _ = vcs[0].Compile(desc) @@ -432,6 +432,7 @@ func TestSyncGaugeDeltaInstrument(t *testing.T) { vcs := make([]*viewstate.Compiler, 2) vcs[0] = viewstate.New(lib, view.New( "test", + safePerf, deltaSelector, view.WithClause( view.WithKeys([]attribute.Key{"A", "C"}), @@ -731,6 +732,7 @@ func TestDuplicateFingerprintSafety(t *testing.T) { vcs := make([]*viewstate.Compiler, 2) vcs[0] = viewstate.New(lib, view.New( "test", + safePerf, deltaSelector, view.WithClause( view.WithKeys([]attribute.Key{fpKey}), @@ -738,6 +740,7 @@ func TestDuplicateFingerprintSafety(t *testing.T) { )) vcs[1] = viewstate.New(lib, view.New( "test", + safePerf, deltaSelector, view.WithClause( view.WithKeys([]attribute.Key{}), @@ -928,6 +931,7 @@ func TestDuplicateFingerprintCollisionIgnored(t *testing.T) { vcs := make([]*viewstate.Compiler, 1) vcs[0] = viewstate.New(lib, view.New( "test", + safePerf, deltaSelector, )) @@ -1025,6 +1029,7 @@ func TestRecordInactivity(t *testing.T) { vcs := make([]*viewstate.Compiler, 1) vcs[0] = viewstate.New(lib, view.New( "test", + safePerf, deltaSelector, )) @@ -1122,3 +1127,244 @@ func TestRecordInactivity(t *testing.T) { }) } } + +func TestCardinalityOverflowCumulative(t *testing.T) { + const total = 300 + const limit = 133 + + ctx := context.Background() + lib := instrumentation.Library{ + Name: "testlib", + } + perf := sdkinstrument.Performance{ + InstrumentCardinalityLimit: limit, + } + vcs := make([]*viewstate.Compiler, 1) + vcs[0] = viewstate.New(lib, view.New("test", perf)) + + desc := test.Descriptor("c", sdkinstrument.SyncCounter, number.Float64Kind) + + pipes := make(pipeline.Register[viewstate.Instrument], 1) + pipes[0], _ = vcs[0].Compile(desc) + + inst := New(desc, perf, nil, pipes) + require.NotNil(t, inst) + + var expectPoints []data.Point + var oflow int + + for i := 0; i < total; i++ { + inst.ObserveFloat64(ctx, 1, attribute.Int("i", i)) + + if i < int(perf.InstrumentCardinalityLimit)-1 { + expectPoints = append(expectPoints, test.Point( + startTime, endTime, + sum.NewMonotonicFloat64(1), + aggregation.CumulativeTemporality, + attribute.Int("i", i), + )) + } else { + oflow++ + } + } + require.Equal(t, total-limit+1, oflow) + expectPoints = append(expectPoints, test.Point( + startTime, endTime, + sum.NewMonotonicFloat64(float64(oflow)), + aggregation.CumulativeTemporality, + attribute.Bool("otel.metric.overflow", true), + )) + + inst.SnapshotAndProcess() + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[0].Collectors(), + testSequence, + ), + test.Instrument( + desc, + expectPoints..., + ), + ) + + // Repeat with all new attributes; since this is cumulative, + // the original set is retained an this goes entirely to overflow. + for i := 0; i < total; i++ { + inst.ObserveFloat64(ctx, 1, attribute.Int("i", total+i)) + oflow++ + } + // Replace the overflow value + expectPoints = expectPoints[:len(expectPoints)-1] + expectPoints = append(expectPoints, test.Point( + startTime, endTime, + sum.NewMonotonicFloat64(float64(oflow)), + aggregation.CumulativeTemporality, + attribute.Bool("otel.metric.overflow", true), + )) + + inst.SnapshotAndProcess() + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[0].Collectors(), + testSequence, + ), + test.Instrument( + desc, + expectPoints..., + ), + ) +} + +func TestCardinalityOverflowAvoidedDelta(t *testing.T) { + const limit = 100 // has to be even + + ctx := context.Background() + lib := instrumentation.Library{ + Name: "testlib", + } + perf := sdkinstrument.Performance{ + InstrumentCardinalityLimit: limit, + InactiveCollectionPeriods: 1, + } + vcs := make([]*viewstate.Compiler, 1) + vcs[0] = viewstate.New(lib, view.New("test", perf, deltaSelector)) + + desc := test.Descriptor("c", sdkinstrument.SyncUpDownCounter, number.Int64Kind) + + pipes := make(pipeline.Register[viewstate.Instrument], 1) + pipes[0], _ = vcs[0].Compile(desc) + + inst := New(desc, perf, nil, pipes) + require.NotNil(t, inst) + + uniq := 0 + for reps := 0; reps < 4; reps++ { + + var expectPoints []data.Point + + // we expect to create half the limit less on every + // iteration and never overflow. + for i := 0; i < limit/2-1; i++ { + // attr is unique on every iteration + attr := attribute.String("s", fmt.Sprint(uniq)) + uniq++ + inst.ObserveInt64(ctx, 1, attr) + + expectPoints = append(expectPoints, test.Point( + middleTime, endTime, + sum.NewNonMonotonicInt64(1), + aggregation.DeltaTemporality, + attr, + )) + } + + inst.SnapshotAndProcess() + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[0].Collectors(), + testSequence, + ), + test.Instrument( + desc, + expectPoints..., + ), + ) + } +} + +func TestCardinalityOverflowOscillationDelta(t *testing.T) { + // Note this tests a fairly bad behavior; it can be improved + // upon, but at least this validates the current mechanism. + // If more than half of the available aggregators are not + // re-used an oscillation develops. + // + // TODO improve this logic and replace this test. + const total = 8 + const limit = 4 + + ctx := context.Background() + lib := instrumentation.Library{ + Name: "testlib", + } + perf := sdkinstrument.Performance{ + InstrumentCardinalityLimit: limit, + InactiveCollectionPeriods: 1, + } + vcs := make([]*viewstate.Compiler, 1) + vcs[0] = viewstate.New(lib, view.New("test", perf, deltaSelector)) + + desc := test.Descriptor("c", sdkinstrument.SyncCounter, number.Float64Kind) + + pipes := make(pipeline.Register[viewstate.Instrument], 1) + pipes[0], _ = vcs[0].Compile(desc) + + inst := New(desc, perf, nil, pipes) + require.NotNil(t, inst) + + uniq := 0 + + for reps := 0; reps < 10; reps++ { + var expectPoints []data.Point + var oflow int + + for i := 0; i < total; i++ { + inst.ObserveFloat64(ctx, 1, attribute.Int("uniq", uniq)) + + // Note: on even intervals, the overflow will + // be as we expect, and in odd intervals, the + // entire batch will overflow. + if reps%2 == 0 { + normal := int(perf.InstrumentCardinalityLimit) - 1 + if reps > 0 { + // Except the first round there's one + // overflow element already used so one + // less normal "new" attribute set. + normal-- + } + if i < normal { + expectPoints = append(expectPoints, test.Point( + middleTime, endTime, + sum.NewMonotonicFloat64(1), + aggregation.DeltaTemporality, + attribute.Int("uniq", uniq), + )) + } else { + oflow++ + } + } else { + oflow++ + } + uniq++ + } + expectPoints = append(expectPoints, test.Point( + middleTime, endTime, + sum.NewMonotonicFloat64(float64(oflow)), + aggregation.DeltaTemporality, + attribute.Bool("otel.metric.overflow", true), + )) + + inst.SnapshotAndProcess() + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[0].Collectors(), + testSequence, + ), + test.Instrument( + desc, + expectPoints..., + ), + ) + } +} diff --git a/lightstep/sdk/metric/internal/test/test.go b/lightstep/sdk/metric/internal/test/test.go index 346dc03b..375c13c9 100644 --- a/lightstep/sdk/metric/internal/test/test.go +++ b/lightstep/sdk/metric/internal/test/test.go @@ -101,7 +101,7 @@ func CollectScopeReuse(t *testing.T, collectors []data.Collector, seq data.Seque func RequireEqualPoints(t *testing.T, output []data.Point, expected ...data.Point) { t.Helper() - require.Equal(t, len(output), len(expected), "points have different length") + require.Equal(t, len(expected), len(output), "points have different length") cpy := make([]data.Point, len(expected)) copy(cpy, expected) diff --git a/lightstep/sdk/metric/internal/viewstate/base_instrument.go b/lightstep/sdk/metric/internal/viewstate/base_instrument.go index 9590878b..fdc6beb3 100644 --- a/lightstep/sdk/metric/internal/viewstate/base_instrument.go +++ b/lightstep/sdk/metric/internal/viewstate/base_instrument.go @@ -23,12 +23,17 @@ import ( "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/doevery" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" ) +var overflowAttributeSet = pipeline.OverflowAttributeSet + +var errInternalOverflowError = fmt.Errorf("internal overflow error condition") + // storageHolder is a generic struct for holding one storage and one // auxiliary field. Storage will be one of the aggregators. The // auxiliary type depends on whether synchronous or asynchronous. @@ -139,6 +144,40 @@ func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) getOrCreateEntry(k if has { return entry } + // Special case at one less than the limit -- is there already + // an overflow attribute set? + sz := len(metric.data) + lim := int(metric.acfg.CardinalityLimit) + + if sz == lim { + // Second lookup is required and it *must* succeed or + // there is an internal error condition. + if entry, has = metric.data[overflowAttributeSet]; has { + return entry + } + // The boundary condtions in the branch below ensures + // this won't happen, but fall through to create an + // overflow entry above the limit to avoid panic. + doevery.TimePeriod(time.Minute, func() { + otel.Handle(fmt.Errorf("limit passed %d: %w", len(metric.data), errInternalOverflowError)) + }) + } else if sz == lim-1 { + // If this is not the overflow set, check whether the + // overflow aggregator already exists. If it already + // exists, allow this attribute set to be created, + // otherwise force creation of the overflow attribute + // set. + if kvs != overflowAttributeSet { + if _, overflowed := metric.data[overflowAttributeSet]; !overflowed { + // First overflow event. + kvs = overflowAttributeSet + } + // If not overflowed, the overflow aggregator + // already exists, means we're creating the + // last aggregator until existing attribute + // sets fall out of use. + } + } var methods Methods entry = &storageHolder[Storage, Auxiliary]{} diff --git a/lightstep/sdk/metric/internal/viewstate/collectors.go b/lightstep/sdk/metric/internal/viewstate/collectors.go index 070822bb..97642bdf 100644 --- a/lightstep/sdk/metric/internal/viewstate/collectors.go +++ b/lightstep/sdk/metric/internal/viewstate/collectors.go @@ -20,6 +20,7 @@ import ( "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" "go.opentelemetry.io/otel/attribute" ) @@ -87,7 +88,6 @@ func (p *statelessSyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence if numRefs == 0 { delete(p.data, set) } - } } @@ -137,9 +137,14 @@ func (p *statefulAsyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence ioutput := p.appendInstrument(output) + var ofe *storageHolder[Storage, notUsed] for set, entry := range p.data { // Compute the difference. pval, has := p.prior[set] + + if set == pipeline.OverflowAttributeSet { + ofe = entry + } if has { // This does `*pval := *storage - *pval` methods.SubtractSwap(&pval.storage, &entry.storage) @@ -161,4 +166,14 @@ func (p *statefulAsyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence // Copy the current to the prior and reset. p.prior = p.data p.data = map[attribute.Set]*storageHolder[Storage, notUsed]{} + + // Note: the overflow attribute set is synthesized from a + // number of inputs which are presumed cumulative. To maintain this + // illusion, copy its current cumulative value into the next data set. + if ofe != nil { + cpy := &storageHolder[Storage, notUsed]{} + methods.Copy(&ofe.storage, &cpy.storage) + + p.data[pipeline.OverflowAttributeSet] = cpy + } } diff --git a/lightstep/sdk/metric/internal/viewstate/errors_test.go b/lightstep/sdk/metric/internal/viewstate/errors_test.go index 23d63770..eedd9267 100644 --- a/lightstep/sdk/metric/internal/viewstate/errors_test.go +++ b/lightstep/sdk/metric/internal/viewstate/errors_test.go @@ -28,6 +28,8 @@ import ( "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" ) +var safePerf sdkinstrument.Performance + var oneConflict = Conflict{ Semantic: SemanticError{ Instrument: sdkinstrument.SyncCounter, @@ -113,6 +115,7 @@ func TestConflictCombine(t *testing.T) { func TestConflictError(t *testing.T) { views := view.New( "problem", + safePerf, view.WithDefaultAggregationKindSelector(func(k sdkinstrument.Kind) aggregation.Kind { return aggregation.GaugeKind }), diff --git a/lightstep/sdk/metric/internal/viewstate/viewstate.go b/lightstep/sdk/metric/internal/viewstate/viewstate.go index 4164f89f..f4e9ab2c 100644 --- a/lightstep/sdk/metric/internal/viewstate/viewstate.go +++ b/lightstep/sdk/metric/internal/viewstate/viewstate.go @@ -20,6 +20,7 @@ import ( "strings" "sync" + histostruct "github.com/lightstep/go-expohisto/structure" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/gauge" @@ -42,20 +43,20 @@ import ( // Information flows through the Compiler as follows: // // When new instruments are created: -// - The Compiler.Compile() method returns an Instrument value and possible -// duplicate or semantic conflict error. +// - The Compiler.Compile() method returns an Instrument value and possible +// duplicate or semantic conflict error. // // When instruments are used: // - The Instrument.NewAccumulator() method returns an Accumulator value for each attribute.Set used // - The Accumulator.Update() aggregates one value for each measurement. // // During collection: -// - The Accumulator.SnapshotAndProcess() method captures the current value -// and conveys it to the output storage -// - The Compiler.Collectors() interface returns one Collector per output -// Metric in the Meter (duplicate definitions included). -// - The Collector.Collect() method outputs one Point for each attribute.Set -// in the result. +// - The Accumulator.SnapshotAndProcess() method captures the current value +// and conveys it to the output storage +// - The Compiler.Collectors() interface returns one Collector per output +// Metric in the Meter (duplicate definitions included). +// - The Collector.Collect() method outputs one Point for each attribute.Set +// in the result. type Compiler struct { // views is the configuration of this compiler. views *view.Views @@ -177,6 +178,7 @@ type singleBehavior struct { // New returns a compiler for library given configured views. func New(library instrumentation.Library, views *view.Views) *Compiler { + views, _ = view.Validate(views) return &Compiler{ library: library, views: views, @@ -193,24 +195,25 @@ func (v *Compiler) Collectors() []data.Collector { // tryToApplyHint looks for a Lightstep-specified hint structure // encoded as JSON in the description. If valid, returns the modified // configuration, otherwise returns the default for the instrument. -func (v *Compiler) tryToApplyHint(instrument sdkinstrument.Descriptor) (_ sdkinstrument.Descriptor, akind aggregation.Kind, acfg aggregator.Config, hinted bool) { +func (v *Compiler) tryToApplyHint(instrument sdkinstrument.Descriptor) (_ sdkinstrument.Descriptor, akind aggregation.Kind, acfg, defCfg aggregator.Config, hinted bool) { // These are the default behaviors, we'll use them unless there's a valid hint. akind = v.views.Defaults.Aggregation(instrument.Kind) - acfg = v.views.Defaults.AggregationConfig( + defCfg = v.views.Defaults.AggregationConfig( instrument.Kind, instrument.NumberKind, ) + acfg = defCfg // Check for required JSON symbols, empty strings, ... if !strings.Contains(instrument.Description, "{") { - return instrument, akind, acfg, hinted + return instrument, akind, acfg, defCfg, hinted } var hint view.Hint if err := json.Unmarshal([]byte(instrument.Description), &hint); err != nil { // This could be noisy if valid descriptions contain spurious '{' chars. otel.Handle(fmt.Errorf("hint parse error: %w", err)) - return instrument, akind, acfg, hinted + return instrument, akind, acfg, defCfg, hinted } // Replace the hint input with its embedded description. @@ -229,16 +232,19 @@ func (v *Compiler) tryToApplyHint(instrument sdkinstrument.Descriptor) (_ sdkins } } - if hint.Config != (aggregator.JSONConfig{}) { - cfg := hint.Config.ToConfig() + if hint.Config.Histogram.MaxSize != 0 { + cfg := histostruct.NewConfig(histostruct.WithMaxSize(hint.Config.Histogram.MaxSize)) cfg, err := cfg.Validate() if err != nil { - otel.Handle(fmt.Errorf("hint invalid aggregator config: %w", err)) + otel.Handle(err) } - acfg = cfg + acfg.Histogram = cfg + } + if hint.Config.CardinalityLimit != 0 { + acfg.CardinalityLimit = hint.Config.CardinalityLimit } - return instrument, akind, acfg, hinted + return instrument, akind, acfg, defCfg, hinted } // Compile is called during NewInstrument by the Meter @@ -261,7 +267,7 @@ func (v *Compiler) Compile(instrument sdkinstrument.Descriptor) (Instrument, Vie continue } - modified, hintAkind, hintAcfg, hinted := v.tryToApplyHint(instrument) + modified, hintAkind, hintAcfg, defCfg, hinted := v.tryToApplyHint(instrument) instrument = modified // the hint erases itself from the description if akind == aggregation.UndefinedKind { @@ -272,7 +278,7 @@ func (v *Compiler) Compile(instrument sdkinstrument.Descriptor) (Instrument, Vie fromName: instrument.Name, desc: viewDescriptor(instrument, view), kind: akind, - acfg: pickAggConfig(hintAcfg, view.AggregatorConfig()), + acfg: pickAggConfig(hintAcfg, defCfg, view.AggregatorConfig()), tempo: v.views.Defaults.Temporality(instrument.Kind), hinted: hinted, } @@ -287,7 +293,7 @@ func (v *Compiler) Compile(instrument sdkinstrument.Descriptor) (Instrument, Vie // If there were no matching views, set the default aggregation. if len(matches) == 0 { - modified, akind, acfg, hinted := v.tryToApplyHint(instrument) + modified, akind, acfg, _, hinted := v.tryToApplyHint(instrument) instrument = modified // the hint erases itself from the description if akind != aggregation.DropKind { @@ -406,6 +412,7 @@ func newSyncView[ // is being copied before the new object is returned to the // user, and the extra allocation cost here would be // noticeable. + metric := instrumentBase[N, Storage, int64, Methods]{ fromName: behavior.fromName, desc: behavior.desc, @@ -603,13 +610,13 @@ func equalConfigs(a, b aggregator.Config) bool { return a == b } -// pickAggConfig returns the aggregator configuration prescribed by a view clause -// if it is not empty, otherwise the default value. -func pickAggConfig(def, vcfg aggregator.Config) aggregator.Config { - if vcfg != (aggregator.Config{}) { - return vcfg +// pickAggConfig returns the aggregator configuration prescribed by a +// view clause when it not the default value, otherwise the hinted config. +func pickAggConfig(hintCfg, defCfg, viewCfg aggregator.Config) aggregator.Config { + if viewCfg != defCfg { + return viewCfg } - return def + return hintCfg } // checkSemanticCompatibility checks whether an instrument / diff --git a/lightstep/sdk/metric/internal/viewstate/viewstate_test.go b/lightstep/sdk/metric/internal/viewstate/viewstate_test.go index 50ae135f..6f6594ea 100644 --- a/lightstep/sdk/metric/internal/viewstate/viewstate_test.go +++ b/lightstep/sdk/metric/internal/viewstate/viewstate_test.go @@ -31,6 +31,7 @@ import ( "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/minmaxsumcount" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/sum" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/test" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" @@ -144,7 +145,7 @@ func testCollectSequenceReuse(t *testing.T, vc *Compiler, seq data.Sequence, out // TestDeduplicateNoConflict verifies that two identical instruments // have the same collector. func TestDeduplicateNoConflict(t *testing.T) { - vc := New(testLib, view.New("test")) + vc := New(testLib, view.New("test", safePerf)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncCounter, number.Int64Kind) require.NoError(t, err1) @@ -160,7 +161,7 @@ func TestDeduplicateNoConflict(t *testing.T) { // TestDeduplicateRenameNoConflict verifies that one instrument can be renamed // such that it becomes identical to another, so no conflict. func TestDeduplicateRenameNoConflict(t *testing.T) { - vc := New(testLib, view.New("test", fooToBarView)) + vc := New(testLib, view.New("test", safePerf, fooToBarView)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncCounter, number.Int64Kind) require.NoError(t, err1) @@ -176,7 +177,7 @@ func TestDeduplicateRenameNoConflict(t *testing.T) { // TestNoRenameNoConflict verifies that one instrument does not // conflict with another differently-named instrument. func TestNoRenameNoConflict(t *testing.T) { - vc := New(testLib, view.New("test")) + vc := New(testLib, view.New("test", safePerf)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncCounter, number.Int64Kind) require.NoError(t, err1) @@ -192,7 +193,7 @@ func TestNoRenameNoConflict(t *testing.T) { // TestDuplicateNumberConflict verifies that two same instruments // except different number kind conflict. func TestDuplicateNumberConflict(t *testing.T) { - vc := New(testLib, view.New("test")) + vc := New(testLib, view.New("test", safePerf)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncCounter, number.Int64Kind) require.NoError(t, err1) @@ -212,7 +213,7 @@ func TestDuplicateNumberConflict(t *testing.T) { // TestDuplicateSyncAsyncConflict verifies that two same instruments // except one synchonous, one asynchronous conflict. func TestDuplicateSyncAsyncConflict(t *testing.T) { - vc := New(testLib, view.New("test")) + vc := New(testLib, view.New("test", safePerf)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncCounter, number.Float64Kind) require.NoError(t, err1) @@ -229,7 +230,7 @@ func TestDuplicateSyncAsyncConflict(t *testing.T) { // TestDuplicateUnitConflict verifies that two same instruments // except different units conflict. func TestDuplicateUnitConflict(t *testing.T) { - vc := New(testLib, view.New("test")) + vc := New(testLib, view.New("test", safePerf)) inst1, err1 := testCompileDescUnit(vc, "foo", sdkinstrument.SyncCounter, number.Float64Kind, "", "gal_us") require.NoError(t, err1) @@ -247,7 +248,7 @@ func TestDuplicateUnitConflict(t *testing.T) { // TestDuplicateMonotonicConflict verifies that two same instruments // except different monotonic values. func TestDuplicateMonotonicConflict(t *testing.T) { - vc := New(testLib, view.New("test")) + vc := New(testLib, view.New("test", safePerf)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncCounter, number.Float64Kind) require.NoError(t, err1) @@ -265,7 +266,7 @@ func TestDuplicateMonotonicConflict(t *testing.T) { // TestDuplicateAggregatorConfigConflict verifies that two same instruments // except different aggregator.Config values. func TestDuplicateAggregatorConfigConflict(t *testing.T) { - vc := New(testLib, view.New("test", fooToBarAltHistView)) + vc := New(testLib, view.New("test", safePerf, fooToBarAltHistView)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncHistogram, number.Float64Kind) require.NoError(t, err1) @@ -287,6 +288,7 @@ func TestDuplicateAggregatorConfigNoConflict(t *testing.T) { t.Run(nk.String(), func(t *testing.T) { views := view.New( "test", + safePerf, view.WithDefaultAggregationConfigSelector( func(_ sdkinstrument.Kind) (int64Config, float64Config aggregator.Config) { if nk == number.Int64Kind { @@ -316,7 +318,7 @@ func TestDuplicateAggregatorConfigNoConflict(t *testing.T) { // TestDuplicateAggregationKindConflict verifies that two instruments // with different aggregation kinds conflict. func TestDuplicateAggregationKindConflict(t *testing.T) { - vc := New(testLib, view.New("test", fooToBarView)) + vc := New(testLib, view.New("test", safePerf, fooToBarView)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncHistogram, number.Int64Kind) require.NoError(t, err1) @@ -335,7 +337,7 @@ func TestDuplicateAggregationKindConflict(t *testing.T) { // instruments with different aggregation kinds do not conflict when // the view drops one of the instruments. func TestDuplicateAggregationKindNoConflict(t *testing.T) { - vc := New(testLib, view.New("test", dropHistInstView)) + vc := New(testLib, view.New("test", safePerf, dropHistInstView)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncHistogram, number.Int64Kind) require.NoError(t, err1) @@ -349,7 +351,7 @@ func TestDuplicateAggregationKindNoConflict(t *testing.T) { // TestDuplicateMultipleConflicts verifies that multiple duplicate // instrument conflicts include sufficient explanatory information. func TestDuplicateMultipleConflicts(t *testing.T) { - vc := New(testLib, view.New("test")) + vc := New(testLib, view.New("test", safePerf)) inst1, err1 := testCompile(vc, "foo", instrumentKinds[0], number.Float64Kind) require.NoError(t, err1) @@ -380,7 +382,7 @@ func TestDuplicateFilterConflicts(t *testing.T) { fooToBarDifferentFiltersViews, } { t.Run(fmt.Sprint(idx), func(t *testing.T) { - vc := New(testLib, view.New("test", vws...)) + vc := New(testLib, view.New("test", safePerf, vws...)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncCounter, number.Int64Kind) require.NoError(t, err1) @@ -400,7 +402,7 @@ func TestDuplicateFilterConflicts(t *testing.T) { // renamed to match another exactly, including filters, they are not // in conflict. func TestDeduplicateSameFilters(t *testing.T) { - vc := New(testLib, view.New("test", fooToBarSameFiltersViews...)) + vc := New(testLib, view.New("test", safePerf, fooToBarSameFiltersViews...)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncCounter, number.Int64Kind) require.NoError(t, err1) @@ -415,7 +417,7 @@ func TestDeduplicateSameFilters(t *testing.T) { // TestDuplicatesMergeDescriptor ensures that the longest description string is used. func TestDuplicatesMergeDescriptor(t *testing.T) { - vc := New(testLib, view.New("test", fooToBarSameFiltersViews...)) + vc := New(testLib, view.New("test", safePerf, fooToBarSameFiltersViews...)) inst1, err1 := testCompile(vc, "foo", sdkinstrument.SyncCounter, number.Int64Kind) require.NoError(t, err1) @@ -451,6 +453,7 @@ func TestDuplicatesMergeDescriptor(t *testing.T) { func TestViewDescription(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause( view.MatchInstrumentName("foo"), view.WithDescription("something helpful"), @@ -492,7 +495,7 @@ func TestViewDescription(t *testing.T) { // TestKeyFilters verifies that keys are filtred and metrics are // correctly aggregated. func TestKeyFilters(t *testing.T) { - views := view.New("test", + views := view.New("test", safePerf, view.WithClause(view.WithKeys([]attribute.Key{"a", "b"})), ) @@ -532,6 +535,7 @@ func TestKeyFilters(t *testing.T) { func TestTwoViewsOneInt64Instrument(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause( view.MatchInstrumentName("foo"), view.WithName("foo_a"), @@ -600,6 +604,7 @@ func TestTwoViewsOneInt64Instrument(t *testing.T) { func TestHistogramTwoAggregations(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause( view.MatchInstrumentName("foo"), view.WithName("foo_sum"), @@ -648,6 +653,7 @@ func TestHistogramTwoAggregations(t *testing.T) { func TestAllKeysFilter(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause(view.WithKeys([]attribute.Key{})), ) @@ -682,6 +688,7 @@ func TestAllKeysFilter(t *testing.T) { func TestAnySumAggregation(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause(view.WithAggregation(aggregation.AnySumKind)), ) @@ -746,7 +753,7 @@ func TestAnySumAggregation(t *testing.T) { // instrument accumulators keep only the last observed value, while // synchronous instruments correctly snapshotAndProcess them all. func TestDuplicateAsyncMeasurementsIngored(t *testing.T) { - vc := New(testLib, view.New("test")) + vc := New(testLib, view.New("test", safePerf)) inst1, err := testCompile(vc, "async", sdkinstrument.AsyncCounter, number.Float64Kind) require.NoError(t, err) @@ -788,6 +795,7 @@ func TestDuplicateAsyncMeasurementsIngored(t *testing.T) { func TestCumulativeTemporality(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause( // Dropping all keys view.WithKeys([]attribute.Key{}), @@ -841,6 +849,7 @@ func TestCumulativeTemporality(t *testing.T) { func TestDeltaTemporalityCounter(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause( // Dropping all keys view.WithKeys([]attribute.Key{}), @@ -902,6 +911,7 @@ func TestDeltaTemporalityCounter(t *testing.T) { func TestDeltaTemporalityAsyncCounter(t *testing.T) { views := view.New( "test", + safePerf, view.WithDefaultAggregationTemporalitySelector(view.DeltaPreferredTemporality), ) @@ -1008,6 +1018,7 @@ func TestDeltaTemporalityAsyncCounter(t *testing.T) { func TestDeltaTemporalityAsyncGauge(t *testing.T) { views := view.New( "test", + safePerf, view.WithDefaultAggregationTemporalitySelector(view.DeltaPreferredTemporality), ) @@ -1095,6 +1106,7 @@ func TestDeltaTemporalityAsyncGauge(t *testing.T) { func TestDeltaTemporalitySyncGauge(t *testing.T) { views := view.New( "test", + safePerf, view.WithDefaultAggregationTemporalitySelector( func(ik sdkinstrument.Kind) aggregation.Temporality { return aggregation.DeltaTemporality @@ -1255,6 +1267,7 @@ func TestDeltaTemporalitySyncGauge(t *testing.T) { func TestSyncDeltaTemporalityCounter(t *testing.T) { views := view.New( "test", + safePerf, view.WithDefaultAggregationTemporalitySelector( func(ik sdkinstrument.Kind) aggregation.Temporality { return aggregation.DeltaTemporality // Always delta @@ -1359,6 +1372,7 @@ func TestSyncDeltaTemporalityCounter(t *testing.T) { func TestSyncDeltaTemporalityMapDeletion(t *testing.T) { views := view.New( "test", + safePerf, view.WithDefaultAggregationTemporalitySelector( func(ik sdkinstrument.Kind) aggregation.Temporality { return aggregation.DeltaTemporality // Always delta @@ -1407,12 +1421,12 @@ func TestSyncDeltaTemporalityMapDeletion(t *testing.T) { ) require.Equal(t, 0, len(inst.(*statelessSyncInstrument[float64, sum.MonotonicFloat64, sum.MonotonicFloat64Methods]).data)) - } func TestRegexpMatch(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause( view.MatchInstrumentNameRegexp(regexp.MustCompile(".*_rate")), view.WithAggregation(aggregation.DropKind), @@ -1436,6 +1450,7 @@ func TestRegexpMatch(t *testing.T) { func TestSingleInstrumentWarning(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause( view.MatchInstrumentNameRegexp(regexp.MustCompile(".*_rate")), view.WithName("fixed"), @@ -1450,6 +1465,7 @@ func TestSingleInstrumentWarning(t *testing.T) { func TestDeltaTemporalityMinMaxSumCount(t *testing.T) { views := view.New( "test", + safePerf, view.WithClause( view.MatchInstrumentKind(sdkinstrument.SyncHistogram), view.WithAggregation(aggregation.MinMaxSumCountKind), @@ -1505,7 +1521,7 @@ func TestDeltaTemporalityMinMaxSumCount(t *testing.T) { } func TestViewHints(t *testing.T) { - views := view.New("test") + views := view.New("test", safePerf) vc := New(testLib, views) otelErrs := test.OTelErrors() @@ -1589,7 +1605,7 @@ func TestViewHints(t *testing.T) { } func TestViewHintErrors(t *testing.T) { - views := view.New("test") + views := view.New("test", safePerf) vc := New(testLib, views) otelErrs := test.OTelErrors() @@ -1645,11 +1661,11 @@ func TestViewHintErrors(t *testing.T) { require.Contains(t, (*otelErrs)[0].Error(), "invalid character") require.Contains(t, (*otelErrs)[1].Error(), "looking for beginning") require.Contains(t, (*otelErrs)[2].Error(), "invalid aggregation") - require.Contains(t, (*otelErrs)[3].Error(), "invalid aggregator config") + require.Contains(t, (*otelErrs)[3].Error(), "invalid histogram size: -3") } func TestViewHintNoOverrideEmpty(t *testing.T) { - views := view.New("test", + views := view.New("test", safePerf, view.WithDefaultAggregationConfigSelector( func(_ sdkinstrument.Kind) (int64Config, float64Config aggregator.Config) { cfg := aggregator.Config{ @@ -1703,7 +1719,7 @@ func TestViewHintNoOverrideEmpty(t *testing.T) { // TestEmptyKeyFilter ensures no empty keys are used (w/o view config). func TestEmptyKeyFilter(t *testing.T) { - views := view.New("test") + views := view.New("test", safePerf) vc := New(testLib, views) @@ -1743,7 +1759,7 @@ func TestEmptyKeyFilter(t *testing.T) { // TestEmptyKeyFilterAndView ensures no empty keys are used (with a view config). func TestEmptyKeyFilterAndView(t *testing.T) { - views := view.New("test", + views := view.New("test", safePerf, view.WithClause( view.WithKeys([]attribute.Key{"a"}), ), @@ -1773,3 +1789,369 @@ func TestEmptyKeyFilterAndView(t *testing.T) { ), ) } + +// TestOverflowSyncCumulative tests that all synchronous cumulative +// respect thier configured cardinality limit and that the limit can +// be set three ways. +func TestOverflowSyncCumulative(t *testing.T) { + const limitA = 10 + const limitB = 20 + const limitC = 30 + views := view.New( + "test", + sdkinstrument.Performance{ + AggregatorCardinalityLimit: limitC, + }, + view.WithClause( + view.MatchInstrumentName("B"), + view.WithAggregatorConfig(aggregator.Config{ + CardinalityLimit: limitB, + }), + ), + ) + views, err := view.Validate(views) + require.NoError(t, err) + + vc := New(testLib, views) + + descA := fmt.Sprintf(`{ + "config": { + "cardinality_limit": %d + } +}`, limitA) + + instA, err := testCompileDescUnit(vc, "A", sdkinstrument.SyncCounter, number.Float64Kind, descA, "") + require.NoError(t, err) + + instB, err := testCompile(vc, "B", sdkinstrument.SyncUpDownCounter, number.Float64Kind) + require.NoError(t, err) + + instC, err := testCompile(vc, "C", sdkinstrument.SyncHistogram, number.Float64Kind) + require.NoError(t, err) + + var expA []data.Point + var expB []data.Point + var expC []data.Point + var oflowA, oflowB, oflowC float64 + + for i := 0; i < 1000; i++ { + acc1 := instA.NewAccumulator(attribute.NewSet(attribute.Int("a", i))) + acc1.(Updater[float64]).Update(1) + acc1.SnapshotAndProcess(true) + + if i < limitA-1 { + expA = append(expA, + test.Point( + startTime, endTime, sum.NewMonotonicFloat64(1), cumulative, attribute.Int("a", i), + )) + } else { + oflowA++ + } + + acc2 := instB.NewAccumulator(attribute.NewSet(attribute.Int("b", i))) + acc2.(Updater[float64]).Update(1) + acc2.SnapshotAndProcess(true) + + if i < limitB-1 { + expB = append(expB, + test.Point( + startTime, endTime, sum.NewNonMonotonicFloat64(1), cumulative, attribute.Int("b", i), + )) + } else { + oflowB++ + } + + acc3 := instC.NewAccumulator(attribute.NewSet(attribute.Int("c", i))) + acc3.(Updater[float64]).Update(1) + acc3.SnapshotAndProcess(true) + + if i < limitC-1 { + expC = append(expC, + test.Point( + startTime, endTime, histogram.NewFloat64(histogram.Config{}, 1), cumulative, attribute.Int("c", i), + )) + } else { + oflowC++ + } + } + + expA = append(expA, + test.Point( + startTime, endTime, sum.NewMonotonicFloat64(oflowA), cumulative, attribute.Bool("otel.metric.overflow", true), + )) + expB = append(expB, + test.Point( + startTime, endTime, sum.NewNonMonotonicFloat64(oflowB), cumulative, attribute.Bool("otel.metric.overflow", true), + )) + + var many []float64 + for i := 0.0; i < oflowC; i++ { + many = append(many, 1.0) + } + + expC = append(expC, + test.Point( + startTime, endTime, histogram.NewFloat64(histogram.Config{}, many...), cumulative, attribute.Bool("otel.metric.overflow", true), + )) + + test.RequireEqualMetrics( + t, + testCollect(t, vc), + test.Instrument( + test.Descriptor("A", sdkinstrument.SyncCounter, number.Float64Kind), + expA..., + ), + test.Instrument( + test.Descriptor("B", sdkinstrument.SyncUpDownCounter, number.Float64Kind), + expB..., + ), + test.Instrument( + test.Descriptor("C", sdkinstrument.SyncHistogram, number.Float64Kind), + expC..., + ), + ) +} + +// TestOverflowAsync is a cursory test of the cumulative async +// behavior. This is a weak test because the asyncstate package +// uses map iteration, making the results unpredictable. +// +// TODO: Fix the asyncstate behavior in a separate change, then +// strengthen this test. +func TestOverflowAsyncCumulative(t *testing.T) { + const limitA = 10 + const limitB = 20 + const limitC = 30 + const count = 1000 + views := view.New( + "test", + sdkinstrument.Performance{ + AggregatorCardinalityLimit: limitC, + }, + view.WithClause( + view.MatchInstrumentName("B"), + view.WithAggregatorConfig(aggregator.Config{ + CardinalityLimit: limitB, + }), + ), + ) + views, err := view.Validate(views) + require.NoError(t, err) + + vc := New(testLib, views) + + descA := fmt.Sprintf(`{ + "config": { + "cardinality_limit": %d + } +}`, limitA) + + instA, err := testCompileDescUnit(vc, "A", sdkinstrument.AsyncCounter, number.Int64Kind, descA, "") + require.NoError(t, err) + + instB, err := testCompile(vc, "B", sdkinstrument.AsyncUpDownCounter, number.Int64Kind) + require.NoError(t, err) + + instC, err := testCompile(vc, "C", sdkinstrument.AsyncGauge, number.Int64Kind) + require.NoError(t, err) + + for reps := 0; reps < 10; reps++ { + for i := 0; i < count; i++ { + acc1 := instA.NewAccumulator(attribute.NewSet(attribute.Int("a", i))) + acc1.(Updater[int64]).Update(1) + acc1.SnapshotAndProcess(true) + + acc2 := instB.NewAccumulator(attribute.NewSet(attribute.Int("b", i))) + acc2.(Updater[int64]).Update(1) + acc2.SnapshotAndProcess(true) + + acc3 := instC.NewAccumulator(attribute.NewSet(attribute.Int("c", i))) + acc3.(Updater[int64]).Update(1) + acc3.SnapshotAndProcess(true) + } + + collected := testCollect(t, vc) + + require.Equal(t, limitA, len(collected[0].Points)) + require.Equal(t, limitB, len(collected[1].Points)) + require.Equal(t, limitC, len(collected[2].Points)) + + // Each point list has one overflow. + for idx, data := range collected { + oflow := 0 + sum := int64(0) + for _, pt := range data.Points { + if pt.Attributes == pipeline.OverflowAttributeSet { + oflow++ + } + if idx < 2 { + sum += number.ToInt64(pt.Aggregation.(aggregation.Sum).Sum()) + } else { + sum += number.ToInt64(pt.Aggregation.(aggregation.Gauge).Gauge()) + } + } + require.Equal(t, 1, oflow) + if idx < 2 { + require.Equal(t, int64(count), sum) + } else { + require.Equal(t, int64(limitC), sum) + } + } + } +} + +// TestOneViewOverflowsOneDoesNot tests that views can independently +// repair an overflow problem. +func TestOneViewOverflowsOneDoesNot(t *testing.T) { + const limit = 10 + const count = 20 + views := view.New( + "test", + sdkinstrument.Performance{ + AggregatorCardinalityLimit: limit, + }, + view.WithClause( + view.WithName("filtered"), + view.MatchInstrumentName("input"), + view.WithKeys([]attribute.Key{"stable"}), + ), + view.WithClause( + view.WithName("unfiltered"), + view.MatchInstrumentName("input"), + ), + ) + views, err := view.Validate(views) + require.NoError(t, err) + + vc := New(testLib, views) + + inst, err := testCompile(vc, "input", sdkinstrument.SyncCounter, number.Float64Kind) + require.NoError(t, err) + + sattr := attribute.String("stable", "constant") + var expNF []data.Point + + for i := 0; i < count; i++ { + vattr := attribute.Int("varies", i) + acc := inst.NewAccumulator(attribute.NewSet( + sattr, + vattr, + )) + acc.(Updater[float64]).Update(1) + acc.SnapshotAndProcess(true) + + if i < limit-1 { + expNF = append(expNF, + test.Point( + startTime, endTime, sum.NewMonotonicFloat64(1), cumulative, sattr, vattr, + )) + } + } + expNF = append(expNF, + test.Point( + startTime, endTime, sum.NewMonotonicFloat64(count-limit+1), cumulative, attribute.Bool("otel.metric.overflow", true), + )) + + test.RequireEqualMetrics( + t, + testCollect(t, vc), + test.Instrument( + test.Descriptor("filtered", sdkinstrument.SyncCounter, number.Float64Kind), + test.Point( + startTime, endTime, sum.NewMonotonicFloat64(count), cumulative, sattr, + ), + ), + test.Instrument( + test.Descriptor("unfiltered", sdkinstrument.SyncCounter, number.Float64Kind), + expNF..., + ), + ) +} + +// TestInstrumentOverflowCombined tests that the aggregator limit is a +// hard limit even when the instrument-level limit was reached early. +func TestInstrumentOverflowCombined(t *testing.T) { + const aggLimit = 10 + const instLimit = 2 * aggLimit + const count = 5 * instLimit + views := view.New( + "test", + sdkinstrument.Performance{ + InactiveCollectionPeriods: 1, + AggregatorCardinalityLimit: aggLimit, + InstrumentCardinalityLimit: instLimit, + }, + view.WithDefaultAggregationTemporalitySelector(view.DeltaPreferredTemporality), + ) + views, err := view.Validate(views) + require.NoError(t, err) + + vc := New(testLib, views) + + instS, err := testCompile(vc, "S", sdkinstrument.SyncCounter, number.Float64Kind) + require.NoError(t, err) + + instA, err := testCompile(vc, "A", sdkinstrument.AsyncCounter, number.Int64Kind) + require.NoError(t, err) + + totalS := float64(0) + totalA := int64(0) + + for reps := 0; reps < 10; reps++ { + for i := 0; i < count; i++ { + attr := attribute.Int("RCi", reps*count+i) + aset := attribute.NewSet(attr) + + accS := instS.NewAccumulator(aset) + accS.(Updater[float64]).Update(1) + accS.SnapshotAndProcess(true) + + accA := instA.NewAccumulator(aset) + accA.(Updater[int64]).Update(1) + accA.SnapshotAndProcess(true) + } + + // Both experience overflow; neither exceeds its limit + data := testCollect(t, vc) + require.Equal(t, test.Descriptor("S", sdkinstrument.SyncCounter, number.Float64Kind), data[0].Descriptor) + require.Equal(t, test.Descriptor("A", sdkinstrument.AsyncCounter, number.Int64Kind), data[1].Descriptor) + + // As tested in syncstate, there is an oscillation that + // develops when a delta temporality experiences major + // overflow. We're not testing the form of the overflow here, + // just that the sum is correct. + oflowSCnt := 0 + sumS := 0.0 + for _, pt := range data[0].Points { + if pt.Attributes == pipeline.OverflowAttributeSet { + oflowSCnt++ + } + sumS += number.ToFloat64(pt.Aggregation.(aggregation.Sum).Sum()) + } + if reps == 0 { + require.Equal(t, 1, oflowSCnt) + require.Equal(t, float64(count), sumS) + } + totalS += sumS + require.Equal(t, float64((reps+1)*count), totalS, "rep %d", reps) + + // Note that because the attribute set is new every + // time, we expect a new sum in every round equal to count. + // See the special case treatment of overflow in the async + // delta-temporality aggregator. + oflowACnt := 0 + sumA := int64(0) + for _, pt := range data[1].Points { + if pt.Attributes == pipeline.OverflowAttributeSet { + oflowACnt++ + } + sumA += number.ToInt64(pt.Aggregation.(aggregation.Sum).Sum()) + } + if reps == 0 { + require.Equal(t, 1, oflowACnt) + require.Equal(t, int64(count), sumA) + } + totalA += sumA + require.Equal(t, int64((reps+1)*count), totalA, "rep %d", reps) + } +} diff --git a/lightstep/sdk/metric/provider.go b/lightstep/sdk/metric/provider.go index d0f0c044..3b9076d3 100644 --- a/lightstep/sdk/metric/provider.go +++ b/lightstep/sdk/metric/provider.go @@ -23,6 +23,8 @@ import ( "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/resource" @@ -38,6 +40,7 @@ type MeterProvider struct { startTime time.Time lock sync.Mutex ordered []*meter + views []*view.Views meters map[instrumentation.Library]*meter } @@ -60,13 +63,22 @@ func NewMeterProvider(options ...Option) *MeterProvider { cfg = option.apply(cfg) } cfg.performance = cfg.performance.Validate() + p := &MeterProvider{ cfg: cfg, startTime: time.Now(), meters: map[instrumentation.Library]*meter{}, } for pipe := 0; pipe < len(cfg.readers); pipe++ { - cfg.readers[pipe].Register(p.producerFor(pipe)) + r := cfg.readers[pipe] + + v, err := view.Validate(view.New(r.String(), cfg.performance, cfg.vopts[pipe]...)) + if err != nil { + otel.Handle(err) + } + p.views = append(p.views, v) + + r.Register(p.producerFor(pipe)) } return p } @@ -106,7 +118,7 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri compilers: pipeline.NewRegister[*viewstate.Compiler](len(mp.cfg.readers)), } for pipe := range m.compilers { - m.compilers[pipe] = viewstate.New(lib, mp.cfg.views[pipe]) + m.compilers[pipe] = viewstate.New(lib, mp.views[pipe]) } mp.ordered = append(mp.ordered, m) mp.meters[lib] = m diff --git a/lightstep/sdk/metric/sdkinstrument/performance.go b/lightstep/sdk/metric/sdkinstrument/performance.go index e37495c0..5ec397c8 100644 --- a/lightstep/sdk/metric/sdkinstrument/performance.go +++ b/lightstep/sdk/metric/sdkinstrument/performance.go @@ -18,6 +18,15 @@ package sdkinstrument // delay before removing records from memory. const DefaultInactiveCollectionPeriods = 10 +// DefaultAggregatorCardinalityLimit is a hard limit on the number of +// aggregators that can be emitted in a single period. +const DefaultAggregatorCardinalityLimit = 2000 + +// DefaultInstrumentCardinalityLimit is a hard limit on the number of +// aggregators that can be accumulated in intermediate state belonging +// to the instrument. +const DefaultInstrumentCardinalityLimit = 3000 + // Performace configures features that allow the user to control // performance. type Performance struct { @@ -30,6 +39,15 @@ type Performance struct { // collection periods having no updates before the record is // removed from memory. InactiveCollectionPeriods uint32 + + // InstrumentCardinalityLimit is the point at which the + // SDK's emergency overflow breaker begins dropping attributes + // to avoid memory buildup at intermediate pipeline stages. + InstrumentCardinalityLimit uint32 + + // AggregatorCardinalityLimit is a hard limit on output + // cardinality for all aggregators in the SDK. + AggregatorCardinalityLimit uint32 } // Validate returns a Performance object with 0 values replaced by @@ -41,5 +59,12 @@ func (p Performance) Validate() Performance { if p.InactiveCollectionPeriods == 0 { p.InactiveCollectionPeriods = DefaultInactiveCollectionPeriods } + if p.InstrumentCardinalityLimit == 0 { + p.InstrumentCardinalityLimit = DefaultInstrumentCardinalityLimit + } + if p.AggregatorCardinalityLimit == 0 { + p.AggregatorCardinalityLimit = DefaultAggregatorCardinalityLimit + } + return p } diff --git a/lightstep/sdk/metric/view/standard.go b/lightstep/sdk/metric/view/standard.go index 6ec43c8e..3b549149 100644 --- a/lightstep/sdk/metric/view/standard.go +++ b/lightstep/sdk/metric/view/standard.go @@ -54,7 +54,16 @@ func DeltaPreferredTemporality(ik sdkinstrument.Kind) aggregation.Temporality { } } -// StandardConfig returns a function that configures two default aggregator.Configs. -func StandardConfig(ik sdkinstrument.Kind) (ints, floats aggregator.Config) { - return aggregator.Config{}, aggregator.Config{} +// StandardConfigForPerformance returns a function that configures two +// default aggregator.Configs using the specified performance +// defaults. +func StandardConfigForPerformance(perf sdkinstrument.Performance) func(ik sdkinstrument.Kind) (ints, floats aggregator.Config) { + perf = perf.Validate() + return func(ik sdkinstrument.Kind) (ints, floats aggregator.Config) { + return aggregator.Config{ + CardinalityLimit: perf.AggregatorCardinalityLimit, + }, aggregator.Config{ + CardinalityLimit: perf.AggregatorCardinalityLimit, + } + } } diff --git a/lightstep/sdk/metric/view/view.go b/lightstep/sdk/metric/view/view.go index 9d6b0c65..0517eeb4 100644 --- a/lightstep/sdk/metric/view/view.go +++ b/lightstep/sdk/metric/view/view.go @@ -108,7 +108,9 @@ func WithDefaultAggregationTemporalitySelector(d aggregation.TemporalitySelector func WithDefaultAggregationConfigSelector(d aggregator.ConfigSelector) Option { return optionFunction(func(cfg Config) Config { for k := sdkinstrument.Kind(0); k < sdkinstrument.NumKinds; k++ { - cfg.Defaults.ByInstrumentKind[k].Int64, cfg.Defaults.ByInstrumentKind[k].Float64 = d(k) + ic, fc := d(k) + cfg.Defaults.ByInstrumentKind[k].Int64 = ic + cfg.Defaults.ByInstrumentKind[k].Float64 = fc } return cfg }) @@ -128,11 +130,11 @@ func (of optionFunction) apply(in Config) Config { } // NewConfig returns a new and configured view Config. -func NewConfig(options ...Option) Config { +func NewConfig(perf sdkinstrument.Performance, options ...Option) Config { standard := []Option{ WithDefaultAggregationKindSelector(StandardAggregationKind), WithDefaultAggregationTemporalitySelector(StandardTemporality), - WithDefaultAggregationConfigSelector(StandardConfig), + WithDefaultAggregationConfigSelector(StandardConfigForPerformance(perf)), } var cfg Config for _, option := range append(standard, options...) { diff --git a/lightstep/sdk/metric/view/views.go b/lightstep/sdk/metric/view/views.go index e06460d4..3f89e7a1 100644 --- a/lightstep/sdk/metric/view/views.go +++ b/lightstep/sdk/metric/view/views.go @@ -30,17 +30,22 @@ type Views struct { // Config is the configuration for these views. Config + + // Performance defaults used in these views. + sdkinstrument.Performance } // New configures the clauses and default settings of a Views. -func New(name string, opts ...Option) *Views { +func New(name string, perf sdkinstrument.Performance, opts ...Option) *Views { + perf = perf.Validate() return &Views{ - Name: name, - Config: NewConfig(opts...), + Name: name, + Config: NewConfig(perf, opts...), + Performance: perf, } } -func checkAggregation(err error, agg *aggregation.Kind, def aggregation.Kind) error { +func (v *Views) checkAggregation(err error, agg *aggregation.Kind, def aggregation.Kind) error { if !agg.Valid() { err = multierr.Append(err, fmt.Errorf("invalid aggregation: %v", *agg)) *agg = def @@ -48,7 +53,7 @@ func checkAggregation(err error, agg *aggregation.Kind, def aggregation.Kind) er return err } -func checkTemporality(err error, tempo *aggregation.Temporality, def aggregation.Temporality) error { +func (v *Views) checkTemporality(err error, tempo *aggregation.Temporality, def aggregation.Temporality) error { if !tempo.Valid() { err = multierr.Append(err, fmt.Errorf("invalid temporality: %v", *tempo)) *tempo = def @@ -56,8 +61,14 @@ func checkTemporality(err error, tempo *aggregation.Temporality, def aggregation return err } -func checkAggConfig(err error, acfg *aggregator.Config) error { +func (v *Views) checkAggConfig(err error, acfg *aggregator.Config) error { var newErr error + // Use performance-specific defaults. + if acfg.CardinalityLimit == 0 { + acfg.CardinalityLimit = v.AggregatorCardinalityLimit + } + // TODO: Use a Performance setting for default histogram size. + // the call to Validate below fills in the hard-coded default. *acfg, newErr = acfg.Validate() if newErr != nil { err = multierr.Append(err, newErr) @@ -86,10 +97,10 @@ func Validate(v *Views) (*Views, error) { for i := range valid.Defaults.ByInstrumentKind { kind := sdkinstrument.Kind(i) - err = checkAggregation(err, &valid.Defaults.ByInstrumentKind[i].Aggregation, StandardAggregationKind(kind)) - err = checkTemporality(err, &valid.Defaults.ByInstrumentKind[i].Temporality, StandardTemporality(kind)) - err = checkAggConfig(err, &valid.Defaults.ByInstrumentKind[i].Int64) - err = checkAggConfig(err, &valid.Defaults.ByInstrumentKind[i].Float64) + err = v.checkAggregation(err, &valid.Defaults.ByInstrumentKind[i].Aggregation, StandardAggregationKind(kind)) + err = v.checkTemporality(err, &valid.Defaults.ByInstrumentKind[i].Temporality, StandardTemporality(kind)) + err = v.checkAggConfig(err, &valid.Defaults.ByInstrumentKind[i].Int64) + err = v.checkAggConfig(err, &valid.Defaults.ByInstrumentKind[i].Float64) } for i := range valid.Clauses { @@ -100,8 +111,8 @@ func Validate(v *Views) (*Views, error) { err = multierr.Append(err, fmt.Errorf("multi-instrument view specifies a single name")) } - err = checkAggregation(err, &clause.aggregation, aggregation.UndefinedKind) - err = checkAggConfig(err, &clause.acfg) + err = v.checkAggregation(err, &clause.aggregation, aggregation.UndefinedKind) + err = v.checkAggConfig(err, &clause.acfg) if clause.instrumentName != "" && clause.instrumentNameRegexp != nil { err = multierr.Append(err, fmt.Errorf("view has instrument name and regexp matches")) diff --git a/lightstep/sdk/metric/view/views_test.go b/lightstep/sdk/metric/view/views_test.go index fec99e6b..6099990f 100644 --- a/lightstep/sdk/metric/view/views_test.go +++ b/lightstep/sdk/metric/view/views_test.go @@ -29,6 +29,8 @@ import ( "go.opentelemetry.io/otel/sdk/instrumentation" ) +var safePerf sdkinstrument.Performance + func TestClauseMatches(t *testing.T) { re := regexp.MustCompile("s.+") lib0 := instrumentation.Library{ @@ -39,6 +41,7 @@ func TestClauseMatches(t *testing.T) { Version: "vF.G.H", } views := New("test", + sdkinstrument.Performance{}, WithClause(MatchInstrumentName("single")), WithClause(MatchInstrumentNameRegexp(re)), WithClause(MatchInstrumentKind(sdkinstrument.AsyncCounter)), @@ -101,6 +104,9 @@ func TestClauseMatches(t *testing.T) { func TestClauseProperties(t *testing.T) { views := New("test", + sdkinstrument.Performance{ + AggregatorCardinalityLimit: 1777, + }, WithClause(WithName("longname"), MatchInstrumentName("single")), WithClause(WithDescription("very interesting")), WithClause(WithKeys(nil)), @@ -118,11 +124,14 @@ func TestClauseProperties(t *testing.T) { require.Equal(t, []attribute.Key(nil), views.Clauses[2].Keys()) require.Equal(t, []attribute.Key{}, views.Clauses[3].Keys()) require.Equal(t, aggregation.DropKind, views.Clauses[4].Aggregation()) - require.Equal(t, aggregator.Config{Histogram: histogram.NewConfig(histogram.WithMaxSize(177))}, views.Clauses[5].AggregatorConfig()) + require.Equal(t, aggregator.Config{ + Histogram: histogram.NewConfig(histogram.WithMaxSize(177)), + CardinalityLimit: 1777, + }, views.Clauses[5].AggregatorConfig()) } func TestNameAndRegexp(t *testing.T) { - views := New("test", WithClause( + views := New("test", safePerf, WithClause( MatchInstrumentName("yes"), MatchInstrumentNameRegexp(regexp.MustCompile("no")), )) @@ -134,7 +143,7 @@ func TestNameAndRegexp(t *testing.T) { } func TestEmptyKeyString(t *testing.T) { - views := New("test", WithClause( + views := New("test", safePerf, WithClause( WithKeys([]attribute.Key{ attribute.Key(""), }), @@ -147,7 +156,7 @@ func TestEmptyKeyString(t *testing.T) { } func TestSingleNameConflict(t *testing.T) { - views := New("test", WithClause( + views := New("test", safePerf, WithClause( WithName("aha"), )) @@ -158,7 +167,7 @@ func TestSingleNameConflict(t *testing.T) { } func TestStandardTemporality(t *testing.T) { - views := New("test", + views := New("test", safePerf, WithDefaultAggregationTemporalitySelector(StandardTemporality), ) expectStandardTemporality(t, views) @@ -171,7 +180,7 @@ func expectStandardTemporality(t *testing.T, v *Views) { } func TestDeltaPreferredTemporality(t *testing.T) { - views := New("test", + views := New("test", safePerf, WithDefaultAggregationTemporalitySelector(DeltaPreferredTemporality), ) for i := sdkinstrument.Kind(0); i < sdkinstrument.NumKinds; i++ { @@ -185,7 +194,7 @@ func TestDeltaPreferredTemporality(t *testing.T) { } func TestStandardAggregation(t *testing.T) { - views := New("test", + views := New("test", safePerf, WithDefaultAggregationKindSelector(StandardAggregationKind), ) expectStandardAggregation(t, views) @@ -209,7 +218,7 @@ func expectStandardAggregation(t *testing.T, v *Views) { } func TestInvalidViewDefaults(t *testing.T) { - views := New("", + views := New("", safePerf, WithDefaultAggregationKindSelector(func(_ sdkinstrument.Kind) aggregation.Kind { return 1999 }), diff --git a/pipelines/go.mod b/pipelines/go.mod index 1cc0e8c3..21c506f2 100644 --- a/pipelines/go.mod +++ b/pipelines/go.mod @@ -51,8 +51,8 @@ require ( ) require ( - github.com/lightstep/otel-launcher-go/lightstep/instrumentation v1.13.4 - github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.13.4 + github.com/lightstep/otel-launcher-go/lightstep/instrumentation v1.14.0 + github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.14.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.35.0 )