Skip to content

Commit

Permalink
Limit of 0 disables max_bytes_per_tag_values_query (grafana#1447)
Browse files Browse the repository at this point in the history
* Don't enforce tag size limits with 0

Signed-off-by: Joe Elliott <[email protected]>

* Updated docs

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* lint

Signed-off-by: Joe Elliott <[email protected]>

* moved changelog entry

Signed-off-by: Joe Elliott <[email protected]>

* Added a test to confirm an error is returned

Signed-off-by: Joe Elliott <[email protected]>

* consolidated writing logic

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored May 31, 2022
1 parent 33ef70d commit ac5a136
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 81 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
## main / unreleased

* [CHANGE] metrics-generator: Changed added metric label `instance` to `__metrics_gen_instance` to reduce collisions with custom dimensions. [#1439](https://github.com/grafana/tempo/pull/1439) (@joe-elliott)
* [CHANGE] Don't enforce `max_bytes_per_tag_values_query` when set to 0. [#1447](https://github.com/grafana/tempo/pull/1447) (@joe-elliott)
* [FEATURE] metrics-generator: support per-tenant processor configuration [#1434](https://github.com/grafana/tempo/pull/1434) (@kvrhdn)
* [ENHANCEMENT] Added the ability to have a per tenant max search duration. [#1421](https://github.com/grafana/tempo/pull/1421) (@joe-elliott)
* [BUGFIX] Fix nil pointer panic when the trace by id path errors. [1441](https://github.com/grafana/tempo/pull/1441) (@joe-elliott)
* [BUGFIX] Fix nil pointer panic when the trace by id path errors. [#1441](https://github.com/grafana/tempo/pull/1441) (@joe-elliott)

## v1.4.1 / 2022-05-05

Expand Down
1 change: 1 addition & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,7 @@ overrides:
# to populate the autocomplete dropdown. This limit protects the system from
# tags with high cardinality or large values such as HTTP URLs or SQL queries.
# This override limit is used by the ingester and the querier.
# A value of 0 disables the limit.
[max_bytes_per_tag_values_query: <int> | default = 5000000 (5MB) ]

# Metrics-generator configurations
Expand Down
22 changes: 8 additions & 14 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"context"
"fmt"
"sort"

"github.com/go-kit/log/level"
Expand Down Expand Up @@ -221,8 +222,6 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
if err != nil {
return nil, err
}
// get limit from override
maxBytesPerTagValuesQuery := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)

kv := &tempofb.KeyValues{}
tagNameBytes := []byte(tagName)
Expand All @@ -243,12 +242,10 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
}

// check if size of values map is within limit after scanning live traces
if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values from live traces exceeded limit, reduce cardinality or size of tags", "tag", tagName)
// return empty response to avoid querier OOMs
return &tempopb.SearchTagValuesResponse{
TagValues: []string{},
}, nil
maxBytesPerTagValuesQuery := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
if maxBytesPerTagValuesQuery > 0 && !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values from live traces exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID)
return nil, fmt.Errorf("tag values exceeded allowed max bytes (%d)", maxBytesPerTagValuesQuery)
}

err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
Expand All @@ -259,12 +256,9 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
}

// check if size of values map is within limit after scanning all blocks
if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName)
// return empty response to avoid querier OOMs
return &tempopb.SearchTagValuesResponse{
TagValues: []string{},
}, nil
if maxBytesPerTagValuesQuery > 0 && !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID)
return nil, fmt.Errorf("tag values exceeded allowed max bytes (%d)", maxBytesPerTagValuesQuery)
}

return &tempopb.SearchTagValuesResponse{
Expand Down
209 changes: 152 additions & 57 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"testing"
"time"

Expand All @@ -24,21 +26,6 @@ import (
"github.com/grafana/tempo/tempodb/search"
)

func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) {
for _, meta := range sr.Traces {
parsedTraceID, err := util.HexStringToTraceID(meta.TraceID)
assert.NoError(t, err)

present := false
for _, id := range ids {
if bytes.Equal(parsedTraceID, id) {
present = true
}
}
assert.True(t, present)
}
}

