Skip to content

Commit

Permalink
Add resource_keys routing for the traces (#14208)
Browse files Browse the repository at this point in the history
  • Loading branch information
foadnh authored Oct 24, 2024
1 parent 6b1d3dd commit 75eff33
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 36 deletions.
8 changes: 5 additions & 3 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ const (
svcRouting
metricNameRouting
resourceRouting
resourceKeysRouting
)

// Config defines configuration for the exporter.
type Config struct {
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
ResourceKeys []string `mapstructure:"resource_keys"`
}

// Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
Expand Down
42 changes: 29 additions & 13 deletions exporter/loadbalancingexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ var _ exporter.Traces = (*traceExporterImp)(nil)
type exporterTraces map[*wrappedExporter]ptrace.Traces

type traceExporterImp struct {
loadBalancer *loadBalancer
routingKey routingKey
loadBalancer *loadBalancer
routingKey routingKey
routingResourceKeys []string

stopped bool
shutdownWg sync.WaitGroup
Expand All @@ -38,8 +39,9 @@ type traceExporterImp struct {
func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) {
exporterFactory := otlpexporter.NewFactory()

eCfg := cfg.(*Config)
lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) {
oCfg := buildExporterConfig(cfg.(*Config), endpoint)
oCfg := buildExporterConfig(eCfg, endpoint)
return exporterFactory.CreateTracesExporter(ctx, params, &oCfg)
})
if err != nil {
Expand All @@ -48,9 +50,13 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t

traceExporter := traceExporterImp{loadBalancer: lb, routingKey: traceIDRouting}

switch cfg.(*Config).RoutingKey {
switch eCfg.RoutingKey {
case "service":
traceExporter.routingKey = svcRouting
traceExporter.routingKey = resourceKeysRouting
traceExporter.routingResourceKeys = []string{"service.name"}
case "resource":
traceExporter.routingKey = resourceKeysRouting
traceExporter.routingResourceKeys = eCfg.ResourceKeys
case "traceID", "":
default:
return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey)
Expand Down Expand Up @@ -85,7 +91,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
exporterSegregatedTraces := make(exporterTraces)
endpoints := make(map[*wrappedExporter]string)
for _, batch := range batches {
routingID, err := routingIdentifiersFromTraces(batch, e.routingKey)
routingID, err := e.routingIdentifiersFromTraces(batch)
if err != nil {
return err
}
Expand Down Expand Up @@ -132,7 +138,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
return errs
}

func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) {
func (e *traceExporterImp) routingIdentifiersFromTraces(td ptrace.Traces) (map[string]bool, error) {
ids := make(map[string]bool)
rs := td.ResourceSpans()
if rs.Len() == 0 {
Expand All @@ -149,15 +155,25 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]
return nil, errors.New("empty spans")
}

if key == svcRouting {
if e.routingKey == resourceKeysRouting {
var missingResourceKey bool
for i := 0; i < rs.Len(); i++ {
svc, ok := rs.At(i).Resource().Attributes().Get("service.name")
if !ok {
return nil, errors.New("unable to get service name")
var resourceKeyFound bool
rsi := rs.At(i)
for _, attrKey := range e.routingResourceKeys {
if v, ok := rsi.Resource().Attributes().Get(attrKey); ok {
ids[v.AsString()] = true
resourceKeyFound = true
break
}
}
if !resourceKeyFound {
missingResourceKey = true
}
ids[svc.Str()] = true
}
return ids, nil
if !missingResourceKey {
return ids, nil
}
}
tid := spans.At(0).TraceID()
ids[string(tid[:])] = true
Expand Down
189 changes: 169 additions & 20 deletions exporter/loadbalancingexporter/trace_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,52 @@ import (

func TestNewTracesExporter(t *testing.T) {
for _, tt := range []struct {
desc string
config *Config
err error
desc string
config *Config
wantRoutingKey routingKey
wantRoutingResourceKeys []string
err error
}{
{
"simple",
simpleConfig(),
traceIDRouting,
nil,
nil,
},
{
"service",
serviceBasedRoutingConfig(),
resourceKeysRouting,
[]string{conventions.AttributeServiceName},
nil,
},
{
"resource_keys",
resourceKeysBasedRoutingConfig(),
resourceKeysRouting,
[]string{"resource.key_1", "resource.key_2"},
nil,
},
{
"empty",
&Config{},
0,
nil,
errNoResolver,
},
} {
t.Run(tt.desc, func(t *testing.T) {
// test
_, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config)
te, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config)

// verify
require.Equal(t, tt.err, err)
if err != nil {
return
}
require.Equal(t, tt.wantRoutingKey, te.routingKey)
require.Equal(t, tt.wantRoutingResourceKeys, te.routingResourceKeys)
})
}
}
Expand Down Expand Up @@ -219,7 +244,8 @@ func TestConsumeTracesServiceBased(t *testing.T) {
p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig())
require.NotNil(t, p)
require.NoError(t, err)
assert.Equal(t, p.routingKey, svcRouting)
assert.Equal(t, p.routingKey, resourceKeysRouting)
assert.Equal(t, p.routingResourceKeys, []string{"service.name"})

// pre-load an exporter here, so that we don't use the actual OTLP exporter
lb.addMissingExporters(context.Background(), []string{"endpoint-1"})
Expand All @@ -245,29 +271,113 @@ func TestConsumeTracesServiceBased(t *testing.T) {
assert.Nil(t, res)
}

func TestConsumeTracesResourceKeysBased(t *testing.T) {
componentFactory := func(_ context.Context, _ string) (component.Component, error) {
return newNopMockTracesExporter(), nil
}
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), resourceKeysBasedRoutingConfig(), componentFactory)
require.NotNil(t, lb)
require.NoError(t, err)

p, err := newTracesExporter(exportertest.NewNopCreateSettings(), resourceKeysBasedRoutingConfig())
require.NotNil(t, p)
require.NoError(t, err)
assert.Equal(t, p.routingKey, resourceKeysRouting)
assert.Equal(t, p.routingResourceKeys, []string{"resource.key_1", "resource.key_2"})

// pre-load an exporter here, so that we don't use the actual OTLP exporter
lb.addMissingExporters(context.Background(), []string{"endpoint-1"})
lb.addMissingExporters(context.Background(), []string{"endpoint-2"})
lb.res = &mockResolver{
triggerCallbacks: true,
onResolve: func(_ context.Context) ([]string, error) {
return []string{"endpoint-1", "endpoint-2"}, nil
},
}
p.loadBalancer = lb

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

// test
res := p.ConsumeTraces(context.Background(), simpleTracesWithResourceKeys())

// verify
assert.Nil(t, res)
}

func TestServiceBasedRoutingForSameTraceId(t *testing.T) {
b := pcommon.TraceID([16]byte{1, 2, 3, 4})
for _, tt := range []struct {
desc string
batch ptrace.Traces
routingKey routingKey
res map[string]bool
te *traceExporterImp
desc string
batch ptrace.Traces
res map[string]bool
}{
{
&traceExporterImp{
routingKey: resourceKeysRouting,
routingResourceKeys: []string{"service.name"},
},
"same trace id and different services - service based routing",
twoServicesWithSameTraceID(),
svcRouting,
map[string]bool{"ad-service-1": true, "get-recommendations-7": true},
},
{
&traceExporterImp{
routingKey: traceIDRouting,
},
"same trace id and different services - trace id routing",
twoServicesWithSameTraceID(),
traceIDRouting,
map[string]bool{string(b[:]): true},
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey)
res, err := tt.te.routingIdentifiersFromTraces(tt.batch)
assert.Equal(t, err, nil)
assert.Equal(t, res, tt.res)
})
}
}

