Skip to content

Commit

Permalink
use golden package to cleanup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
colelaven committed Oct 4, 2024
1 parent acce1d1 commit 0073f64
Show file tree
Hide file tree
Showing 10 changed files with 485 additions and 151 deletions.
2 changes: 2 additions & 0 deletions processor/logdedupprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ go 1.22.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.111.0
Expand Down
200 changes: 49 additions & 151 deletions processor/logdedupprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

func Test_newProcessor(t *testing.T) {
Expand Down Expand Up @@ -155,21 +156,8 @@ func TestProcessorConsume(t *testing.T) {
err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

// Create plog payload
logRecord1 := generateTestLogRecord(t, "Body of the log")
logRecord2 := generateTestLogRecord(t, "Body of the log")

// Differ by timestamp and attribute to be removed
logRecord1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Minute)))
logRecord2.Attributes().PutBool("remove_me", false)

logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutInt("one", 1)

sl := rl.ScopeLogs().AppendEmpty()
logRecord1.CopyTo(sl.LogRecords().AppendEmpty())
logRecord2.CopyTo(sl.LogRecords().AppendEmpty())
logs, err := golden.ReadLogs("testdata/input/basicLogs.yaml")
require.NoError(t, err)

// Consume the payload
err = p.ConsumeLogs(context.Background(), logs)
Expand All @@ -180,22 +168,14 @@ func TestProcessorConsume(t *testing.T) {
return logsSink.LogRecordCount() > 0
}, 3*time.Second, 200*time.Millisecond)

expectedLogs, err := golden.ReadLogs("testdata/expected/basicLogs.yaml")
require.NoError(t, err)

allSinkLogs := logsSink.AllLogs()
require.Len(t, allSinkLogs, 1)

consumedLogs := allSinkLogs[0]
require.Equal(t, 1, consumedLogs.LogRecordCount())

require.Equal(t, 1, consumedLogs.ResourceLogs().Len())
consumedRl := consumedLogs.ResourceLogs().At(0)
require.Equal(t, 1, consumedRl.ScopeLogs().Len())
consumedSl := consumedRl.ScopeLogs().At(0)
require.Equal(t, 1, consumedSl.LogRecords().Len())
consumedLogRecord := consumedSl.LogRecords().At(0)

countVal, ok := consumedLogRecord.Attributes().Get(cfg.LogCountAttribute)
require.True(t, ok)
require.Equal(t, int64(2), countVal.Int())
removeTimestamps(expectedLogs, allSinkLogs[0])
require.NoError(t, plogtest.CompareLogs(expectedLogs, allSinkLogs[0], plogtest.IgnoreObservedTimestamp()))

// Cleanup
err = p.Shutdown(context.Background())
Expand Down Expand Up @@ -257,73 +237,33 @@ func TestProcessorConsumeCondition(t *testing.T) {
err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

// Create plog payload
logRecord1 := generateTestLogRecord(t, "Body of the log1")
logRecord2 := generateTestLogRecord(t, "Body of the log1")
logRecord3 := generateTestLogRecord(t, "Body of the log2")
logRecord4 := generateTestLogRecord(t, "Body of the log2")

// Differ by timestamps
logRecord1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Minute)))
logRecord2.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(2 * time.Minute)))
logRecord3.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(3 * time.Minute)))
logRecord4.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(4 * time.Minute)))

// Set ID attributes to use for conditions
logRecord1.Attributes().PutInt("ID", 1)
logRecord2.Attributes().PutInt("ID", 1)
logRecord3.Attributes().PutInt("ID", 2)
logRecord4.Attributes().PutInt("ID", 2)

logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
logRecord1.CopyTo(sl.LogRecords().AppendEmpty())
logRecord3.CopyTo(sl.LogRecords().AppendEmpty())
logRecord2.CopyTo(sl.LogRecords().AppendEmpty())
logRecord4.CopyTo(sl.LogRecords().AppendEmpty())
logs, err := golden.ReadLogs("testdata/input/conditionLogs.yaml")
require.NoError(t, err)

// Consume the payload
err = p.ConsumeLogs(context.Background(), logs)
require.NoError(t, err)

// Wait for the logs to be emitted
require.Eventually(t, func() bool {
return logsSink.LogRecordCount() > 2
return logsSink.LogRecordCount() > 4
}, 3*time.Second, 200*time.Millisecond)

allSinkLogs := logsSink.AllLogs()
require.Len(t, allSinkLogs, 2)

consumedLogs := allSinkLogs[0]
require.Equal(t, 2, consumedLogs.LogRecordCount())

require.Equal(t, 1, consumedLogs.ResourceLogs().Len())
consumedRl := consumedLogs.ResourceLogs().At(0)
require.Equal(t, 1, consumedRl.ScopeLogs().Len())
consumedSl := consumedRl.ScopeLogs().At(0)
require.Equal(t, 2, consumedSl.LogRecords().Len())

