Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] Return start and end timestamps from FindTraceIDs in v2 api #6770

Merged
merged 10 commits into from
Feb 24, 2025
3 changes: 1 addition & 2 deletions cmd/jaeger/internal/integration/trace_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"math"
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -138,7 +137,7 @@ func (r *traceReader) FindTraces(
func (*traceReader) FindTraceIDs(
_ context.Context,
_ tracestore.TraceQueryParams,
) iter.Seq2[[]pcommon.TraceID, error] {
) iter.Seq2[tracestore.FindTraceIDsChunk, error] {
panic("not implemented")
}

Expand Down
10 changes: 4 additions & 6 deletions internal/storage/v2/api/tracestore/mocks/Reader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 24 additions & 1 deletion internal/storage/v2/api/tracestore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Reader interface {
// of matching trace IDs. This is useful in some contexts, such as batch jobs, where a
// large list of trace IDs may be queried first and then the full traces are loaded
// in batches.
FindTraceIDs(ctx context.Context, query TraceQueryParams) iter.Seq2[[]pcommon.TraceID, error]
FindTraceIDs(ctx context.Context, query TraceQueryParams) iter.Seq2[FindTraceIDsChunk, error]
}

// GetTraceParams contains single-trace parameters for a GetTraces request.
Expand All @@ -88,6 +88,29 @@ type TraceQueryParams struct {
NumTraces int
}

// FindTraceIDsChunk represents a chunk of trace IDs that match specific query parameters.
type FindTraceIDsChunk struct {
// TraceIDs is a slice of trace IDs that match the query parameters.
TraceIDs []pcommon.TraceID

// Start is the start time of the earliest trace in the chunk.
//
// This field is provided as an optimization hint for some storage backends
// that can perform more efficient queries when they know the approximate time range.
// The value should not be used for precise time-based filtering or assumptions.
// It is meant as a rough boundary for the traces contained in the chunk and may not
// be populated in all cases.
Start time.Time

// End is the end time of the latest trace in the chunk.
//
// Similar to the Start field, this serves as an optimization for certain
// storage backends that can benefit from knowing the approximate end time
// of the traces. It should not be relied upon for any exact time-based logic
// and is only a hint for optimization purposes that may not be populated in all cases.
End time.Time
}

func (t *TraceQueryParams) ToSpanStoreQueryParameters() *spanstore.TraceQueryParameters {
return &spanstore.TraceQueryParameters{
ServiceName: t.ServiceName,
Expand Down
6 changes: 4 additions & 2 deletions internal/storage/v2/v1adapter/spanreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,10 @@ func TestSpanReader_FindTraceIDs(t *testing.T) {
for _, test := range tests {
tr := tracestoremocks.Reader{}
tr.On("FindTraceIDs", mock.Anything, test.expectedQuery).
Return(iter.Seq2[[]pcommon.TraceID, error](func(yield func([]pcommon.TraceID, error) bool) {
yield(test.traceIDs, test.err)
Return(iter.Seq2[tracestore.FindTraceIDsChunk, error](func(yield func(tracestore.FindTraceIDsChunk, error) bool) {
yield(tracestore.FindTraceIDsChunk{
TraceIDs: test.traceIDs,
}, test.err)
})).Once()

sr := NewSpanReader(&tr)
Expand Down
18 changes: 14 additions & 4 deletions internal/storage/v2/v1adapter/tracereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,28 @@ func (tr *TraceReader) FindTraces(
func (tr *TraceReader) FindTraceIDs(
ctx context.Context,
query tracestore.TraceQueryParams,
) iter.Seq2[[]pcommon.TraceID, error] {
return func(yield func([]pcommon.TraceID, error) bool) {
) iter.Seq2[tracestore.FindTraceIDsChunk, error] {
return func(yield func(tracestore.FindTraceIDsChunk, error) bool) {
traceIDs, err := tr.spanReader.FindTraceIDs(ctx, query.ToSpanStoreQueryParameters())
if err != nil {
yield(nil, err)
yield(tracestore.FindTraceIDsChunk{
TraceIDs: nil,
}, err)
return
}
if traceIDs == nil {
yield(tracestore.FindTraceIDsChunk{
TraceIDs: nil,
}, nil)
return
}
otelIDs := make([]pcommon.TraceID, 0, len(traceIDs))
for _, traceID := range traceIDs {
otelIDs = append(otelIDs, FromV1TraceID(traceID))
}
yield(otelIDs, nil)
yield(tracestore.FindTraceIDsChunk{
TraceIDs: otelIDs,
}, nil)
}
}

Expand Down
45 changes: 28 additions & 17 deletions internal/storage/v2/v1adapter/tracereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func TestTraceReader_FindTraceIDsDelegatesResponse(t *testing.T) {
tests := []struct {
name string
modelTraceIDs []model.TraceID
expectedTraceIDs []pcommon.TraceID
expectedTraceIDs tracestore.FindTraceIDsChunk
err error
}{
{
Expand All @@ -413,26 +413,34 @@ func TestTraceReader_FindTraceIDsDelegatesResponse(t *testing.T) {
{Low: 3, High: 2},
{Low: 4, High: 3},
},
expectedTraceIDs: []pcommon.TraceID{
pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}),
pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4}),
expectedTraceIDs: tracestore.FindTraceIDsChunk{
TraceIDs: []pcommon.TraceID{
pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}),
pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4}),
},
},
},
{
name: "empty response",
modelTraceIDs: []model.TraceID{},
expectedTraceIDs: nil,
name: "empty response",
modelTraceIDs: []model.TraceID{},
expectedTraceIDs: tracestore.FindTraceIDsChunk{
TraceIDs: []pcommon.TraceID{},
},
},
{
name: "nil response",
modelTraceIDs: nil,
expectedTraceIDs: nil,
name: "nil response",
modelTraceIDs: nil,
expectedTraceIDs: tracestore.FindTraceIDsChunk{
TraceIDs: nil,
},
},
{
name: "error response",
modelTraceIDs: nil,
expectedTraceIDs: nil,
err: errors.New("test error"),
name: "error response",
modelTraceIDs: nil,
expectedTraceIDs: tracestore.FindTraceIDsChunk{
TraceIDs: nil,
},
err: errors.New("test error"),
},
}
for _, test := range tests {
Expand All @@ -456,7 +464,7 @@ func TestTraceReader_FindTraceIDsDelegatesResponse(t *testing.T) {
traceReader := &TraceReader{
spanReader: sr,
}
traceIDs, err := jiter.FlattenWithErrors(traceReader.FindTraceIDs(
traceIDs, err := jiter.CollectWithErrors(traceReader.FindTraceIDs(
context.Background(),
tracestore.TraceQueryParams{
ServiceName: "service",
Expand All @@ -469,8 +477,11 @@ func TestTraceReader_FindTraceIDsDelegatesResponse(t *testing.T) {
NumTraces: 10,
},
))
require.ErrorIs(t, err, test.err)
require.Equal(t, test.expectedTraceIDs, traceIDs)
if test.err != nil {
require.ErrorIs(t, err, test.err)
} else {
require.Equal(t, test.expectedTraceIDs, traceIDs[0])
}
})
}
}
Expand Down
7 changes: 4 additions & 3 deletions internal/storage/v2/v1adapter/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore"
)

// V1BatchesFromTraces converts OpenTelemetry traces (ptrace.Traces)
Expand Down Expand Up @@ -62,17 +63,17 @@ func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace
return jaegerTraces, nil
}

func V1TraceIDsFromSeq2(traceIDsIter iter.Seq2[[]pcommon.TraceID, error]) ([]model.TraceID, error) {
func V1TraceIDsFromSeq2(traceIDsIter iter.Seq2[tracestore.FindTraceIDsChunk, error]) ([]model.TraceID, error) {
var (
iterErr error
modelTraceIDs []model.TraceID
)
traceIDsIter(func(traceIDs []pcommon.TraceID, err error) bool {
traceIDsIter(func(traceIDsResponse tracestore.FindTraceIDsChunk, err error) bool {
if err != nil {
iterErr = err
return false
}
for _, traceID := range traceIDs {
for _, traceID := range traceIDsResponse.TraceIDs {
modelTraceIDs = append(modelTraceIDs, ToV1TraceID(traceID))
}
return true
Expand Down
23 changes: 14 additions & 9 deletions internal/storage/v2/v1adapter/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore"
)

func TestProtoFromTraces_AddsWarnings(t *testing.T) {
Expand Down Expand Up @@ -287,30 +288,34 @@ func TestV1TraceToOtelTrace_ReturnEmptyOtelTrace(t *testing.T) {
func TestV1TraceIDsFromSeq2(t *testing.T) {
testCases := []struct {
name string
seqTraceIDs iter.Seq2[[]pcommon.TraceID, error]
seqTraceIDs iter.Seq2[tracestore.FindTraceIDsChunk, error]
expectedIDs []model.TraceID
expectedError error
}{
{
name: "empty sequence",
seqTraceIDs: func(func([]pcommon.TraceID, error) bool) {},
seqTraceIDs: func(func(tracestore.FindTraceIDsChunk, error) bool) {},
expectedIDs: nil,
expectedError: nil,
},
{
name: "sequence with error",
seqTraceIDs: func(yield func([]pcommon.TraceID, error) bool) {
yield(nil, assert.AnError)
seqTraceIDs: func(yield func(tracestore.FindTraceIDsChunk, error) bool) {
yield(tracestore.FindTraceIDsChunk{
TraceIDs: nil,
}, assert.AnError)
},
expectedIDs: nil,
expectedError: assert.AnError,
},
{
name: "sequence with one chunk of trace IDs",
seqTraceIDs: func(yield func([]pcommon.TraceID, error) bool) {
seqTraceIDs: func(yield func(tracestore.FindTraceIDsChunk, error) bool) {
traceID1 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3})
traceID2 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5})
yield([]pcommon.TraceID{traceID1, traceID2}, nil)
yield(tracestore.FindTraceIDsChunk{
TraceIDs: []pcommon.TraceID{traceID1, traceID2},
}, nil)
},
expectedIDs: []model.TraceID{
model.NewTraceID(2, 3),
Expand All @@ -320,12 +325,12 @@ func TestV1TraceIDsFromSeq2(t *testing.T) {
},
{
name: "sequence with multiple chunks of trace IDs",
seqTraceIDs: func(yield func([]pcommon.TraceID, error) bool) {
seqTraceIDs: func(yield func(tracestore.FindTraceIDsChunk, error) bool) {
traceID1 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3})
traceID2 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5})
traceID3 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 7})
yield([]pcommon.TraceID{traceID1}, nil)
yield([]pcommon.TraceID{traceID2, traceID3}, nil)
yield(tracestore.FindTraceIDsChunk{TraceIDs: []pcommon.TraceID{traceID1}}, nil)
yield(tracestore.FindTraceIDsChunk{TraceIDs: []pcommon.TraceID{traceID2, traceID3}}, nil)
},
expectedIDs: []model.TraceID{
model.NewTraceID(2, 3),
Expand Down
Loading