Skip to content

Commit

Permalink
test cleanup, add CompareLogs option "IgnoreLogRecordAttributeValue"
Browse files Browse the repository at this point in the history
  • Loading branch information
colelaven committed Oct 7, 2024
1 parent 0073f64 commit 7502c7d
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 59 deletions.
8 changes: 8 additions & 0 deletions pkg/pdatatest/plogtest/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ func TestCompareLogs(t *testing.T) {
withoutOptions: errors.New(`resource "map[]": scope "collector": log record "map[]": timestamp doesn't match expected: 11651379494838206465, actual: 11651379494838206464`),
withOptions: nil,
},
{
name: "ignore-log-record-attribute-value",
compareOptions: []CompareLogsOption{
IgnoreLogRecordAttributeValue("Key1"),
},
withoutOptions: errors.New(`resource "map[]": scope "": missing expected log record: map[Key1:Val2]; resource "map[]": scope "": unexpected log record: map[Key1:Val1]`),
withOptions: nil,
},
}

for _, tc := range tcs {
Expand Down
38 changes: 36 additions & 2 deletions pkg/pdatatest/plogtest/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,47 @@ func (opt ignoreResourceAttributeValue) applyOnLogs(expected, actual plog.Logs)
opt.maskLogsResourceAttributeValue(actual)
}

func (opt ignoreResourceAttributeValue) maskLogsResourceAttributeValue(metrics plog.Logs) {
rls := metrics.ResourceLogs()
func (opt ignoreResourceAttributeValue) maskLogsResourceAttributeValue(logs plog.Logs) {
rls := logs.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
internal.MaskResourceAttributeValue(rls.At(i).Resource(), opt.attributeName)
}
}

// IgnoreLogRecordAttributeValue is a CompareLogsOption that sets the value of an attribute
// to empty bytes for every log record
func IgnoreLogRecordAttributeValue(attributeName string) CompareLogsOption {
return ignoreLogRecordAttributeValue{
attributeName: attributeName,
}
}

type ignoreLogRecordAttributeValue struct {
attributeName string
}

func (opt ignoreLogRecordAttributeValue) applyOnLogs(expected, actual plog.Logs) {
opt.maskLogRecordAttributeValue(expected)
opt.maskLogRecordAttributeValue(actual)
}

func (opt ignoreLogRecordAttributeValue) maskLogRecordAttributeValue(logs plog.Logs) {
rls := logs.ResourceLogs()
for i := 0; i < logs.ResourceLogs().Len(); i++ {
sls := rls.At(i).ScopeLogs()
for j := 0; j < sls.Len(); j++ {
lrs := sls.At(j).LogRecords()
for k := 0; k < lrs.Len(); k++ {
lr := lrs.At(k)
val, exists := lr.Attributes().Get(opt.attributeName)
if exists {
val.SetEmptyBytes()
}
}
}
}
}