for i := 0; i < consumedSl.LogRecords().Len(); i++ {
consumedLogRecord := consumedSl.LogRecords().At(i)
ID, ok := consumedLogRecord.Attributes().Get("ID")
require.True(t, ok)
require.Equal(t, int64(2), ID.Int())
}
expectedConsumedLogs, err := golden.ReadLogs("testdata/expected/conditionConsumedLogs.yaml")
require.NoError(t, err)
expectedDedupedLogs, err := golden.ReadLogs("testdata/expected/conditionDedupedLogs.yaml")
require.NoError(t, err)

consumedLogs := allSinkLogs[0]
dedupedLogs := allSinkLogs[1]
require.Equal(t, 1, dedupedLogs.LogRecordCount())

require.Equal(t, 1, dedupedLogs.ResourceLogs().Len())
dedupedRl := dedupedLogs.ResourceLogs().At(0)
require.Equal(t, 1, dedupedRl.ScopeLogs().Len())
dedupedSl := dedupedRl.ScopeLogs().At(0)
require.Equal(t, 1, dedupedSl.LogRecords().Len())
dedupedLogRecord := dedupedSl.LogRecords().At(0)
removeTimestamps(expectedConsumedLogs, expectedDedupedLogs, consumedLogs, dedupedLogs)

countVal, ok := dedupedLogRecord.Attributes().Get(cfg.LogCountAttribute)
require.True(t, ok)
require.Equal(t, int64(2), countVal.Int())
require.NoError(t, plogtest.CompareLogs(expectedConsumedLogs, consumedLogs, plogtest.IgnoreObservedTimestamp()))
require.NoError(t, plogtest.CompareLogs(expectedDedupedLogs, dedupedLogs, plogtest.IgnoreObservedTimestamp()))

// Cleanup
err = p.Shutdown(context.Background())
Expand All @@ -350,39 +290,8 @@ func TestProcessorConsumeMultipleConditions(t *testing.T) {
err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

// Create plog payload
logRecord1 := generateTestLogRecord(t, "Body of the log1")
logRecord2 := generateTestLogRecord(t, "Body of the log1")
logRecord3 := generateTestLogRecord(t, "Body of the log2")
logRecord4 := generateTestLogRecord(t, "Body of the log2")
logRecord5 := generateTestLogRecord(t, "Body of the log3")
logRecord6 := generateTestLogRecord(t, "Body of the log3")

// Differ by timestamps
logRecord1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Minute)))
logRecord2.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(2 * time.Minute)))
logRecord3.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(3 * time.Minute)))
logRecord4.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(4 * time.Minute)))
logRecord5.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(5 * time.Minute)))
logRecord6.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(6 * time.Minute)))

// Set ID attributes to use for conditions
logRecord1.Attributes().PutInt("ID", 1)
logRecord2.Attributes().PutInt("ID", 1)
logRecord3.Attributes().PutInt("ID", 2)
logRecord4.Attributes().PutInt("ID", 2)
logRecord5.Attributes().PutInt("ID", 3)
logRecord6.Attributes().PutInt("ID", 3)

logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
logRecord1.CopyTo(sl.LogRecords().AppendEmpty())
logRecord3.CopyTo(sl.LogRecords().AppendEmpty())
logRecord2.CopyTo(sl.LogRecords().AppendEmpty())
logRecord4.CopyTo(sl.LogRecords().AppendEmpty())
logRecord5.CopyTo(sl.LogRecords().AppendEmpty())
logRecord6.CopyTo(sl.LogRecords().AppendEmpty())
logs, err := golden.ReadLogs("testdata/input/conditionLogs.yaml")
require.NoError(t, err)

