Skip to content

Commit

Permalink
fix statement eval not in order
Browse files Browse the repository at this point in the history
  • Loading branch information
Frapschen committed Sep 4, 2024
1 parent 255aebe commit 48d676d
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 24 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix-router-random-statements.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix OTTL statement not eval in order

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34860]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 5 additions & 5 deletions processor/routingprocessor/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,21 @@ func (p *logProcessor) route(ctx context.Context, l plog.Logs) error {
)

matchCount := len(p.router.routes)
for key, route := range p.router.routes {
for _, route := range p.router.routes {
_, isMatch, err := route.statement.Execute(ctx, ltx)
if err != nil {
if p.config.ErrorMode == ottl.PropagateError {
return err
}
p.group("", groups, p.router.defaultExporters, rlogs)
p.recordNonRoutedResourceLogs(ctx, key, rlogs)
p.recordNonRoutedResourceLogs(ctx, route.key, rlogs)
continue
}
if !isMatch {
matchCount--
continue
}
p.group(key, groups, route.exporters, rlogs)
p.group(route.key, groups, route.exporters, rlogs)
}

if matchCount == 0 {
Expand All @@ -151,14 +151,14 @@ func (p *logProcessor) group(
key string,
groups map[string]logsGroup,
exporters []exporter.Logs,
spans plog.ResourceLogs,
logs plog.ResourceLogs,
) {
group, ok := groups[key]
if !ok {
group.logs = plog.NewLogs()
group.exporters = exporters
}
spans.CopyTo(group.logs.ResourceLogs().AppendEmpty())
logs.CopyTo(group.logs.ResourceLogs().AppendEmpty())
groups[key] = group
}

Expand Down
164 changes: 164 additions & 0 deletions processor/routingprocessor/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package routingprocessor

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -127,6 +128,63 @@ func TestLogs_RoutingWorks_Context(t *testing.T) {
})
}

func TestLogs_RoutingWorks_Context_Ordered(t *testing.T) {
defaultExp := &mockLogsExporter{}
lExpFirst := &mockLogsExporter{}
lExpSecond := &mockLogsExporter{}
lExpThird := &mockLogsExporter{}

host := newMockHost(map[component.DataType]map[component.ID]component.Component{
component.DataTypeLogs: {
component.MustNewID("otlp"): defaultExp,
component.MustNewIDWithName("otlp", "first"): lExpFirst,
component.MustNewIDWithName("otlp", "second"): lExpSecond,
component.MustNewIDWithName("otlp", "third"): lExpThird,
},
})

exp, err := newLogProcessor(noopTelemetrySettings, &Config{
FromAttribute: "X-Tenant",
AttributeSource: contextAttributeSource,
DefaultExporters: []string{"otlp"},
Table: []RoutingTableItem{
{
Value: "order-second",
Exporters: []string{"otlp/second"},
},
{
Value: "order-first",
Exporters: []string{"otlp/first"},
},
{
Value: "order-third",
Exporters: []string{"otlp/third"},
},
},
})
require.NoError(t, err)
require.NoError(t, exp.Start(context.Background(), host))

for i := 1; i <= 5; i++ {
t.Run(fmt.Sprintf("run %d time", i), func(t *testing.T) {
l := plog.NewLogs()
ll := l.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
ll.AppendEmpty().Body().SetStr("this is a log")

assert.NoError(t, exp.ConsumeLogs(
metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
"X-Tenant": "order-third",
})),
l,
))
assert.Len(t, defaultExp.AllLogs(), 0)
assert.Len(t, lExpFirst.AllLogs(), 0)
assert.Len(t, lExpSecond.AllLogs(), 0)
assert.Len(t, lExpThird.AllLogs(), i, "log should only be routed to lExpThird exporter")
})
}
}

func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) {
defaultExp := &mockLogsExporter{}
lExp := &mockLogsExporter{}
Expand Down Expand Up @@ -182,6 +240,59 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) {
})
}

