Skip to content

Commit

Permalink
Traces Cleanup (#4260)
Browse files Browse the repository at this point in the history
* traces cleanup

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Nov 1, 2024
1 parent 44c18cc commit 013a4ee
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [BUGFIX] Fix counter samples being downsampled by backdating the initial sample to the previous minute when the series is new [#4236](https://github.com/grafana/tempo/pull/4236) (@javiermolinar)
* [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
* [BUGFIX] Gave context to orphaned spans related to various maintenance processes. [#4260](https://github.com/grafana/tempo/pull/4260) (@joe-elliott)

# v2.6.1

Expand Down
3 changes: 3 additions & 0 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ func (p *Processor) completeBlock() error {
b = firstWalBlock
)

ctx, span := tracer.Start(ctx, "Processor.CompleteBlock")
defer span.End()

iter, err := b.Iterator()
if err != nil {
return err
Expand Down
22 changes: 13 additions & 9 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
}

if blockID != uuid.Nil {
level.Info(log.Logger).Log("msg", "head block cut. enqueueing flush op", "userid", instance.instanceID, "block", blockID)
level.Info(log.Logger).Log("msg", "head block cut. enqueueing flush op", "tenant", instance.instanceID, "block", blockID)
// jitter to help when flushing many instances at the same time
// no jitter if immediate (initiated via /flush handler for example)
i.enqueue(&flushOp{
Expand Down Expand Up @@ -211,7 +211,7 @@ func (i *Ingester) flushLoop(j int) {
var err error

if op.kind == opKindComplete {
retry, err = i.handleComplete(op)
retry, err = i.handleComplete(context.Background(), op)
} else {
retry, err = i.handleFlush(context.Background(), op.userID, op.blockID)
}
Expand Down Expand Up @@ -243,7 +243,11 @@ func handleAbandonedOp(op *flushOp) {
"op", op.kind, "block", op.blockID.String(), "attempts", op.attempts)
}

func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) {
func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool, err error) {
ctx, sp := tracer.Start(ctx, "ingester.Complete", trace.WithAttributes(attribute.String("tenant", op.userID), attribute.String("blockID", op.blockID.String())))
defer sp.End()
withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "tenant", op.userID, "block", op.blockID.String())

// No point in proceeding if shutdown has been initiated since
// we won't be able to queue up the next flush op
if i.flushQueues.IsStopped() {
Expand All @@ -252,20 +256,20 @@ func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) {
}

start := time.Now()
level.Info(log.Logger).Log("msg", "completing block", "userid", op.userID, "blockID", op.blockID)
level.Info(log.Logger).Log("msg", "completing block", "tenant", op.userID, "blockID", op.blockID)
instance, err := i.getOrCreateInstance(op.userID)
if err != nil {
return false, err
}

err = instance.CompleteBlock(op.blockID)
level.Info(log.Logger).Log("msg", "block completed", "userid", op.userID, "blockID", op.blockID, "duration", time.Since(start))
err = instance.CompleteBlock(ctx, op.blockID)
level.Info(log.Logger).Log("msg", "block completed", "tenant", op.userID, "blockID", op.blockID, "duration", time.Since(start))
if err != nil {
handleFailedOp(op, err)

if op.attempts >= maxCompleteAttempts {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. POSSIBLE DATA LOSS",
"userID", op.userID, "attempts", op.attempts, "block", op.blockID.String())
"tenant", op.userID, "attempts", op.attempts, "block", op.blockID.String())

// Delete WAL and move on
err = instance.ClearCompletingBlock(op.blockID)
Expand Down Expand Up @@ -306,9 +310,9 @@ func withSpan(logger gklog.Logger, sp trace.Span) gklog.Logger {
}

func (i *Ingester) handleFlush(ctx context.Context, userID string, blockID uuid.UUID) (retry bool, err error) {
ctx, sp := tracer.Start(ctx, "flush", trace.WithAttributes(attribute.String("organization", userID), attribute.String("blockID", blockID.String())))
ctx, sp := tracer.Start(ctx, "ingester.Flush", trace.WithAttributes(attribute.String("tenant", userID), attribute.String("blockID", blockID.String())))
defer sp.End()
withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "userid", userID, "block", blockID.String())
withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "tenant", userID, "block", blockID.String())

instance, err := i.getOrCreateInstance(userID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestWalDropsZeroLength(t *testing.T) {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(blockID)
err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
Expand Down Expand Up @@ -408,7 +408,7 @@ func TestDedicatedColumns(t *testing.T) {
inst.blocksMtx.RUnlock()

// Complete block
err = inst.CompleteBlock(blockID)
err = inst.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

// TODO: This check should be included as part of the read path
Expand Down
4 changes: 1 addition & 3 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes
}

// CompleteBlock moves a completingBlock to a completeBlock. The new completeBlock has the same ID.
func (i *instance) CompleteBlock(blockID uuid.UUID) error {
func (i *instance) CompleteBlock(ctx context.Context, blockID uuid.UUID) error {
i.blocksMtx.Lock()
var completingBlock common.WALBlock
for _, iterBlock := range i.completingBlocks {
Expand All @@ -328,8 +328,6 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) error {
return fmt.Errorf("error finding completingBlock")
}

ctx := context.Background()

backendBlock, err := i.writer.CompleteBlockWithBackend(ctx, completingBlock, i.localReader, i.localWriter)
if err != nil {
return fmt.Errorf("error completing wal block with local backend: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (i *instance) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsRequ
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit)
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "tenant", userID, "limit", limit)
}

collected := distinctValues.Strings()
Expand Down Expand Up @@ -382,7 +382,7 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID, "limit", limit, "size", distinctValues.Size())
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "tenant", userID, "limit", limit, "size", distinctValues.Size())
}

return &tempopb.SearchTagValuesResponse{
Expand Down Expand Up @@ -589,7 +589,7 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
}

if valueCollector.Exceeded() {
_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", valueCollector.Size())
_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "tenant", userID, "limit", limit, "size", valueCollector.Size())
}

resp := &tempopb.SearchTagValuesV2Response{
Expand Down
14 changes: 7 additions & 7 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestInstanceSearch(t *testing.T) {
checkEqual(t, ids, sr)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

sr, err = i.Search(context.Background(), req)
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestInstanceSearchTraceQL(t *testing.T) {
checkEqual(t, ids, sr)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

sr, err = i.Search(context.Background(), req)
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestInstanceSearchWithStartAndEnd(t *testing.T) {
searchAndAssert(req, uint32(100))

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)
searchAndAssert(req, uint32(200))

Expand Down Expand Up @@ -267,7 +267,7 @@ func TestInstanceSearchTags(t *testing.T) {
testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestInstanceSearchTagAndValuesV2(t *testing.T) {
testSearchTagsAndValuesV2(t, userCtx, i, tagKey, queryThatMatches, expectedTagValues, expectedEventTagValues, expectedLinkTagValues)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)
require.NoError(t, i.ClearCompletingBlock(blockID)) // Clear the completing block

Expand Down Expand Up @@ -681,7 +681,7 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
// Cut wal, complete, delete wal, then flush
blockID, _ := i.CutBlockIfReady(0, 0, true)
if blockID != uuid.Nil {
err := i.CompleteBlock(blockID)
err := i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)
err = i.ClearCompletingBlock(blockID)
require.NoError(t, err)
Expand Down Expand Up @@ -837,7 +837,7 @@ func TestInstanceSearchMetrics(t *testing.T) {
require.Less(t, numBytes, m.InspectedBytes)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)
err = i.ClearCompletingBlock(blockID)
require.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestInstance(t *testing.T) {
require.NoError(t, err, "unexpected error cutting block")
require.NotEqual(t, blockID, uuid.Nil)

err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err, "unexpected error completing block")

block := i.GetBlockToBeFlushed(blockID)
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestInstanceFind(t *testing.T) {

queryAll(t, i, ids, traces)

err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

queryAll(t, i, ids, traces)
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestInstanceDoesNotRace(t *testing.T) {
go concurrent(func() {
blockID, _ := i.CutBlockIfReady(0, 0, false)
if blockID != uuid.Nil {
err := i.CompleteBlock(blockID)
err := i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err, "unexpected error completing block")
block := i.GetBlockToBeFlushed(blockID)
require.NotNil(t, block)
Expand Down Expand Up @@ -487,7 +487,7 @@ func TestInstanceCutBlockIfReady(t *testing.T) {
blockID, err := instance.CutBlockIfReady(tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
require.NoError(t, err)

err = instance.CompleteBlock(blockID)
err = instance.CompleteBlock(context.Background(), blockID)
if tc.expectedToCutBlock {
require.NoError(t, err, "unexpected error completing block")
}
Expand Down Expand Up @@ -738,7 +738,7 @@ func BenchmarkInstanceFindTraceByIDFromCompleteBlock(b *testing.B) {
require.NoError(b, err)
id, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(b, err)
err = instance.CompleteBlock(id)
err = instance.CompleteBlock(context.Background(), id)
require.NoError(b, err)

require.Equal(b, 1, len(instance.completeBlocks))
Expand Down Expand Up @@ -776,7 +776,7 @@ func benchmarkInstanceSearch(b testing.TB) {
// force the traces to be in a complete block
id, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(b, err)
err = instance.CompleteBlock(id)
err = instance.CompleteBlock(context.Background(), id)
require.NoError(b, err)

require.Equal(b, 1, len(instance.completeBlocks))
Expand Down Expand Up @@ -880,7 +880,7 @@ func BenchmarkInstanceContention(t *testing.B) {
go concurrent(func() {
blockID, _ := i.CutBlockIfReady(0, 0, false)
if blockID != uuid.Nil {
err := i.CompleteBlock(blockID)
err := i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err, "unexpected error completing block")
err = i.ClearCompletingBlock(blockID)
require.NoError(t, err, "unexpected error clearing wal block")
Expand Down
6 changes: 6 additions & 0 deletions pkg/usagestats/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"

"github.com/grafana/tempo/cmd/tempo/build"
"github.com/grafana/tempo/tempodb/backend"
Expand All @@ -36,6 +37,8 @@ var (

stabilityCheckInterval = 5 * time.Second
stabilityMinimumRequired = 6

tracer = otel.Tracer("usagestats/Reporter")
)

type Reporter struct {
Expand Down Expand Up @@ -158,6 +161,9 @@ func ensureStableKey(ctx context.Context, kvClient kv.Client, logger log.Logger)
}

func (rep *Reporter) init(ctx context.Context) {
ctx, span := tracer.Start(ctx, "UsageReporter.init")
defer span.End()

if rep.conf.Leader {
rep.cluster = rep.initLeader(ctx)
return
Expand Down
2 changes: 1 addition & 1 deletion tempodb/blocklist/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, span := tracer.Start(ctx, "Poller.Do")
ctx, span := tracer.Start(ctx, "Poller.Do")
defer span.End()

tenants, err := p.reader.Tenants(ctx)
Expand Down

0 comments on commit 013a4ee

Please sign in to comment.