// Consume the payload
err = p.ConsumeLogs(context.Background(), logs)
Expand All @@ -397,51 +306,40 @@ func TestProcessorConsumeMultipleConditions(t *testing.T) {
require.Len(t, allSinkLogs, 2)

consumedLogs := allSinkLogs[0]
require.Equal(t, 2, consumedLogs.LogRecordCount())

require.Equal(t, 1, consumedLogs.ResourceLogs().Len())
consumedRl := consumedLogs.ResourceLogs().At(0)
require.Equal(t, 1, consumedRl.ScopeLogs().Len())
consumedSl := consumedRl.ScopeLogs().At(0)
require.Equal(t, 2, consumedSl.LogRecords().Len())

for i := 0; i < consumedSl.LogRecords().Len(); i++ {
consumedLogRecord := consumedSl.LogRecords().At(i)
ID, ok := consumedLogRecord.Attributes().Get("ID")
require.True(t, ok)
require.Equal(t, int64(2), ID.Int())
}

dedupedLogs := allSinkLogs[1]
require.Equal(t, 2, dedupedLogs.LogRecordCount())

require.Equal(t, 1, dedupedLogs.ResourceLogs().Len())
dedupedRl := dedupedLogs.ResourceLogs().At(0)
require.Equal(t, 1, dedupedRl.ScopeLogs().Len())
dedupedSl := dedupedRl.ScopeLogs().At(0)
require.Equal(t, 2, dedupedSl.LogRecords().Len())

dedupedLogRecord1 := dedupedSl.LogRecords().At(0)
countVal1, ok := dedupedLogRecord1.Attributes().Get(cfg.LogCountAttribute)
require.True(t, ok)
require.Equal(t, int64(2), countVal1.Int())
idVal1, ok := dedupedLogRecord1.Attributes().Get("ID")
require.True(t, ok)
require.True(t, int64(1) == idVal1.Int() || int64(3) == idVal1.Int())

dedupedLogRecord3 := dedupedSl.LogRecords().At(1)
countVal3, ok := dedupedLogRecord3.Attributes().Get(cfg.LogCountAttribute)
require.True(t, ok)
require.Equal(t, int64(2), countVal3.Int())
idVal3, ok := dedupedLogRecord3.Attributes().Get("ID")
require.True(t, ok)
require.True(t, int64(1) == idVal3.Int() || int64(3) == idVal3.Int())

expectedConsumedLogs, err := golden.ReadLogs("testdata/expected/multipleConditionsConsumedLogs.yaml")
require.NoError(t, err)
expectedDedupedLogs, err := golden.ReadLogs("testdata/expected/multipleConditionsDedupedLogs.yaml")
require.NoError(t, err)

removeTimestamps(expectedConsumedLogs, expectedDedupedLogs, consumedLogs, dedupedLogs)

require.NoError(t, plogtest.CompareLogs(expectedConsumedLogs, consumedLogs, plogtest.IgnoreObservedTimestamp()))
require.NoError(t, plogtest.CompareLogs(expectedDedupedLogs, dedupedLogs, plogtest.IgnoreObservedTimestamp()))

// Cleanup
err = p.Shutdown(context.Background())
require.NoError(t, err)
}

func removeTimestamps(plogs ...plog.Logs) {
for _, logs := range plogs {
for i := 0; i < logs.ResourceLogs().Len(); i++ {
rl := logs.ResourceLogs().At(i)
for j := 0; j < rl.ScopeLogs().Len(); j++ {
sl := rl.ScopeLogs().At(j)
for k := 0; k < sl.LogRecords().Len(); k++ {
logRecord := sl.LogRecords().At(k)
logRecord.Attributes().Remove("first_observed_timestamp")
logRecord.Attributes().Remove("last_observed_timestamp")
logRecord.SetTimestamp(0)
}
}
}
}
}

func getConditions(t *testing.T, conditionsIn []string) expr.BoolExpr[ottllog.TransformContext] {
conditions, err := filterottl.NewBoolExprForLog(conditionsIn, filterottl.StandardLogFuncs(), ottl.PropagateError, componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
Expand Down
69 changes: 69 additions & 0 deletions processor/logdedupprocessor/testdata/conditionConsumedLogs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
resourceLogs:
- resource: {}
scopeLogs:
- logRecords:
- attributes:
- key: bool
value:
boolValue: true
- key: str
value:
stringValue: attr str
- key: ID
value:
intValue: "2"
body:
stringValue: Body of the log2
severityText: info
spanId: ""
timeUnixNano: "1728069505995028000"
traceId: ""
- attributes:
- key: bool
value:
boolValue: true
- key: str
value:
stringValue: attr str
- key: ID
value:
intValue: "2"
body:
stringValue: Body of the log2
severityText: info
spanId: ""
timeUnixNano: "1728069565995028000"
traceId: ""
- attributes:
- key: bool
value:
boolValue: true
- key: str
value:
stringValue: attr str
- key: ID
value:
intValue: "3"
body:
stringValue: Body of the log3
severityText: info
spanId: ""
timeUnixNano: "1728069625995028000"
traceId: ""
- attributes:
- key: bool
value:
boolValue: true
- key: str
value:
stringValue: attr str
- key: ID
value:
intValue: "3"
body:
stringValue: Body of the log3
severityText: info
spanId: ""
timeUnixNano: "1728069685995028000"
traceId: ""
scope: {}
32 changes: 32 additions & 0 deletions processor/logdedupprocessor/testdata/expected/basicLogs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
resourceLogs:
- resource:
attributes:
- key: one
value:
intValue: "1"
scopeLogs:
- logRecords:
- attributes:
- key: bool
value:
boolValue: true
- key: str
value:
stringValue: attr str
- key: log_count
value:
intValue: "2"
- key: first_observed_timestamp
value:
stringValue: "2024-10-04T19:21:47Z"
- key: last_observed_timestamp
value:
stringValue: "2024-10-04T19:21:47Z"
body:
stringValue: Body of the log
observedTimeUnixNano: "1728069707998122000"
severityText: info
spanId: ""
timeUnixNano: "1728069708998920000"
traceId: ""
scope: {}
Loading

0 comments on commit 0073f64

Please sign in to comment.