Skip to content

Commit

Permalink
Updated according to jguiton's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Feb 14, 2025
1 parent 3ac26d3 commit 5998f4e
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 119 deletions.
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func TestBatchSenderWithTimeout(t *testing.T) {
assert.EqualValues(t, 12, sink.ItemsCount())
})
}
runTest("enable_queue_batcher", true)
// When queue_batcher is enabled, we don't propagate context deadline.
runTest("disable_queue_batcher", false)
}

Expand Down
78 changes: 8 additions & 70 deletions exporter/exporterhelper/internal/batcher/batch_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package batcher // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
import (
"context"
"time"

"go.opentelemetry.io/otel/trace"
)
Expand All @@ -13,81 +12,20 @@ type traceContextKeyType int

const batchSpanLinksKey traceContextKeyType = iota

// batchContext links the underlying context to all incoming span contexts.
type batchContext struct {
deadline time.Time
deadlineOk bool
deadlineCtx context.Context
ctx context.Context
}

func SpanLinksFromContext(ctx context.Context) []trace.Link {
// LinksFromContext returns a list of trace links registered in the context.
func LinksFromContext(ctx context.Context) []trace.Link {
if ctx == nil {
return []trace.Link{}
}

Check warning on line 19 in exporter/exporterhelper/internal/batcher/batch_context.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/batcher/batch_context.go#L18-L19

Added lines #L18 - L19 were not covered by tests

if bctx, ok := ctx.(batchContext); ok {
if links, ok := bctx.Value(batchSpanLinksKey).([]trace.Link); ok {
return links
}
} else {
panic("lalal")
if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok {
return links
}
return []trace.Link{}
return []trace.Link{trace.LinkFromContext(ctx)}
}

func newBatchContext(ctx context.Context) batchContext {
deadline, ok := ctx.Deadline()
underlyingCtx := context.WithValue(
func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context {
return context.WithValue(
context.Background(),
batchSpanLinksKey,
[]trace.Link{trace.LinkFromContext(ctx)})
return batchContext{
deadline: deadline,
deadlineOk: ok,
deadlineCtx: ctx,
ctx: underlyingCtx,
}
}

// Merge links the span from incoming context to the span from the first context.
func (mc batchContext) Merge(other context.Context) batchContext {
deadline, deadlineOk := mc.Deadline()
deadlineCtx := mc.deadlineCtx
if otherDeadline, ok := other.Deadline(); ok {
deadlineOk = true
if deadline.Before(otherDeadline) {
deadline = otherDeadline
deadlineCtx = other
}
}

links := append(SpanLinksFromContext(mc), trace.LinkFromContext(other))
underlyingCtx := context.WithValue(
mc.ctx,
batchSpanLinksKey,
links)
return batchContext{
deadline: deadline,
deadlineOk: deadlineOk,
deadlineCtx: deadlineCtx,
ctx: underlyingCtx,
}
}

// Deadline returns the latest deadline of all context.
func (mc batchContext) Deadline() (time.Time, bool) {
return mc.deadline, mc.deadlineOk
}

func (mc batchContext) Done() <-chan struct{} {
return mc.deadlineCtx.Done()
}

func (mc batchContext) Err() error {
return mc.deadlineCtx.Err()
}

func (mc batchContext) Value(key any) any {
return mc.ctx.Value(key)
append(LinksFromContext(ctx1), LinksFromContext(ctx2)...))
}
44 changes: 4 additions & 40 deletions exporter/exporterhelper/internal/batcher/batch_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,13 @@ package batcher
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/otel/trace"
)

func TestBatchContextDeadline(t *testing.T) {
now := time.Now()
ctx1 := context.Background()
batchContext := newBatchContext(ctx1)

var ok bool
deadline, ok := batchContext.Deadline()
require.Equal(t, time.Time{}, deadline)
require.False(t, ok)

ctx2, cancel2 := context.WithDeadline(context.Background(), now.Add(200))
defer cancel2()
batchContext = batchContext.Merge(ctx2)

deadline, ok = batchContext.Deadline()
require.True(t, ok)
require.Equal(t, now.Add(200), deadline)

ctx3, cancel3 := context.WithDeadline(context.Background(), now.Add(300))
defer cancel3()
ctx4, cancel4 := context.WithDeadline(context.Background(), now.Add(100))
defer cancel4()
batchContext = batchContext.Merge(ctx3)
batchContext = batchContext.Merge(ctx4)

deadline, ok = batchContext.Deadline()
require.True(t, ok)
require.Equal(t, now.Add(300), deadline)

time.Sleep(300)
require.Equal(t, ctx3.Err(), batchContext.Err())
}

func TestBatchContextLink(t *testing.T) {
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")
Expand All @@ -55,19 +21,17 @@ func TestBatchContextLink(t *testing.T) {

ctx2, span2 := tracer.Start(ctx1, "span2")
defer span2.End()
batchContext := newBatchContext(ctx2)

ctx3, span3 := tracer.Start(ctx1, "span3")
defer span3.End()
batchContext = batchContext.Merge(ctx3)

ctx4, span4 := tracer.Start(ctx1, "span4")
defer span4.End()
batchContext = batchContext.Merge(ctx4)

span2.AddEvent("This is an event.")
batchContext := contextWithMergedLinks(ctx2, ctx3)
batchContext = contextWithMergedLinks(batchContext, ctx4)

actualLinks := SpanLinksFromContext(batchContext)
actualLinks := LinksFromContext(batchContext)
require.Len(t, actualLinks, 3)
require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext)
require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/batcher/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type batch struct {
ctx batchContext
ctx context.Context
req request.Request
done multiDone
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: newBatchContext(ctx),
ctx: ctx,
req: lastReq,
done: multiDone{done},
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Logic on how to deal with the current batch:
qb.currentBatch.req = reqList[0]
qb.currentBatch.done = append(qb.currentBatch.done, done)
qb.currentBatch.ctx = qb.currentBatch.ctx.Merge(ctx)
qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx)

// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
// cannot unlock and re-lock because we are not done processing all the responses.
Expand All @@ -142,7 +142,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: newBatchContext(ctx),
ctx: ctx,
req: lastReq,
done: multiDone{done},
}
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/obs_report_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
// StartOp creates the span used to trace the operation. Returning
// the updated context and the created span.
func (ors *obsReportSender[K]) startOp(ctx context.Context) context.Context {
spanLinks := batcher.SpanLinksFromContext(ctx)

// This span should contain the links to spans of all batched requests.
ctx, _ = ors.tracer.Start(ctx, ors.spanName, ors.spanAttrs, trace.WithLinks(spanLinks...))
ctx, _ = ors.tracer.Start(ctx,
ors.spanName,
ors.spanAttrs,
trace.WithLinks(batcher.LinksFromContext(ctx)...))
return ctx
}

Expand Down
6 changes: 6 additions & 0 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"context"
"errors"

"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/featuregate"
Expand Down Expand Up @@ -111,7 +113,11 @@ func NewQueueSender(
}

q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, func(ctx context.Context, req request.Request, done exporterqueue.Done) {
// TODO: move start of span to enqueue instead to dequeue.
// Figure out how to preserve span context across persistent storage.
ctx, _ = metadata.Tracer(qSet.ExporterSettings.TelemetrySettings).Start(ctx, "exporter/enqueue")
done.OnDone(exportFunc(ctx, req))
trace.SpanFromContext(ctx).End()
}))
if err != nil {
return nil, err
Expand Down

0 comments on commit 5998f4e

Please sign in to comment.