Skip to content

Commit

Permalink
Ingester: Validate completed blocks (#4256)
Browse files Browse the repository at this point in the history
* Add validate method to block

Signed-off-by: Joe Elliott <[email protected]>

* Add Validate usage in the ingester

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* add test and fix replay

Signed-off-by: Joe Elliott <[email protected]>

* increment metric

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Nov 7, 2024
1 parent 3449ef6 commit e9ebaa1
Show file tree
Hide file tree
Showing 14 changed files with 264 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [ENHANCEMENT] Changed log level from INFO to DEBUG for the TempoDB Find operation using traceId to reduce excessive/unwanted logs in log search. [#4179](https://github.com/grafana/tempo/pull/4179) (@Aki0x137)
* [ENHANCEMENT] Pushdown collection of results from generators in the querier [#4119](https://github.com/grafana/tempo/pull/4119) (@electron0zero)
* [ENHANCEMENT] Send semver version in api/stattus/buildinfo for cloud deployments [#4110](https://github.com/grafana/tempo/pull/4110) [@Aki0x137]
* [ENHANCEMENT] Add completed block validation on startup.[#4256](https://github.com/grafana/tempo/pull/4256) (@joe-elliott)
* [ENHANCEMENT] Speedup DistinctString and ScopedDistinctString collectors [#4109](https://github.com/grafana/tempo/pull/4109) (@electron0zero)
* [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero)
* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
Expand Down
2 changes: 2 additions & 0 deletions modules/generator/processor/localblocks/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,5 @@ func (m *mockBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, tra
}

func (m *mockBlock) BlockMeta() *backend.BlockMeta { return m.meta }

func (m *mockBlock) Validate(context.Context) error { return nil }
92 changes: 92 additions & 0 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
"github.com/grafana/tempo/tempodb/wal"
)

Expand Down Expand Up @@ -259,6 +261,96 @@ func TestSearchWAL(t *testing.T) {
require.Equal(t, uint32(1), results.Metrics.InspectedTraces)
}

func TestRediscoverLocalBlocks(t *testing.T) {
tmpDir := t.TempDir()

ctx := user.InjectOrgID(context.Background(), "test")
ingester, traces, traceIDs := defaultIngester(t, tmpDir)

// force cut all traces
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")
}

// force complete all blocks
for _, instance := range ingester.instances {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)
}

// create new ingester. this should rediscover local blocks
ingester, _, _ = defaultIngester(t, tmpDir)

// should be able to find old traces that were replayed
for i, traceID := range traceIDs {
foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{
TraceID: traceID,
})
require.NoError(t, err, "unexpected error querying")
require.NotNil(t, foundTrace.Trace)
trace.SortTrace(foundTrace.Trace)
equal := proto.Equal(traces[i], foundTrace.Trace)
require.True(t, equal)
}
}

func TestRediscoverDropsInvalidBlocks(t *testing.T) {
tmpDir := t.TempDir()

ctx := user.InjectOrgID(context.Background(), "test")
ingester, _, _ := defaultIngester(t, tmpDir)

// force cut all traces
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")
}

// force complete all blocks
for _, instance := range ingester.instances {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)
}

// create new ingester. this should rediscover local blocks. there should be 1 block
ingester, _, _ = defaultIngester(t, tmpDir)

instance, ok := ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 1)

// now mangle a complete block
instance, ok = ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 1)

// this cheats by reaching into the internals of the block and overwriting the parquet file directly. if this test starts failing
// it could be b/c the block internals changed and this no longer breaks a block
block := instance.completeBlocks[0]
err := block.writer.Write(ctx, vparquet4.DataFileName, uuid.UUID(block.BlockMeta().BlockID), "test", []byte("mangled"), nil)
require.NoError(t, err)

// create new ingester. this should rediscover local blocks. there should be 0 blocks
ingester, _, _ = defaultIngester(t, tmpDir)

instance, ok = ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 0)
}

// TODO - This test is flaky and commented out until it's fixed
// TestWalReplayDeletesLocalBlocks simulates the condition where an ingester restarts after a wal is completed
// to the local disk, but before the wal is deleted. On startup both blocks exist, and the ingester now errs
Expand Down
16 changes: 15 additions & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,6 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er
var rediscoveredBlocks []*LocalBlock