func IgnoreTimestamp() CompareLogsOption {
return compareLogsOptionFunc(func(expected, actual plog.Logs) {
now := pcommon.NewTimestampFromTime(time.Now())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
resourceLogs:
- resource: {}
scopeLogs:
- logRecords:
- attributes:
- key: Key1
value:
stringValue: Val1
body: {}
spanId: ""
traceId: ""
scope: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
resourceLogs:
- resource: {}
scopeLogs:
- logRecords:
- attributes:
- key: Key1
value:
stringValue: Val2
body: {}
spanId: ""
traceId: ""
scope: {}
78 changes: 21 additions & 57 deletions processor/logdedupprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"testing"
"time"

Expand All @@ -17,11 +18,7 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

Expand Down Expand Up @@ -96,8 +93,7 @@ func TestProcessorShutdownCtxError(t *testing.T) {
}

// Create a processor
p, err := newProcessor(cfg, logsSink, settings)
p.conditions = nil
p, err := createLogsProcessor(context.Background(), settings, cfg, logsSink)
require.NoError(t, err)

// Start then stop the processor checking for errors
Expand Down Expand Up @@ -126,8 +122,7 @@ func TestShutdownBeforeStart(t *testing.T) {
}

// Create a processor
p, err := newProcessor(cfg, logsSink, settings)
p.conditions = nil
p, err := createLogsProcessor(context.Background(), settings, cfg, logsSink)
require.NoError(t, err)
require.NotPanics(t, func() {
err := p.Shutdown(context.Background())
Expand All @@ -149,14 +144,13 @@ func TestProcessorConsume(t *testing.T) {
}

// Create a processor
p, err := newProcessor(cfg, logsSink, settings)
p.conditions = nil
p, err := createLogsProcessor(context.Background(), settings, cfg, logsSink)
require.NoError(t, err)

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

logs, err := golden.ReadLogs("testdata/input/basicLogs.yaml")
logs, err := golden.ReadLogs(filepath.Join("testdata", "input", "basicLogs.yaml"))
require.NoError(t, err)

// Consume the payload
Expand All @@ -168,14 +162,13 @@ func TestProcessorConsume(t *testing.T) {
return logsSink.LogRecordCount() > 0
}, 3*time.Second, 200*time.Millisecond)

expectedLogs, err := golden.ReadLogs("testdata/expected/basicLogs.yaml")
expectedLogs, err := golden.ReadLogs(filepath.Join("testdata", "expected", "basicLogs.yaml"))
require.NoError(t, err)

allSinkLogs := logsSink.AllLogs()
require.Len(t, allSinkLogs, 1)

removeTimestamps(expectedLogs, allSinkLogs[0])
require.NoError(t, plogtest.CompareLogs(expectedLogs, allSinkLogs[0], plogtest.IgnoreObservedTimestamp()))
require.NoError(t, plogtest.CompareLogs(expectedLogs, allSinkLogs[0], plogtest.IgnoreObservedTimestamp(), plogtest.IgnoreTimestamp(), plogtest.IgnoreLogRecordAttributeValue("first_observed_timestamp"), plogtest.IgnoreLogRecordAttributeValue("last_observed_timestamp")))

// Cleanup
err = p.Shutdown(context.Background())
Expand All @@ -192,8 +185,7 @@ func Test_unsetLogsAreExportedOnShutdown(t *testing.T) {
}

// Create & start a processor
p, err := newProcessor(cfg, logsSink, processortest.NewNopSettings())
p.conditions = nil
p, err := createLogsProcessor(context.Background(), processortest.NewNopSettings(), cfg, logsSink)
require.NoError(t, err)
err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
Expand Down Expand Up @@ -230,14 +222,13 @@ func TestProcessorConsumeCondition(t *testing.T) {
}

// Create a processor
p, err := newProcessor(cfg, logsSink, processortest.NewNopSettings())
p.conditions = getConditions(t, cfg.Conditions)
p, err := createLogsProcessor(context.Background(), processortest.NewNopSettings(), cfg, logsSink)
require.NoError(t, err)

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

logs, err := golden.ReadLogs("testdata/input/conditionLogs.yaml")
logs, err := golden.ReadLogs(filepath.Join("testdata", "input", "conditionLogs.yaml"))
require.NoError(t, err)

// Consume the payload
Expand All @@ -252,18 +243,16 @@ func TestProcessorConsumeCondition(t *testing.T) {
allSinkLogs := logsSink.AllLogs()
require.Len(t, allSinkLogs, 2)

expectedConsumedLogs, err := golden.ReadLogs("testdata/expected/conditionConsumedLogs.yaml")
expectedConsumedLogs, err := golden.ReadLogs(filepath.Join("testdata", "expected", "conditionConsumedLogs.yaml"))
require.NoError(t, err)
expectedDedupedLogs, err := golden.ReadLogs("testdata/expected/conditionDedupedLogs.yaml")
expectedDedupedLogs, err := golden.ReadLogs(filepath.Join("testdata", "expected", "conditionDedupedLogs.yaml"))
require.NoError(t, err)

consumedLogs := allSinkLogs[0]
dedupedLogs := allSinkLogs[1]

removeTimestamps(expectedConsumedLogs, expectedDedupedLogs, consumedLogs, dedupedLogs)

require.NoError(t, plogtest.CompareLogs(expectedConsumedLogs, consumedLogs, plogtest.IgnoreObservedTimestamp()))
require.NoError(t, plogtest.CompareLogs(expectedDedupedLogs, dedupedLogs, plogtest.IgnoreObservedTimestamp()))
require.NoError(t, plogtest.CompareLogs(expectedConsumedLogs, consumedLogs, plogtest.IgnoreObservedTimestamp(), plogtest.IgnoreTimestamp(), plogtest.IgnoreLogRecordAttributeValue("first_observed_timestamp"), plogtest.IgnoreLogRecordAttributeValue("last_observed_timestamp")))
require.NoError(t, plogtest.CompareLogs(expectedDedupedLogs, dedupedLogs, plogtest.IgnoreObservedTimestamp(), plogtest.IgnoreTimestamp(), plogtest.IgnoreLogRecordAttributeValue("first_observed_timestamp"), plogtest.IgnoreLogRecordAttributeValue("last_observed_timestamp")))

// Cleanup
err = p.Shutdown(context.Background())
Expand All @@ -283,14 +272,13 @@ func TestProcessorConsumeMultipleConditions(t *testing.T) {
}

// Create a processor
p, err := newProcessor(cfg, logsSink, processortest.NewNopSettings())
p.conditions = getConditions(t, cfg.Conditions)
p, err := createLogsProcessor(context.Background(), processortest.NewNopSettings(), cfg, logsSink)
require.NoError(t, err)

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

logs, err := golden.ReadLogs("testdata/input/conditionLogs.yaml")
logs, err := golden.ReadLogs(filepath.Join("testdata", "input", "conditionLogs.yaml"))
require.NoError(t, err)

// Consume the payload
Expand All @@ -308,40 +296,16 @@ func TestProcessorConsumeMultipleConditions(t *testing.T) {
consumedLogs := allSinkLogs[0]
dedupedLogs := allSinkLogs[1]

expectedConsumedLogs, err := golden.ReadLogs("testdata/expected/multipleConditionsConsumedLogs.yaml")
expectedConsumedLogs, err := golden.ReadLogs(filepath.Join("testdata", "expected", "multipleConditionsConsumedLogs.yaml"))
require.NoError(t, err)
expectedDedupedLogs, err := golden.ReadLogs("testdata/expected/multipleConditionsDedupedLogs.yaml")
expectedDedupedLogs, err := golden.ReadLogs(filepath.Join("testdata", "expected", "multipleConditionsDedupedLogs.yaml"))
require.NoError(t, err)

removeTimestamps(expectedConsumedLogs, expectedDedupedLogs, consumedLogs, dedupedLogs)

require.NoError(t, plogtest.CompareLogs(expectedConsumedLogs, consumedLogs, plogtest.IgnoreObservedTimestamp()))
require.NoError(t, plogtest.CompareLogs(expectedDedupedLogs, dedupedLogs, plogtest.IgnoreObservedTimestamp()))
err = plogtest.CompareLogs(expectedConsumedLogs, consumedLogs, plogtest.IgnoreObservedTimestamp(), plogtest.IgnoreTimestamp(), plogtest.IgnoreLogRecordAttributeValue("first_observed_timestamp"), plogtest.IgnoreLogRecordAttributeValue("last_observed_timestamp"))
require.NoError(t, err)
require.NoError(t, plogtest.CompareLogs(expectedDedupedLogs, dedupedLogs, plogtest.IgnoreObservedTimestamp(), plogtest.IgnoreTimestamp(), plogtest.IgnoreLogRecordAttributeValue("first_observed_timestamp"), plogtest.IgnoreLogRecordAttributeValue("last_observed_timestamp")))

// Cleanup
err = p.Shutdown(context.Background())
require.NoError(t, err)
}

func removeTimestamps(plogs ...plog.Logs) {
for _, logs := range plogs {
for i := 0; i < logs.ResourceLogs().Len(); i++ {
rl := logs.ResourceLogs().At(i)
for j := 0; j < rl.ScopeLogs().Len(); j++ {
sl := rl.ScopeLogs().At(j)
for k := 0; k < sl.LogRecords().Len(); k++ {
logRecord := sl.LogRecords().At(k)
logRecord.Attributes().Remove("first_observed_timestamp")
logRecord.Attributes().Remove("last_observed_timestamp")
logRecord.SetTimestamp(0)
}
}
}
}
}

func getConditions(t *testing.T, conditionsIn []string) expr.BoolExpr[ottllog.TransformContext] {
conditions, err := filterottl.NewBoolExprForLog(conditionsIn, filterottl.StandardLogFuncs(), ottl.PropagateError, componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
return conditions
}

0 comments on commit 7502c7d

Please sign in to comment.