func TestResourceKeysBasedRoutingIdentifiers(t *testing.T) {
b := pcommon.TraceID([16]byte{1, 2, 3, 4})
for _, tt := range []struct {
te *traceExporterImp
desc string
batch ptrace.Traces
res map[string]bool
}{
{
&traceExporterImp{
routingKey: resourceKeysRouting,
routingResourceKeys: []string{"resource.key_1", "resource.key_2"},
},
"two resource_keys values",
simpleTracesWithResourceKeys(),
map[string]bool{
"val-1": true,
"val-2": true,
},
},
{
&traceExporterImp{
routingKey: resourceKeysRouting,
routingResourceKeys: []string{"resource.key_1"},
},
"single resource_keys value with trace ID as default",
simpleTracesWithResourceKeys(),
map[string]bool{
"val-1": true,
string(b[:]): true,
},
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := tt.te.routingIdentifiersFromTraces(tt.batch)
assert.Equal(t, err, nil)
assert.Equal(t, res, tt.res)
})
Expand Down Expand Up @@ -405,40 +515,46 @@ func TestBatchWithTwoTraces(t *testing.T) {

func TestNoTracesInBatch(t *testing.T) {
for _, tt := range []struct {
desc string
batch ptrace.Traces
routingKey routingKey
err error
te *traceExporterImp
desc string
batch ptrace.Traces
err error
}{
{
&traceExporterImp{
routingKey: svcRouting,
},
"no resource spans",
ptrace.NewTraces(),
traceIDRouting,
errors.New("empty resource spans"),
},
{
&traceExporterImp{
routingKey: traceIDRouting,
},
"no instrumentation library spans",
func() ptrace.Traces {
batch := ptrace.NewTraces()
batch.ResourceSpans().AppendEmpty()
return batch
}(),
traceIDRouting,
errors.New("empty scope spans"),
},
{
&traceExporterImp{
routingKey: svcRouting,
},
"no spans",
func() ptrace.Traces {
batch := ptrace.NewTraces()
batch.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
return batch
}(),
svcRouting,
errors.New("empty spans"),
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey)
res, err := tt.te.routingIdentifiersFromTraces(tt.batch)
assert.Equal(t, err, tt.err)
assert.Equal(t, res, map[string]bool(nil))
})
Expand Down Expand Up @@ -684,6 +800,29 @@ func simpleTraces() ptrace.Traces {
return traces
}

func simpleTracesWithResourceKeys() ptrace.Traces {
traces := ptrace.NewTraces()
traces.ResourceSpans().EnsureCapacity(1)

rSpans := traces.ResourceSpans().AppendEmpty()
rAttrs := rSpans.Resource().Attributes()
rAttrs.PutStr("resource.key_1", "val-1")
rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4})

rSpans = traces.ResourceSpans().AppendEmpty()
rAttrs = rSpans.Resource().Attributes()
rAttrs.PutStr("resource.key_2", "val-2")
rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4})

rSpans = traces.ResourceSpans().AppendEmpty()
rAttrs = rSpans.Resource().Attributes()
rAttrs.PutStr("resource.key_1", "val-1")
rAttrs.PutStr("resource.key_2", "val-2")
rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4})

return traces
}

func simpleTracesWithServiceName() ptrace.Traces {
traces := ptrace.NewTraces()
traces.ResourceSpans().EnsureCapacity(1)
Expand Down Expand Up @@ -736,6 +875,16 @@ func serviceBasedRoutingConfig() *Config {
}
}

func resourceKeysBasedRoutingConfig() *Config {
return &Config{
Resolver: ResolverSettings{
Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2"}},
},
RoutingKey: "resource_keys",
ResourceKeys: []string{"resource.key_1", "resource.key_2"},
}
}

type mockTracesExporter struct {
component.Component
ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error
Expand Down

0 comments on commit 75eff33

Please sign in to comment.