for _, id := range ids {

// Ignore blocks that have a matching wal. The wal will be replayed and the local block recreated.
// NOTE - Wal replay must be done beforehand.
if hasWal(id) {
Expand Down Expand Up @@ -629,6 +628,21 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er
return nil, err
}

// validate the block before adding it to the list. if we drop a block here and its not in the wal this is data loss, but there is no way to recover. this is likely due to disk
// level corruption
err = b.Validate(ctx)
if err != nil && !errors.Is(err, common.ErrUnsupported) {
level.Error(log.Logger).Log("msg", "local block failed validation, dropping", "tenantID", i.instanceID, "block", id.String(), "error", err)
metricReplayErrorsTotal.WithLabelValues(i.instanceID).Inc()

err = i.local.ClearBlock(id, i.instanceID)
if err != nil {
return nil, fmt.Errorf("deleting invalid local block tenant %v block %v: %w", i.instanceID, id.String(), err)
}

continue
}

ib := NewLocalBlock(ctx, b, i.local)
rediscoveredBlocks = append(rediscoveredBlocks, ib)

Expand Down
1 change: 1 addition & 0 deletions tempodb/encoding/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type BackendBlock interface {
Searcher

BlockMeta() *backend.BlockMeta
Validate(ctx context.Context) error
}

type WALBlock interface {
Expand Down
4 changes: 4 additions & 0 deletions tempodb/encoding/v2/backend_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,7 @@ func (b *BackendBlock) FetchTagValues(context.Context, traceql.FetchTagValuesReq
func (b *BackendBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, traceql.FetchTagsCallback, common.MetricsCallback, common.SearchOptions) error {
return common.ErrUnsupported
}

func (b *BackendBlock) Validate(_ context.Context) error {
return common.ErrUnsupported
}
4 changes: 4 additions & 0 deletions tempodb/encoding/v2/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func (a *walBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, trac
return common.ErrUnsupported
}

func (a *walBlock) Validate(context.Context) error {
return common.ErrUnsupported
}

func (a *walBlock) fullFilename() string {
filename := a.fullFilenameSeparator("+")
_, e1 := os.Stat(filename)
Expand Down
4 changes: 4 additions & 0 deletions tempodb/encoding/vparquet2/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ func (b *backendBlock) FetchTagValues(context.Context, traceql.FetchTagValuesReq
func (b *backendBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, traceql.FetchTagsCallback, common.MetricsCallback, common.SearchOptions) error {
return common.ErrUnsupported
}

func (b *backendBlock) Validate(context.Context) error {
return common.ErrUnsupported
}
4 changes: 4 additions & 0 deletions tempodb/encoding/vparquet2/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui
return nil
}

func (b *walBlock) Validate(context.Context) error {
return common.ErrUnsupported
}

func (b *walBlock) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) {
now := time.Now()
startOfRange := uint32(now.Add(-b.ingestionSlack).Unix())
Expand Down
5 changes: 5 additions & 0 deletions tempodb/encoding/vparquet3/block.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vparquet3

import (
"context"
"sync"

"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -33,3 +34,7 @@ func newBackendBlock(meta *backend.BlockMeta, r backend.Reader) *backendBlock {
func (b *backendBlock) BlockMeta() *backend.BlockMeta {
return b.meta
}

func (b *backendBlock) Validate(context.Context) error {
return common.ErrUnsupported
}
4 changes: 4 additions & 0 deletions tempodb/encoding/vparquet3/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui
return nil
}

func (b *walBlock) Validate(context.Context) error {
return common.ErrUnsupported
}

func (b *walBlock) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) {
now := time.Now()
startOfRange := uint32(now.Add(-b.ingestionSlack).Unix())
Expand Down
42 changes: 42 additions & 0 deletions tempodb/encoding/vparquet4/block.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package vparquet4

import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -33,3 +38,40 @@ func newBackendBlock(meta *backend.BlockMeta, r backend.Reader) *backendBlock {
func (b *backendBlock) BlockMeta() *backend.BlockMeta {
return b.meta
}

// Validate will do a basic sanity check of the state of the parquet file. This can be extended to do more checks in the future.
// This method should lean towards being cost effective over complete.
func (b *backendBlock) Validate(ctx context.Context) error {
if b.meta == nil {
return errors.New("block meta is nil")
}

// read last 8 bytes of the file to confirm its at least complete. the last 4 should be ascii "PAR1"
// and the 4 bytes before that should be the length of the footer
buff := make([]byte, 8)
err := b.r.ReadRange(ctx, DataFileName, uuid.UUID(b.meta.BlockID), b.meta.TenantID, b.meta.Size_-8, buff, nil)
if err != nil {
return fmt.Errorf("failed to read parquet magic footer: %w", err)
}

if string(buff[4:]) != "PAR1" {
return fmt.Errorf("invalid parquet magic footer: %x", buff[4:])
}

footerSize := int64(binary.LittleEndian.Uint32(buff[:4]))
if footerSize != int64(b.meta.FooterSize) {
return fmt.Errorf("unexpected parquet footer size: %d", footerSize)
}

// read the first byte from all blooms to confirm they exist
buff = make([]byte, 1)
for i := 0; i < int(b.meta.BloomShardCount); i++ {
bloomName := common.BloomName(i)
err = b.r.ReadRange(ctx, bloomName, uuid.UUID(b.meta.BlockID), b.meta.TenantID, 0, buff, nil)
if err != nil {
return fmt.Errorf("failed to read first byte of bloom(%d): %w", i, err)
}
}

return nil
}
81 changes: 81 additions & 0 deletions tempodb/encoding/vparquet4/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package vparquet4

import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/stretchr/testify/require"
)

func TestValidateFailsOnCorruptParquetFile(t *testing.T) {
ctx := context.Background()
block, w := validBlock(t)
meta := block.meta

err := block.Validate(ctx)
require.NoError(t, err)

// Corrupt the file
err = w.Write(ctx, DataFileName, uuid.UUID(meta.BlockID), meta.TenantID, []byte{0, 0, 0, 0, 0, 0, 0, 0}, nil)
require.NoError(t, err)

err = block.Validate(ctx)
require.Error(t, err)
}

func TestValidateFailsOnMissingBloom(t *testing.T) {
ctx := context.Background()
block, w := validBlock(t)
meta := block.meta

err := block.Validate(ctx)
require.NoError(t, err)

// remove a bloom
err = w.Delete(ctx, common.BloomName(0), backend.KeyPathForBlock(uuid.UUID(meta.BlockID), meta.TenantID))
require.NoError(t, err)

err = block.Validate(ctx)
require.Error(t, err)
}

func validBlock(t *testing.T) (*backendBlock, backend.Writer) {
t.Helper()

ctx := context.Background()

rawR, rawW, _, err := local.New(&local.Config{
Path: t.TempDir(),
})
require.NoError(t, err)

r := backend.NewReader(rawR)
w := backend.NewWriter(rawW)

iter := newTestIterator()

iter.Add(test.MakeTrace(10, nil), 100, 401)
iter.Add(test.MakeTrace(10, nil), 101, 402)
iter.Add(test.MakeTrace(10, nil), 102, 403)

cfg := &common.BlockConfig{
BloomFP: 0.01,
BloomShardSizeBytes: 100 * 1024,
}

meta := backend.NewBlockMeta("fake", uuid.New(), VersionString, backend.EncNone, "")
meta.TotalObjects = 1
meta.StartTime = time.Unix(300, 0)
meta.EndTime = time.Unix(305, 0)

outMeta, err := CreateBlock(ctx, cfg, meta, iter, r, w)
require.NoError(t, err)

return newBackendBlock(outMeta, r), w
}
5 changes: 5 additions & 0 deletions tempodb/encoding/vparquet4/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui
return nil
}

// TODO: potentially add validation to wal blocks and use in the wal replay code in the ingester.
func (b *walBlock) Validate(context.Context) error {
return common.ErrUnsupported
}

// It controls the block start/end date as a sliding window.
func (b *walBlock) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) {
now := time.Now()
Expand Down

0 comments on commit e9ebaa1

Please sign in to comment.