Skip to content

Commit

Permalink
Refactor for alternative block formats (grafana#1346)
Browse files Browse the repository at this point in the history
* commit1

* Make pool interface instead of bytes, fix tests

* simplify tests

* Replace tempodb IterateObjects() with Search(), internalize backendblock.find

* Replace another use of BackendBlock.Iterator() with .Search()

* lint

* lint

* Review feedback: fix default, remove concurrency

* Replace ...SearchOptions pattern with regular struct
  • Loading branch information
mdisibio authored Mar 23, 2022
1 parent 3db3938 commit 8cd5084
Show file tree
Hide file tree
Showing 31 changed files with 471 additions and 577 deletions.
10 changes: 2 additions & 8 deletions cmd/tempo-cli/cmd-query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/gogo/protobuf/jsonpb"
"github.com/google/uuid"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
Expand Down Expand Up @@ -149,20 +148,15 @@ func queryBlock(ctx context.Context, r backend.Reader, c backend.Compactor, bloc
return nil, err
}

obj, err := block.Find(ctx, traceID)
trace, err := block.FindTraceByID(ctx, traceID)
if err != nil {
return nil, err
}

if obj == nil {
if trace == nil {
return nil, nil
}

trace, err := model.MustNewObjectDecoder(meta.DataEncoding).PrepareForRead(obj)
if err != nil {
return nil, err
}

return &queryResults{
blockID: id,
trace: trace,
Expand Down
90 changes: 19 additions & 71 deletions cmd/tempo-cli/cmd-search.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@ package main
import (
"context"
"fmt"
"io"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
)

const (
Expand Down Expand Up @@ -48,6 +44,11 @@ func (cmd *searchBlocksCmd) Run(opts *globalOptions) error {
return err
}

searchReq := &tempopb.SearchRequest{
Tags: map[string]string{cmd.Name: cmd.Value},
Limit: limit,
}

ctx := context.Background()

blockIDs, err := r.Blocks(ctx, cmd.TenantID)
Expand All @@ -72,7 +73,7 @@ func (cmd *searchBlocksCmd) Run(opts *globalOptions) error {
return
}
if err != nil {
fmt.Println("Error querying block:", err)
fmt.Println("Error reading block meta:", err)
return
}
if meta.StartTime.Unix() <= endTime.Unix() &&
Expand All @@ -90,92 +91,39 @@ func (cmd *searchBlocksCmd) Run(opts *globalOptions) error {
blockmetas = append(blockmetas, q)
}

searchOpts := encoding.DefaultSearchOptions()
searchOpts.ChunkSizeBytes = chunkSize
searchOpts.PrefetchTraceCount = iteratorBuffer

fmt.Println("Blocks In Range:", len(blockmetas))
foundids := []common.ID{}
foundids := []string{}
for _, meta := range blockmetas {
block, err := encoding.NewBackendBlock(meta, r)
if err != nil {
return err
}

// todo : graduated chunk sizes will increase throughput. i.e. first request should be small to feed the below parsing faster
// later queries should use larger chunk sizes to be more efficient
iter, err := block.Iterator(chunkSize)
resp, err := block.Search(ctx, searchReq, searchOpts)
if err != nil {
return err
fmt.Println("Error searching block:", err)
return nil
}

prefetchIter := encoding.NewPrefetchIterator(ctx, iter, iteratorBuffer)
ids, err := searchIterator(prefetchIter, meta.DataEncoding, cmd.Name, cmd.Value, limit)
prefetchIter.Close()
if err != nil {
return err
if resp != nil {
for _, r := range resp.Traces {
foundids = append(foundids, r.TraceID)
}
}

foundids = append(foundids, ids...)
if len(foundids) >= limit {
break
}
}

fmt.Println("Matching Traces:", len(foundids))
for _, id := range foundids {
fmt.Println(" ", util.TraceIDToHexString(id))
fmt.Println(" ", id)
}

return nil
}

func searchIterator(iter encoding.Iterator, dataEncoding string, name string, value string, limit int) ([]common.ID, error) {
ctx := context.Background()
found := []common.ID{}

for {
id, obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}

// todo : parrallelize unmarshal and search
trace, err := model.MustNewObjectDecoder(dataEncoding).PrepareForRead(obj)
if err != nil {
return nil, err
}

if traceContainsKeyValue(trace, name, value) {
found = append(found, id)
}

if len(found) >= limit {
break
}
}

return found, nil
}

func traceContainsKeyValue(trace *tempopb.Trace, name string, value string) bool {
// todo : support other attribute types besides string
for _, b := range trace.Batches {
for _, a := range b.Resource.Attributes {
if a.Key == name && a.Value.GetStringValue() == value {
return true
}
}

for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {
for _, a := range s.Attributes {
if a.Key == name && a.Value.GetStringValue() == value {
return true
}
}
}
}
}

return false
}
54 changes: 7 additions & 47 deletions cmd/tempo-serverless/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package serverless
import (
"bytes"
"fmt"
"io"
"net/http"
"reflect"
"runtime"
Expand All @@ -13,7 +12,6 @@ import (
"github.com/google/uuid"
"github.com/grafana/dskit/flagext"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -95,53 +93,15 @@ func Handler(r *http.Request) (*tempopb.SearchResponse, *HTTPError) {
return nil, httpError("creating backend block", err, http.StatusInternalServerError)
}

// tempodb exposes an IterateObjects() method to basically perform the below loop. currently we are purposefully
// not using that so that the serverless function doesn't have to instantiate a full tempodb instance.
iter, err := block.PartialIterator(cfg.Search.ChunkSizeBytes, int(searchReq.StartPage), int(searchReq.PagesToSearch))
if err != nil {
return nil, httpError("creating partial iterator", err, http.StatusInternalServerError)
}
iter = encoding.NewPrefetchIterator(r.Context(), iter, cfg.Search.PrefetchTraceCount)
defer iter.Close()
opts := encoding.DefaultSearchOptions()
opts.StartPage = int(searchReq.StartPage)
opts.TotalPages = int(searchReq.PagesToSearch)
opts.PrefetchTraceCount = cfg.Search.PrefetchTraceCount
opts.MaxBytes = maxBytes

resp := &tempopb.SearchResponse{
Metrics: &tempopb.SearchMetrics{},
}

decoder, err := model.NewObjectDecoder(searchReq.DataEncoding)
resp, err := block.Search(r.Context(), searchReq.SearchReq, opts)
if err != nil {
return nil, httpError("creating decoder", err, http.StatusInternalServerError)
}

for {
id, obj, err := iter.Next(r.Context())
if err == io.EOF {
break
}
if err != nil {
return nil, httpError("iterating", err, http.StatusInternalServerError)
}

resp.Metrics.InspectedTraces++
resp.Metrics.InspectedBytes += uint64(len(obj))

if maxBytes > 0 && len(obj) > maxBytes {
resp.Metrics.SkippedTraces++
continue
}

metadata, err := decoder.Matches(id, obj, searchReq.SearchReq)
if err != nil {
return nil, httpError("matching", err, http.StatusInternalServerError)
}
if metadata == nil {
continue
}

resp.Traces = append(resp.Traces, metadata)
if len(resp.Traces) >= int(searchReq.SearchReq.Limit) {
break
}
return nil, httpError("searching block", err, http.StatusInternalServerError)
}

runtime.GC()
Expand Down
2 changes: 1 addition & 1 deletion modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (c *Compactor) Owns(hash string) bool {

// Combine implements tempodb.CompactorSharder
func (c *Compactor) Combine(dataEncoding string, tenantID string, objs ...[]byte) ([]byte, bool, error) {
combinedObj, wasCombined, err := model.ObjectCombiner.Combine(dataEncoding, objs...)
combinedObj, wasCombined, err := model.StaticCombiner.Combine(dataEncoding, objs...)
if err != nil {
return nil, false, err
}
Expand Down
16 changes: 8 additions & 8 deletions modules/distributor/search_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package distributor
import (
"strconv"

"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
"github.com/grafana/tempo/tempodb/search"
)

type extractTagFunc func(tag string) bool
Expand All @@ -25,12 +25,12 @@ func extractSearchDataAll(traces []*rebatchedTrace, extractTag extractTagFunc) [
// extractSearchData returns the flatbuffer search data for the given trace. It is extracted here
// in the distributor because this is the only place on the ingest path where the trace is available
// in object form.
func extractSearchData(trace *tempopb.Trace, id []byte, extractTag extractTagFunc) []byte {
func extractSearchData(tr *tempopb.Trace, id []byte, extractTag extractTagFunc) []byte {
data := &tempofb.SearchEntryMutable{}

data.TraceID = id

for _, b := range trace.Batches {
for _, b := range tr.Batches {
// Batch attrs
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
Expand All @@ -50,24 +50,24 @@ func extractSearchData(trace *tempopb.Trace, id []byte, extractTag extractTagFun
if len(s.ParentSpanId) == 0 {

// Collect root.name
data.AddTag(search.RootSpanNameTag, s.Name)
data.AddTag(trace.RootSpanNameTag, s.Name)

// Collect root.service.name
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if a.Key == search.ServiceNameTag {
if a.Key == trace.ServiceNameTag {
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(search.RootServiceNameTag, s)
data.AddTag(trace.RootServiceNameTag, s)
}
}
}
}
}

// Collect for any spans
data.AddTag(search.SpanNameTag, s.Name)
data.AddTag(trace.SpanNameTag, s.Name)
if s.Status != nil {
data.AddTag(search.StatusCodeTag, strconv.Itoa(int(s.Status.Code)))
data.AddTag(trace.StatusCodeTag, strconv.Itoa(int(s.Status.Code)))
}
data.SetStartTimeUnixNano(s.StartTimeUnixNano)
data.SetEndTimeUnixNano(s.EndTimeUnixNano)
Expand Down
12 changes: 6 additions & 6 deletions modules/distributor/search_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (

"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1"
v1_resource "github.com/grafana/tempo/pkg/tempopb/resource/v1"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/tempodb/search"
)

func TestExtractSearchData(t *testing.T) {
Expand Down Expand Up @@ -64,11 +64,11 @@ func TestExtractSearchData(t *testing.T) {
searchData: &tempofb.SearchEntryMutable{
TraceID: traceIDA,
Tags: tempofb.NewSearchDataMapWithData(map[string][]string{
"foo": {"bar"},
search.RootSpanNameTag: {"firstSpan"},
search.SpanNameTag: {"firstSpan"},
search.RootServiceNameTag: {"baz"},
search.ServiceNameTag: {"baz"},
"foo": {"bar"},
trace.RootSpanNameTag: {"firstSpan"},
trace.SpanNameTag: {"firstSpan"},
trace.RootServiceNameTag: {"baz"},
trace.ServiceNameTag: {"baz"},
}),
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
Expand Down
10 changes: 5 additions & 5 deletions modules/frontend/searchsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"github.com/google/uuid"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/blocklist"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -32,15 +32,15 @@ type mockReader struct {
metas []*backend.BlockMeta
}

func (m *mockReader) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([][]byte, []string, []error, error) {
return nil, nil, nil, nil
func (m *mockReader) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([]*tempopb.Trace, []error, error) {
return nil, nil, nil
}

func (m *mockReader) BlockMetas(tenantID string) []*backend.BlockMeta {
return m.metas
}
func (m *mockReader) IterateObjects(ctx context.Context, meta *backend.BlockMeta, startPage int, totalPages int, callback tempodb.IterateObjectCallback) error {
return nil
func (m *mockReader) Search(ctx context.Context, meta *backend.BlockMeta, req *tempopb.SearchRequest, opts encoding.SearchOptions) (*tempopb.SearchResponse, error) {
return nil, nil
}
func (m *mockReader) EnablePolling(sharder blocklist.JobSharder) {}
func (m *mockReader) Shutdown() {}
Expand Down
Loading

0 comments on commit 8cd5084

Please sign in to comment.