func TestLogs_RoutingWorks_ResourceAttribute_Ordered(t *testing.T) {
defaultExp := &mockLogsExporter{}
lExpFirst := &mockLogsExporter{}
lExpSecond := &mockLogsExporter{}
lExpThird := &mockLogsExporter{}

host := newMockHost(map[component.DataType]map[component.ID]component.Component{
component.DataTypeLogs: {
component.MustNewID("otlp"): defaultExp,
component.MustNewIDWithName("otlp", "first"): lExpFirst,
component.MustNewIDWithName("otlp", "second"): lExpSecond,
component.MustNewIDWithName("otlp", "third"): lExpThird,
},
})

exp, err := newLogProcessor(noopTelemetrySettings, &Config{
FromAttribute: "X-Tenant",
AttributeSource: resourceAttributeSource,
DefaultExporters: []string{"otlp"},
Table: []RoutingTableItem{
{
Value: "order-second",
Exporters: []string{"otlp/second"},
},
{
Value: "order-first",
Exporters: []string{"otlp/first"},
},
{
Value: "order-third",
Exporters: []string{"otlp/third"},
},
},
})
require.NoError(t, err)
require.NoError(t, exp.Start(context.Background(), host))

for i := 1; i <= 5; i++ {
t.Run(fmt.Sprintf("run %d time", i), func(t *testing.T) {
l := plog.NewLogs()
rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "order-third")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("this is a log")

assert.NoError(t, exp.ConsumeLogs(context.Background(), l))
assert.Len(t, defaultExp.AllLogs(), 0)
assert.Len(t, lExpFirst.AllLogs(), 0)
assert.Len(t, lExpSecond.AllLogs(), 0)
assert.Len(t, lExpThird.AllLogs(), i, "log should only be routed to lExpThird exporter")
})
}
}

func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) {
defaultExp := &mockLogsExporter{}
lExp := &mockLogsExporter{}
Expand Down Expand Up @@ -400,6 +511,59 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
})
}

func TestLogs_RoutingWorks_ResourceAttribute_WithOTTL_Ordered(t *testing.T) {
defaultExp := &mockLogsExporter{}
lExpFirst := &mockLogsExporter{}
lExpSecond := &mockLogsExporter{}

host := newMockHost(map[component.DataType]map[component.ID]component.Component{
component.DataTypeLogs: {
component.MustNewID("otlp"): defaultExp,
component.MustNewIDWithName("otlp", "first"): lExpFirst,
component.MustNewIDWithName("otlp", "second"): lExpSecond,
},
})

exp, err := newLogProcessor(noopTelemetrySettings, &Config{
FromAttribute: "__otel_enabled__",
AttributeSource: resourceAttributeSource,
DefaultExporters: []string{"otlp"},
Table: []RoutingTableItem{
{
Statement: `route() where resource.attributes["__otel_enabled__"] == nil`,
Exporters: []string{"otlp/first"},
},
{
Statement: `delete_key(resource.attributes, "__otel_enabled__") where resource.attributes["__otel_enabled__"] == "true"`,
Exporters: []string{"otlp/first"},
},
{
Statement: `delete_key(resource.attributes, "__otel_enabled__") where resource.attributes["__otel_enabled__"] == "false"`,
Exporters: []string{"otlp/second"},
},
},
})
require.NoError(t, err)
require.NoError(t, exp.Start(context.Background(), host))

for i := 1; i <= 5; i++ {
t.Run(fmt.Sprintf("run %d time", i), func(t *testing.T) {
l := plog.NewLogs()
rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("__otel_enabled__", "false")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("this is a log")

assert.NoError(t, exp.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultExp.AllLogs(), 0)
assert.Len(t, lExpFirst.AllLogs(), 0)

// we expect Statement eval in order and log should only be routed to lExpSecond exporter
assert.Len(t, lExpSecond.AllLogs(), i, "log should only be routed to lExpSecond exporter")
})
}
}