func TestInstanceSearch(t *testing.T) {
limits, err := overrides.NewOverrides(overrides.Limits{})
assert.NoError(t, err, "unexpected error creating limits")
Expand All @@ -50,45 +37,9 @@ func TestInstanceSearch(t *testing.T) {
i, err := newInstance("fake", limiter, ingester.store, ingester.local)
assert.NoError(t, err, "unexpected error creating new instance")

// This matches the encoding for live traces, since
// we are pushing to the instance directly it must match.
dec := model.MustNewSegmentDecoder(model.CurrentEncoding)

numTraces := 500
searchAnnotatedFractionDenominator := 100
ids := [][]byte{}

// add dummy search data
var tagKey = "foo"
var tagValue = "bar"

for j := 0; j < numTraces; j++ {
id := make([]byte, 16)
rand.Read(id)

testTrace := test.MakeTrace(10, id)
trace.SortTrace(testTrace)
traceBytes, err := dec.PrepareForWrite(testTrace, 0, 0)
require.NoError(t, err)

// annotate just a fraction of traces with search data
var searchData []byte
if j%searchAnnotatedFractionDenominator == 0 {
data := &tempofb.SearchEntryMutable{}
data.TraceID = id
data.AddTag(tagKey, tagValue)
searchData = data.ToBytes()

// these are the only ids we want to test against
ids = append(ids, id)
}

// searchData will be nil if not
err = i.PushBytes(context.Background(), id, traceBytes, searchData)
require.NoError(t, err)

assert.Equal(t, int(i.traceCount.Load()), len(i.traces))
}
ids, _ := writeTracesWithSearchData(t, i, tagKey, tagValue, false)

var req = &tempopb.SearchRequest{
Tags: map[string]string{},
Expand All @@ -97,7 +48,7 @@ func TestInstanceSearch(t *testing.T) {

sr, err := i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
// todo: test that returned results are in sorted time order, create order of id's beforehand
checkEqual(t, ids, sr)

Expand All @@ -108,7 +59,7 @@ func TestInstanceSearch(t *testing.T) {

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)

// Test after cutting new headblock
Expand All @@ -118,7 +69,7 @@ func TestInstanceSearch(t *testing.T) {

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)

// Test after completing a block
Expand All @@ -127,7 +78,7 @@ func TestInstanceSearch(t *testing.T) {

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)

err = ingester.stopping(nil)
Expand All @@ -141,10 +92,154 @@ func TestInstanceSearch(t *testing.T) {

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)
}

// checkEqual asserts that every trace in the search response is one of the
// expected trace ids. Note that it does NOT assert the reverse (that every
// expected id was returned); callers pair it with an assert.Len on sr.Traces.
func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) {
	t.Helper() // report failures at the caller's line, not inside this helper

	for _, meta := range sr.Traces {
		parsedTraceID, err := util.HexStringToTraceID(meta.TraceID)
		assert.NoError(t, err)

		present := false
		for _, id := range ids {
			if bytes.Equal(parsedTraceID, id) {
				present = true
				break // match found, no need to scan the remaining ids
			}
		}
		assert.True(t, present, "trace id %s not found in expected ids", meta.TraceID)
	}
}

// TestInstanceSearchTags verifies that SearchTags and SearchTagValues return
// the expected tag name and values at every stage of a trace's lifecycle in
// the ingester: live traces, WAL, headblock, and completed block.
func TestInstanceSearchTags(t *testing.T) {
	limits, err := overrides.NewOverrides(overrides.Limits{})
	assert.NoError(t, err, "unexpected error creating limits")
	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

	ingester, _, _ := defaultIngester(t, t.TempDir())
	i, err := newInstance("fake", limiter, ingester.store, ingester.local)
	assert.NoError(t, err, "unexpected error creating new instance")

	// push traces annotated with dummy search data
	tagKey, tagValue := "foo", "bar"
	_, expectedTagValues := writeTracesWithSearchData(t, i, tagKey, tagValue, true)

	ctx := user.InjectOrgID(context.Background(), "fake")

	// live traces
	testSearchTagsAndValues(t, ctx, i, tagKey, expectedTagValues)

	// after appending to the WAL
	require.NoError(t, i.CutCompleteTraces(0, true))
	assert.Equal(t, int(i.traceCount.Load()), len(i.traces))
	testSearchTagsAndValues(t, ctx, i, tagKey, expectedTagValues)

	// after cutting a new headblock
	blockID, err := i.CutBlockIfReady(0, 0, true)
	require.NoError(t, err)
	assert.NotEqual(t, blockID, uuid.Nil)
	testSearchTagsAndValues(t, ctx, i, tagKey, expectedTagValues)

	// after completing a block
	require.NoError(t, i.CompleteBlock(blockID))
	testSearchTagsAndValues(t, ctx, i, tagKey, expectedTagValues)
}

// testSearchTagsAndValues asserts that the instance reports exactly one tag
// (tagName) and that the tag's values match expectedTagValues.
// nolint:revive,unparam
func testSearchTagsAndValues(t *testing.T, ctx context.Context, i *instance, tagName string, expectedTagValues []string) {
	tagsResp, err := i.SearchTags(ctx)
	require.NoError(t, err)

	valuesResp, err := i.SearchTagValues(ctx, tagName)
	require.NoError(t, err)

	// the only tag the instance should know about is tagName
	assert.Len(t, tagsResp.TagNames, 1)
	assert.Equal(t, tagName, tagsResp.TagNames[0])

	// tag values come back in no particular order; sort before comparing
	sort.Strings(valuesResp.TagValues)
	assert.Equal(t, expectedTagValues, valuesResp.TagValues)
}

// TestInstanceSearchMaxBytesPerTagValuesQueryFails confirms that SearchTagValues
// returns an error (and a nil response) when the accumulated tag values exceed
// the MaxBytesPerTagValuesQuery limit.
func TestInstanceSearchMaxBytesPerTagValuesQueryFails(t *testing.T) {
	// a 10 byte limit is guaranteed to be blown by the search data written below
	limits, err := overrides.NewOverrides(overrides.Limits{MaxBytesPerTagValuesQuery: 10})
	assert.NoError(t, err, "unexpected error creating limits")
	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

	ingester, _, _ := defaultIngester(t, t.TempDir())
	i, err := newInstance("fake", limiter, ingester.store, ingester.local)
	assert.NoError(t, err, "unexpected error creating new instance")

	tagKey, tagValue := "foo", "bar"
	writeTracesWithSearchData(t, i, tagKey, tagValue, true)

	ctx := user.InjectOrgID(context.Background(), "fake")
	resp, err := i.SearchTagValues(ctx, tagKey)
	assert.Error(t, err)
	assert.Nil(t, resp)
}

// writeTracesWithSearchData pushes traces (and flatbuffer search data for a
// fraction of them) into the given instance. It returns the trace ids expected
// from a tag search and the strings expected from a tag value search. When
// postFixValue is true each annotated trace gets a unique value (tagValue + j),
// otherwise all annotated traces share tagValue.
func writeTracesWithSearchData(t *testing.T, i *instance, tagKey string, tagValue string, postFixValue bool) ([][]byte, []string) {
	// This matches the encoding for live traces, since
	// we are pushing to the instance directly it must match.
	dec := model.MustNewSegmentDecoder(model.CurrentEncoding)

	const (
		numTraces                          = 100
		searchAnnotatedFractionDenominator = 10
	)

	// trace 0 is always annotated, so both slices are non-empty on return
	var (
		ids               [][]byte
		expectedTagValues []string
	)

	for n := 0; n < numTraces; n++ {
		id := make([]byte, 16)
		rand.Read(id)

		tr := test.MakeTrace(10, id)
		trace.SortTrace(tr)
		traceBytes, err := dec.PrepareForWrite(tr, 0, 0)
		require.NoError(t, err)

		// annotate just a fraction of traces with search data;
		// the rest push a nil searchData
		var searchData []byte
		if n%searchAnnotatedFractionDenominator == 0 {
			tv := tagValue
			if postFixValue {
				tv += strconv.Itoa(n)
			}

			entry := &tempofb.SearchEntryMutable{}
			entry.TraceID = id
			entry.AddTag(tagKey, tv)
			searchData = entry.ToBytes()

			// only annotated traces are expected back from tag searches
			expectedTagValues = append(expectedTagValues, tv)
			ids = append(ids, entry.TraceID)
		}

		err = i.PushBytes(context.Background(), id, traceBytes, searchData)
		require.NoError(t, err)

		assert.Equal(t, int(i.traceCount.Load()), len(i.traces))
	}

	return ids, expectedTagValues
}

func TestInstanceSearchNoData(t *testing.T) {
limits, err := overrides.NewOverrides(overrides.Limits{})
assert.NoError(t, err, "unexpected error creating limits")
Expand Down
4 changes: 2 additions & 2 deletions modules/overrides/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ type Limits struct {
// Compactor enforced limits.
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`

// Querier enforced limits.
// Querier and Ingester enforced limits.
MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`

// QueryFrontend enforced limits
MaxSearchDuration model.Duration `yaml:"max_search_duration" json:"max_search_duration"`

// MaxBytesPerTrace is enforced in the Ingester, Compactor, Querier (Search) and Serverless (Search). It
// it not enforce currently when doing a trace by id lookup.
// is not used when doing a trace by id lookup.
MaxBytesPerTrace int `yaml:"max_bytes_per_trace" json:"max_bytes_per_trace"`

// Configuration for overrides, convenient if it goes here.
Expand Down
12 changes: 5 additions & 7 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,6 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
return nil, errors.Wrap(err, "error extracting org id in Querier.SearchTagValues")
}

// fetch response size limit for tag-values query
tagValuesLimitBytes := q.limits.MaxBytesPerTagValuesQuery(userID)

replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, errors.Wrap(err, "error finding ingesters in Querier.SearchTagValues")
Expand All @@ -375,10 +372,11 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
uniqueMap[v] = struct{}{}
}

if !util.MapSizeWithinLimit(uniqueMap, tagValuesLimitBytes) {
return &tempopb.SearchTagValuesResponse{
TagValues: []string{},
}, nil
// fetch response size limit for tag-values query
tagValuesLimitBytes := q.limits.MaxBytesPerTagValuesQuery(userID)
if tagValuesLimitBytes > 0 && !util.MapSizeWithinLimit(uniqueMap, tagValuesLimitBytes) {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID)
return nil, fmt.Errorf("tag values exceeded allowed max bytes (%d)", tagValuesLimitBytes)
}

// Final response (sorted)
Expand Down

0 comments on commit ac5a136

Please sign in to comment.