// see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26462
func TestLogsAttributeWithOTTLDoesNotCauseCrash(t *testing.T) {
// prepare
Expand Down
4 changes: 2 additions & 2 deletions processor/routingprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (p *metricsProcessor) route(ctx context.Context, tm pmetric.Metrics) error
)

matchCount := len(p.router.routes)
for key, route := range p.router.routes {
for _, route := range p.router.routes {
_, isMatch, err := route.statement.Execute(ctx, mtx)
if err != nil {
if p.config.ErrorMode == ottl.PropagateError {
Expand All @@ -128,7 +128,7 @@ func (p *metricsProcessor) route(ctx context.Context, tm pmetric.Metrics) error
matchCount--
continue
}
p.group(key, groups, route.exporters, rmetrics)
p.group(route.key, groups, route.exporters, rmetrics)
}

if matchCount == 0 {
Expand Down
32 changes: 18 additions & 14 deletions processor/routingprocessor/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type router[E component.Component, K any] struct {
table []RoutingTableItem

defaultExporters []E
routes map[string]routingItem[E, K]
routes []routingItem[E, K]
}

// newRouter creates a new router instance with its type parameter constrained
Expand All @@ -44,12 +44,11 @@ func newRouter[E component.Component, K any](

table: table,
defaultExporterIDs: defaultExporterIDs,

routes: make(map[string]routingItem[E, K]),
}
}

type routingItem[E component.Component, K any] struct {
key string
exporters []E
statement *ottl.Statement[K]
}
Expand Down Expand Up @@ -90,17 +89,14 @@ func (r *router[E, K]) registerDefaultExporters(available map[component.ID]compo
// registerRouteExporters registers route exporters using the provided
// available exporters map to check if they were available.
func (r *router[E, K]) registerRouteExporters(available map[component.ID]component.Component) error {
var routes []routingItem[E, K]
for _, item := range r.table {
statement, err := r.getStatementFrom(item)
if err != nil {
return err
}

route, ok := r.routes[key(item)]
if !ok {
route.statement = statement
}

var exporters []E
for _, name := range item.Exporters {
e, err := r.extractExporter(name, available)
if errors.Is(err, errExporterNotFound) {
Expand All @@ -109,10 +105,16 @@ func (r *router[E, K]) registerRouteExporters(available map[component.ID]compone
if err != nil {
return err
}
route.exporters = append(route.exporters, e)
exporters = append(exporters, e)
}
r.routes[key(item)] = route
routes = append(routes, routingItem[E, K]{
key: key(item),
exporters: exporters,
statement: statement,
})
}
r.routes = routes

return nil
}

Expand Down Expand Up @@ -171,9 +173,11 @@ func (r *router[E, K]) extractExporter(name string, available map[component.ID]c
}

func (r *router[E, K]) getExporters(key string) []E {
e, ok := r.routes[key]
if !ok {
return r.defaultExporters
for _, route := range r.routes {
if route.key == key {
return route.exporters
}
}
return e.exporters

return r.defaultExporters
}
6 changes: 3 additions & 3 deletions processor/routingprocessor/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,21 @@ func (p *tracesProcessor) route(ctx context.Context, t ptrace.Traces) error {
)

matchCount := len(p.router.routes)
for key, route := range p.router.routes {
for _, route := range p.router.routes {
_, isMatch, err := route.statement.Execute(ctx, stx)
if err != nil {
if p.config.ErrorMode == ottl.PropagateError {
return err
}
p.group("", groups, p.router.defaultExporters, rspans)
p.recordNonRoutedResourceSpans(ctx, key, rspans)
p.recordNonRoutedResourceSpans(ctx, route.key, rspans)
continue
}
if !isMatch {
matchCount--
continue
}
p.group(key, groups, route.exporters, rspans)
p.group(route.key, groups, route.exporters, rspans)
}

if matchCount == 0 {
Expand Down

0 comments on commit 48d676d

Please sign in to comment.