diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 1a6ae007..bcd2d477 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -170,6 +170,17 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform transformed := tr(messages) // no error as errors should be returned in the failures array of TransformationResult + // Ack filtered messages with no further action + messagesToFilter := transformed.Filtered + for _, msg := range messagesToFilter { + if msg.AckFunc != nil { + msg.AckFunc() + } + } + // Push filter result to observer + filterRes := models.NewFilterResult(messagesToFilter) + o.Filtered(filterRes) + // Send message buffer messagesToSend := transformed.Result diff --git a/cmd/config.go b/cmd/config.go index 3248bdda..366522ab 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -389,9 +389,15 @@ func (c *Config) GetTransformations() (transform.TransformationApplyFunction, er funcs = append(funcs, transform.SpEnrichedToJson) case "spEnrichedSetPk": funcs = append(funcs, transform.NewSpEnrichedSetPkFunction(funcOpts[1])) + case "spEnrichedFilter": + filterFunc, err := transform.NewSpEnrichedFilterFunction(funcOpts[1]) + if err != nil { + return nil, err + } + funcs = append(funcs, filterFunc) case "none": default: - return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}' and got '%s'", c.Transformation)) + return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got '%s'", c.Transformation)) } } return transform.NewTransformation(funcs...), nil diff --git a/cmd/config_test.go b/cmd/config_test.go index 5f1b260c..55682ef0 100644 --- a/cmd/config_test.go +++ b/cmd/config_test.go @@ -109,7 +109,24 @@ func TestNewConfig_InvalidTransformation(t *testing.T) { transformation, err := c.GetTransformations() assert.Nil(transformation) assert.NotNil(err) - 
assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}' and got 'fake'", err.Error()) + assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got 'fake'", err.Error()) +} + +func TestNewConfig_FilterFailure(t *testing.T) { + assert := assert.New(t) + + defer os.Unsetenv("MESSAGE_TRANSFORMATION") + + os.Setenv("MESSAGE_TRANSFORMATION", "spEnrichedFilter:incompatibleArg") + + c, err := NewConfig() + assert.NotNil(c) + assert.Nil(err) + + transformation, err := c.GetTransformations() + assert.Nil(transformation) + assert.NotNil(err) + assert.Equal(`Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]`, err.Error()) } func TestNewConfig_InvalidTarget(t *testing.T) { diff --git a/cmd/serverless.go b/cmd/serverless.go index 3c6c54dc..ef0b65d9 100644 --- a/cmd/serverless.go +++ b/cmd/serverless.go @@ -50,6 +50,14 @@ func ServerlessRequestHandler(messages []*models.Message) error { transformed := tr(messages) // no error as errors should be returned in the failures array of TransformationResult + // Ack filtered messages with no further action + messagesToFilter := transformed.Filtered + for _, msg := range messagesToFilter { + if msg.AckFunc != nil { + msg.AckFunc() + } + } + res, err := t.Write(transformed.Result) if err != nil { log.WithFields(log.Fields{"error": err}).Error(err) diff --git a/go.mod b/go.mod index 3914873d..0bce47e0 100644 --- a/go.mod +++ b/go.mod @@ -22,8 +22,8 @@ require ( github.com/twitchscience/kinsumer v0.0.0-00010101000000-000000000000 github.com/urfave/cli v1.22.5 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c - golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect - golang.org/x/tools v0.1.3 // indirect + golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect + golang.org/x/tools v0.1.4 // indirect 
gopkg.in/stretchr/testify.v1 v1.2.2 // indirect ) diff --git a/go.sum b/go.sum index 3d527c06..7d3888d0 100644 --- a/go.sum +++ b/go.sum @@ -521,8 +521,8 @@ golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3 h1:kzM6+9dur93BcC2kVlYl34cHU golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 h1:RqytpXGR1iVNX7psjB3ff8y7sNFinVFvkx1c8SjBkio= -golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -585,8 +585,8 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.3 h1:L69ShwSZEyCsLKoAxDKeMvLDZkumEe8gXUZAjab0tX8= -golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs= +golang.org/x/tools v0.1.4/go.mod 
h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/models/filter_result.go b/pkg/models/filter_result.go new file mode 100644 index 00000000..b1c30da1 --- /dev/null +++ b/pkg/models/filter_result.go @@ -0,0 +1,62 @@ +// PROPRIETARY AND CONFIDENTIAL +// +// Unauthorized copying of this file via any medium is strictly prohibited. +// +// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved. + +package models + +import ( + "time" + + "github.com/snowplow-devops/stream-replicator/pkg/common" +) + +// FilterResult contains the results from a target write operation +type FilterResult struct { + FilteredCount int64 + + // Filtered holds all the messages that were filtered out and acked without sending to the target + Filtered []*Message + + // Delta between TimePulled and TimeOfAck tells us how well the + // application is at processing filtered data internally + MaxFilterLatency time.Duration + MinFilterLatency time.Duration + AvgFilterLatency time.Duration +} + +// NewFilterResult uses the current time as the timeOfFilter and then calls NewFilterResultWithTime +func NewFilterResult(filtered []*Message) *FilterResult { + return NewFilterResultWithTime(filtered, time.Now().UTC()) +} + +// NewFilterResultWithTime builds a result structure to return from a filtered message slice +// attempt which contains the sfiltered message count as well as several +// derived latency measures. 
+func NewFilterResultWithTime(filtered []*Message, timeOfFilter time.Time) *FilterResult { + r := FilterResult{ + FilteredCount: int64(len(filtered)), + } + + filteredLen := int64(len(filtered)) + + var sumFilterLatency time.Duration + + for _, msg := range filtered { + filterLatency := timeOfFilter.Sub(msg.TimePulled) + if r.MaxFilterLatency < filterLatency { + r.MaxFilterLatency = filterLatency + } + if r.MinFilterLatency > filterLatency || r.MinFilterLatency == time.Duration(0) { + r.MinFilterLatency = filterLatency + } + sumFilterLatency += filterLatency + } + + if filteredLen > 0 { + r.AvgFilterLatency = common.GetAverageFromDuration(sumFilterLatency, filteredLen) + } + + return &r +} diff --git a/pkg/models/filter_result_test.go b/pkg/models/filter_result_test.go new file mode 100644 index 00000000..5ea0bf09 --- /dev/null +++ b/pkg/models/filter_result_test.go @@ -0,0 +1,72 @@ +// PROPRIETARY AND CONFIDENTIAL +// +// Unauthorized copying of this file via any medium is strictly prohibited. +// +// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved. 
+ +package models + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewFilterResult_EmptyWithoutTime(t *testing.T) { + assert := assert.New(t) + + r := NewFilterResult(nil) + assert.NotNil(r) + + assert.Equal(int64(0), r.FilteredCount) + + assert.Equal(time.Duration(0), r.MaxFilterLatency) + assert.Equal(time.Duration(0), r.MinFilterLatency) + assert.Equal(time.Duration(0), r.AvgFilterLatency) +} + +func TestNewFilterResult_EmptyWithTime(t *testing.T) { + assert := assert.New(t) + + r := NewFilterResultWithTime(nil, time.Now().UTC()) + assert.NotNil(r) + + assert.Equal(int64(0), r.FilteredCount) + + assert.Equal(time.Duration(0), r.MaxFilterLatency) + assert.Equal(time.Duration(0), r.MinFilterLatency) + assert.Equal(time.Duration(0), r.AvgFilterLatency) +} + +func TestNewFilterResult_WithMessages(t *testing.T) { + assert := assert.New(t) + + timeNow := time.Now().UTC() + + filtered := []*Message{ + { + Data: []byte("Baz"), + PartitionKey: "partition1", + TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), + TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute), + }, + { + Data: []byte("Bar"), + PartitionKey: "partition2", + TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-8) * time.Minute), + TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), + }, + } + + r := NewFilterResultWithTime(filtered, timeNow) + assert.NotNil(r) + + assert.Equal(int64(2), r.FilteredCount) + + assert.Equal(time.Duration(8)*time.Minute, r.MaxFilterLatency) + assert.Equal(time.Duration(4)*time.Minute, r.MinFilterLatency) + assert.Equal(time.Duration(6)*time.Minute, r.AvgFilterLatency) +} diff --git a/pkg/models/observer_buffer.go b/pkg/models/observer_buffer.go index 05d5fa51..9c6c05a0 100644 --- a/pkg/models/observer_buffer.go +++ b/pkg/models/observer_buffer.go @@ -20,6 +20,8 @@ type ObserverBuffer struct { 
MsgFailed int64 MsgTotal int64 + MsgFiltered int64 + OversizedTargetResults int64 OversizedMsgSent int64 OversizedMsgFailed int64 @@ -39,6 +41,9 @@ type ObserverBuffer struct { MaxTransformLatency time.Duration MinTransformLatency time.Duration SumTransformLatency time.Duration + MaxFilterLatency time.Duration + MinFilterLatency time.Duration + SumFilterLatency time.Duration } // AppendWrite adds a normal TargetWriteResult onto the buffer and stores the result @@ -52,7 +57,7 @@ func (b *ObserverBuffer) AppendWrite(res *TargetWriteResult) { b.MsgFailed += res.FailedCount b.MsgTotal += res.Total() - b.append(res) + b.appendWriteResult(res) } // AppendWriteOversized adds an oversized TargetWriteResult onto the buffer and stores the result @@ -66,7 +71,7 @@ func (b *ObserverBuffer) AppendWriteOversized(res *TargetWriteResult) { b.OversizedMsgFailed += res.FailedCount b.OversizedMsgTotal += res.Total() - b.append(res) + b.appendWriteResult(res) } // AppendWriteInvalid adds an invalid TargetWriteResult onto the buffer and stores the result @@ -80,10 +85,10 @@ func (b *ObserverBuffer) AppendWriteInvalid(res *TargetWriteResult) { b.InvalidMsgFailed += res.FailedCount b.InvalidMsgTotal += res.Total() - b.append(res) + b.appendWriteResult(res) } -func (b *ObserverBuffer) append(res *TargetWriteResult) { +func (b *ObserverBuffer) appendWriteResult(res *TargetWriteResult) { if b.MaxProcLatency < res.MaxProcLatency { b.MaxProcLatency = res.MaxProcLatency } @@ -109,6 +114,26 @@ func (b *ObserverBuffer) append(res *TargetWriteResult) { b.SumTransformLatency += res.AvgTransformLatency } +// AppendFiltered adds a FilterResult onto the buffer and stores the result +func (b *ObserverBuffer) AppendFiltered(res *FilterResult) { + if res == nil { + return + } + + b.MsgFiltered += res.FilteredCount + b.appendFilterResult(res) +} + +func (b *ObserverBuffer) appendFilterResult(res *FilterResult) { + if b.MaxFilterLatency < res.MaxFilterLatency { + b.MaxFilterLatency = res.MaxFilterLatency 
+ } + if b.MinFilterLatency > res.MinFilterLatency || b.MinFilterLatency == time.Duration(0) { + b.MinFilterLatency = res.MinFilterLatency + } + b.SumFilterLatency += res.AvgFilterLatency +} + // GetSumResults returns the total number of results logged in the buffer func (b *ObserverBuffer) GetSumResults() int64 { return b.TargetResults + b.OversizedTargetResults + b.InvalidTargetResults @@ -129,10 +154,16 @@ func (b *ObserverBuffer) GetAvgTransformLatency() time.Duration { return common.GetAverageFromDuration(b.SumTransformLatency, b.MsgTotal) } +// GetAvgFilterLatency calculates average filter latency +func (b *ObserverBuffer) GetAvgFilterLatency() time.Duration { + return common.GetAverageFromDuration(b.SumFilterLatency, b.MsgFiltered) +} + func (b *ObserverBuffer) String() string { return fmt.Sprintf( - "TargetResults:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxTransformLatency:%d", + "TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d", b.TargetResults, + b.MsgFiltered, b.MsgSent, b.MsgFailed, b.OversizedTargetResults, @@ -143,6 +174,7 @@ func (b *ObserverBuffer) String() string { b.InvalidMsgFailed, b.MaxProcLatency.Milliseconds(), b.MaxMsgLatency.Milliseconds(), + b.MaxFilterLatency.Milliseconds(), b.MaxTransformLatency.Milliseconds(), ) } diff --git a/pkg/models/observer_buffer_test.go b/pkg/models/observer_buffer_test.go index 833cb25f..1ba43848 100644 --- a/pkg/models/observer_buffer_test.go +++ b/pkg/models/observer_buffer_test.go @@ -46,6 +46,15 @@ func TestObserverBuffer(t *testing.T) { TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute), }, } + filtered := []*Message{ + { + Data: 
[]byte("FooBar"), + PartitionKey: "partition4", + TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), + TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute), + }, + } r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow) @@ -59,11 +68,17 @@ func TestObserverBuffer(t *testing.T) { b.AppendWriteInvalid(r) b.AppendWriteInvalid(nil) + fr := NewFilterResultWithTime(filtered, timeNow) + + b.AppendFiltered(fr) + assert.Equal(int64(2), b.TargetResults) assert.Equal(int64(4), b.MsgSent) assert.Equal(int64(2), b.MsgFailed) assert.Equal(int64(6), b.MsgTotal) + assert.Equal(int64(1), b.MsgFiltered) + assert.Equal(int64(2), b.OversizedTargetResults) assert.Equal(int64(4), b.OversizedMsgSent) assert.Equal(int64(2), b.OversizedMsgFailed) @@ -84,5 +99,9 @@ func TestObserverBuffer(t *testing.T) { assert.Equal(time.Duration(1)*time.Minute, b.MinTransformLatency) assert.Equal(time.Duration(2)*time.Minute, b.GetAvgTransformLatency()) - assert.Equal("TargetResults:2,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxTransformLatency:180000", b.String()) + assert.Equal(time.Duration(10)*time.Minute, b.MaxFilterLatency) + assert.Equal(time.Duration(10)*time.Minute, b.MinFilterLatency) + assert.Equal(time.Duration(10)*time.Minute, b.GetAvgFilterLatency()) + + assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000", b.String()) } diff --git a/pkg/models/transformation_result.go b/pkg/models/transformation_result.go index dfe99dfd..f082d741 100644 --- a/pkg/models/transformation_result.go +++ b/pkg/models/transformation_result.go @@ 
-7,13 +7,18 @@ package models type TransformationResult struct { - ResultCount int64 - InvalidCount int64 + ResultCount int64 + FilteredCount int64 + InvalidCount int64 // Result holds all the messages that were successfully transformed and // are ready for attempts to send to the target Result []*Message + // Filtered holds all the messages that were designated to be filtered out + // they will all be acked without passing through to any target + Filtered []*Message + // Invalid contains all the messages that cannot be transformed // due to various parseability reasons. These messages cannot be retried // and need to be specially handled. @@ -21,11 +26,13 @@ type TransformationResult struct { } // NewTransformationResult contains slices successfully tranformed and unsuccessfully transformed messages, and their lengths. -func NewTransformationResult(result []*Message, invalid []*Message) *TransformationResult { +func NewTransformationResult(result []*Message, filtered []*Message, invalid []*Message) *TransformationResult { r := TransformationResult{ int64(len(result)), + int64(len(filtered)), int64(len(invalid)), result, + filtered, invalid, } return &r diff --git a/pkg/observer/observer.go b/pkg/observer/observer.go index c78f470a..c709b9b6 100644 --- a/pkg/observer/observer.go +++ b/pkg/observer/observer.go @@ -7,9 +7,10 @@ package observer import ( - log "github.com/sirupsen/logrus" "time" + log "github.com/sirupsen/logrus" + "github.com/snowplow-devops/stream-replicator/pkg/models" "github.com/snowplow-devops/stream-replicator/pkg/statsreceiver/statsreceiveriface" ) @@ -20,6 +21,7 @@ type Observer struct { statsClient statsreceiveriface.StatsReceiver exitSignal chan struct{} stopDone chan struct{} + filteredChan chan *models.FilterResult targetWriteChan chan *models.TargetWriteResult targetWriteOversizedChan chan *models.TargetWriteResult targetWriteInvalidChan chan *models.TargetWriteResult @@ -37,6 +39,7 @@ func New(statsClient 
statsreceiveriface.StatsReceiver, timeout time.Duration, re statsClient: statsClient, exitSignal: make(chan struct{}), stopDone: make(chan struct{}), + filteredChan: make(chan *models.FilterResult, 1000), targetWriteChan: make(chan *models.TargetWriteResult, 1000), targetWriteOversizedChan: make(chan *models.TargetWriteResult, 1000), targetWriteInvalidChan: make(chan *models.TargetWriteResult, 1000), @@ -73,6 +76,8 @@ func (o *Observer) Start() { o.isRunning = false break ObserverLoop + case res := <-o.filteredChan: + buffer.AppendFiltered(res) case res := <-o.targetWriteChan: buffer.AppendWrite(res) case res := <-o.targetWriteOversizedChan: @@ -107,6 +112,12 @@ func (o *Observer) Stop() { // --- Functions called to push information to observer +// Filtered pushes a filter result onto a channel for processing +// by the observer +func (o *Observer) Filtered(r *models.FilterResult) { + o.filteredChan <- r +} + // TargetWrite pushes a targets write result onto a channel for processing // by the observer func (o *Observer) TargetWrite(r *models.TargetWriteResult) { diff --git a/pkg/observer/observer_test.go b/pkg/observer/observer_test.go index 1a8a534c..8ee409ce 100644 --- a/pkg/observer/observer_test.go +++ b/pkg/observer/observer_test.go @@ -7,10 +7,11 @@ package observer import ( - "github.com/stretchr/testify/assert" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/snowplow-devops/stream-replicator/pkg/models" ) diff --git a/pkg/transform/snowplow_enriched_filter.go b/pkg/transform/snowplow_enriched_filter.go new file mode 100644 index 00000000..2c1c656b --- /dev/null +++ b/pkg/transform/snowplow_enriched_filter.go @@ -0,0 +1,90 @@ +// PROPRIETARY AND CONFIDENTIAL +// +// Unauthorized copying of this file via any medium is strictly prohibited. +// +// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved. 
+ +package transform + +import ( + "errors" + "fmt" + "regexp" + "strings" + + "github.com/snowplow-devops/stream-replicator/pkg/models" +) + +// NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event. +// The filterconfig should describe the conditions for including a message. +// For example "aid=abc|def" includes all events with app IDs of abc or def, and filters out the rest. +// aid!=abc|def includes all events whose app IDs do not match abc or def, and filters out the rest. +func NewSpEnrichedFilterFunction(filterConfig string) (TransformationFunction, error) { + + // This regex prevents whitespace characters in the value provided + regex := `\S+(!=|==)[^\s\|]+((?:\|[^\s|]+)*)$` + re := regexp.MustCompile(regex) + + if !(re.MatchString(filterConfig)) { + // If invalid, return an error which will be returned by the main function + return nil, errors.New("Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] 
or {field name}!={value}[|{value}|...]") + } + + // Check for a negation condition first + keyValues := strings.SplitN(filterConfig, "!=", 2) + + // isNegationFilter determines whether a match sets shouldKeepMessage to true or false, and consequently whether message is kept or filtered + var isNegationFilter bool + if len(keyValues) > 1 { + // If negation condition is found, default to keep the message, and change this when match found + isNegationFilter = true + } else { + // Otherwise, look for affirmation condition, default to drop the message and change when match found + keyValues = strings.SplitN(filterConfig, "==", 2) + isNegationFilter = false + } + + return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + // Start by resetting shouldKeepMessage to isNegationFilter + shouldKeepMessage := isNegationFilter + + // Evalute intermediateState to parsedEvent + parsedMessage, parseErr := intermediateAsSpEnrichedParsed(intermediateState, message) + if parseErr != nil { + message.SetError(parseErr) + return nil, nil, message, nil + } + + valueFound, err := parsedMessage.GetValue(keyValues[0]) + + // GetValue returns an error if the field requested is empty. Check for that particular error before failing the message. 
+ if err != nil && err.Error() == fmt.Sprintf("Field %s is empty", keyValues[0]) { + valueFound = nil + } else if err != nil { + message.SetError(err) + return nil, nil, message, nil + } + + evaluation: + for _, valueToMatch := range strings.Split(keyValues[1], "|") { + if valueToMatch == fmt.Sprintf("%v", valueFound) { // coerce to string as valueFound may be any type found in a Snowplow event + if isNegationFilter { + shouldKeepMessage = false + } else { + shouldKeepMessage = true + } + break evaluation + // Once config value is matched once, change shouldKeepMessage, and stop looking for matches + } + } + + // If message is not to be kept, return it as a filtered message to be acked in the main function + if !shouldKeepMessage { + + return nil, message, nil, nil + } + + // Otherwise, return the message and intermediateState for further processing. + return message, nil, nil, parsedMessage + }, nil +} diff --git a/pkg/transform/snowplow_enriched_filter_test.go b/pkg/transform/snowplow_enriched_filter_test.go new file mode 100644 index 00000000..40c8d2e5 --- /dev/null +++ b/pkg/transform/snowplow_enriched_filter_test.go @@ -0,0 +1,214 @@ +// PROPRIETARY AND CONFIDENTIAL +// +// Unauthorized copying of this file via any medium is strictly prohibited. +// +// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved. + +package transform + +import ( + "testing" + + "github.com/snowplow-devops/stream-replicator/pkg/models" + "github.com/stretchr/testify/assert" +) + +func TestNewSpEnrichedFilterFunction(t *testing.T) { + assert := assert.New(t) + + var messageGood = models.Message{ + Data: snowplowTsv3, + PartitionKey: "some-key", + } + + // Single value cases + aidFilterFuncKeep, _ := NewSpEnrichedFilterFunction("app_id==test-data3") + + // TODO: sort out numbering for fail cases... 
+ aidKeepIn, aidKeepOut, fail, _ := aidFilterFuncKeep(&messageGood, nil) + + assert.Equal(snowplowTsv3, aidKeepIn.Data) + assert.Nil(aidKeepOut) + assert.Nil(fail) + + aidFilterFuncDiscard, _ := NewSpEnrichedFilterFunction("app_id==failThis") + + aidDiscardIn, aidDiscardOut, fail2, _ := aidFilterFuncDiscard(&messageGood, nil) + + assert.Nil(aidDiscardIn) + assert.Equal(snowplowTsv3, aidDiscardOut.Data) + assert.Nil(fail2) + + // int value + urlPrtFilterFuncKeep, _ := NewSpEnrichedFilterFunction("page_urlport==80") + + urlPrtKeepIn, urlPrtKeepOut, fail, _ := urlPrtFilterFuncKeep(&messageGood, nil) + + assert.Equal(snowplowTsv3, urlPrtKeepIn.Data) + assert.Nil(urlPrtKeepOut) + assert.Nil(fail) + + // Multiple value cases + aidFilterFuncKeepWithMultiple, _ := NewSpEnrichedFilterFunction("app_id==someotherValue|test-data3") + + aidMultipleNegationFailedIn, aidMultipleKeepOut, fail3, _ := aidFilterFuncKeepWithMultiple(&messageGood, nil) + + assert.Equal(snowplowTsv3, aidMultipleNegationFailedIn.Data) + assert.Nil(aidMultipleKeepOut) + assert.Nil(fail3) + + aidFilterFuncDiscardWithMultiple, _ := NewSpEnrichedFilterFunction("app_id==someotherValue|failThis") + + aidNegationMultipleIn, aidMultipleDiscardOut, fail3, _ := aidFilterFuncDiscardWithMultiple(&messageGood, nil) + + assert.Nil(aidNegationMultipleIn) + assert.Equal(snowplowTsv3, aidMultipleDiscardOut.Data) + assert.Nil(fail3) + + // Single value negation cases + + aidFilterFuncNegationDiscard, _ := NewSpEnrichedFilterFunction("app_id!=test-data3") + + aidNegationIn, aidNegationOut, fail4, _ := aidFilterFuncNegationDiscard(&messageGood, nil) + + assert.Nil(aidNegationIn) + assert.Equal(snowplowTsv3, aidNegationOut.Data) + assert.Nil(fail4) + + aidFilterFuncNegationKeep, _ := NewSpEnrichedFilterFunction("app_id!=failThis") + + aidNegationFailedIn, aidNegationFailedOut, fail5, _ := aidFilterFuncNegationKeep(&messageGood, nil) + + assert.Equal(snowplowTsv3, aidNegationFailedIn.Data) + assert.Nil(aidNegationFailedOut) + 
assert.Nil(fail5) + + // Multiple value negation cases + aidFilterFuncNegationDiscardMultiple, _ := NewSpEnrichedFilterFunction("app_id!=someotherValue|test-data1|test-data2|test-data3") + + aidNegationMultipleIn, aidNegationMultipleOut, fail6, _ := aidFilterFuncNegationDiscardMultiple(&messageGood, nil) + + assert.Nil(aidNegationMultipleIn) + assert.Equal(snowplowTsv3, aidNegationMultipleOut.Data) + assert.Nil(fail6) + + aidFilterFuncNegationKeptMultiple, _ := NewSpEnrichedFilterFunction("app_id!=someotherValue|failThis") + + aidMultipleNegationFailedIn, aidMultipleNegationFailedOut, fail7, _ := aidFilterFuncNegationKeptMultiple(&messageGood, nil) + + assert.Equal(snowplowTsv3, aidMultipleNegationFailedIn.Data) + assert.Nil(aidMultipleNegationFailedOut) + assert.Nil(fail7) + + // Filters on a nil field + txnFilterFunctionAffirmation, _ := NewSpEnrichedFilterFunction("txn_id==something") + + nilAffirmationIn, nilAffirmationOut, fail8, _ := txnFilterFunctionAffirmation(&messageGood, nil) + + assert.Nil(nilAffirmationIn) + assert.Equal(snowplowTsv3, nilAffirmationOut.Data) + assert.Nil(fail8) + + txnFilterFunctionNegation, _ := NewSpEnrichedFilterFunction("txn_id!=something") + + nilNegationIn, nilNegationOut, fail8, _ := txnFilterFunctionNegation(&messageGood, nil) + + assert.Equal(snowplowTsv3, nilNegationIn.Data) + assert.Nil(nilNegationOut) + assert.Nil(fail8) +} + +func TestNewSpEnrichedFilterFunction_Error(t *testing.T) { + assert := assert.New(t) + error := `Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] 
or {field name}!={value}[|{value}|...]` + + filterFunc, err1 := NewSpEnrichedFilterFunction("") + + assert.Nil(filterFunc) + assert.Equal(error, err1.Error()) + + filterFunc, err2 := NewSpEnrichedFilterFunction("app_id==abc|") + + assert.Nil(filterFunc) + assert.Equal(error, err2.Error()) + + filterFunc, err3 := NewSpEnrichedFilterFunction("!=abc") + + assert.Nil(filterFunc) + assert.Equal(error, err3.Error()) +} + +func TestSpEnrichedFilterFunction_Slice(t *testing.T) { + assert := assert.New(t) + + var filter1Kept = []*models.Message{ + { + Data: snowplowTsv1, + PartitionKey: "some-key", + }, + } + + var filter1Discarded = []*models.Message{ + { + Data: snowplowTsv2, + PartitionKey: "some-key1", + }, + { + Data: snowplowTsv3, + PartitionKey: "some-key2", + }, + } + + filterFunc, _ := NewSpEnrichedFilterFunction("app_id==test-data1") + + filter1 := NewTransformation(filterFunc) + filter1Res := filter1(messages) + + assert.Equal(len(filter1Kept), len(filter1Res.Result)) + assert.Equal(len(filter1Discarded), len(filter1Res.Filtered)) + assert.Equal(1, len(filter1Res.Invalid)) + + var filter2Kept = []*models.Message{ + { + Data: snowplowTsv1, + PartitionKey: "some-key", + }, + { + Data: snowplowTsv2, + PartitionKey: "some-key1", + }, + } + + var filter2Discarded = []*models.Message{ + + { + Data: snowplowTsv3, + PartitionKey: "some-key2", + }, + } + + filterFunc2, _ := NewSpEnrichedFilterFunction("app_id==test-data1|test-data2") + + filter2 := NewTransformation(filterFunc2) + filter2Res := filter2(messages) + + assert.Equal(len(filter2Kept), len(filter2Res.Result)) + assert.Equal(len(filter2Discarded), len(filter2Res.Filtered)) + assert.Equal(1, len(filter2Res.Invalid)) + + var expectedFilter3 = []*models.Message{ + { + Data: snowplowTsv3, + PartitionKey: "some-key3", + }, + } + + filterFunc3, _ := NewSpEnrichedFilterFunction("app_id!=test-data1|test-data2") + + filter3 := NewTransformation(filterFunc3) + filter3Res := filter3(messages) + + 
assert.Equal(len(expectedFilter3), len(filter3Res.Result)) + assert.Equal(1, len(filter3Res.Invalid)) + +} diff --git a/pkg/transform/snowplow_enriched_set_pk.go b/pkg/transform/snowplow_enriched_set_pk.go index 5ae3c7f8..e981a968 100644 --- a/pkg/transform/snowplow_enriched_set_pk.go +++ b/pkg/transform/snowplow_enriched_set_pk.go @@ -10,32 +10,24 @@ import ( "fmt" "github.com/snowplow-devops/stream-replicator/pkg/models" - "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" ) // NewSpEnrichedSetPkFunction returns a TransformationFunction which sets the partition key of a message to a field within a Snowplow enriched event func NewSpEnrichedSetPkFunction(pkField string) TransformationFunction { - return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, interface{}) { - // To avoid parsing message multiple times, we check for intermediateState and save the parsed message to it if there is none. - // Note that this will overwrite any differently typed intermediateState - in such a case order of execution matters. 
- var parsedMessage, ok = intermediateState.(analytics.ParsedEvent) - var parseErr error - if ok { - parsedMessage = intermediateState.(analytics.ParsedEvent) - } else { - parsedMessage, parseErr = analytics.ParseEvent(string(message.Data)) - if parseErr != nil { - message.SetError(parseErr) - return nil, message, nil - } - intermediateState = parsedMessage + return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + // Evalute intermediateState to parsedEvent + parsedMessage, parseErr := intermediateAsSpEnrichedParsed(intermediateState, message) + if parseErr != nil { + message.SetError(parseErr) + return nil, nil, message, nil } + pk, err := parsedMessage.GetValue(pkField) if err != nil { message.SetError(err) - return nil, message, nil + return nil, nil, message, nil } message.PartitionKey = fmt.Sprintf("%v", pk) - return message, nil, intermediateState + return message, nil, nil, parsedMessage } } diff --git a/pkg/transform/snowplow_enriched_set_pk_test.go b/pkg/transform/snowplow_enriched_set_pk_test.go index 717ea403..1ba055a5 100644 --- a/pkg/transform/snowplow_enriched_set_pk_test.go +++ b/pkg/transform/snowplow_enriched_set_pk_test.go @@ -29,15 +29,15 @@ func TestNewSpEnrichedSetPkFunction(t *testing.T) { // Simple success cases for different datatypes aidSetPkFunc := NewSpEnrichedSetPkFunction("app_id") - stringAsPk, fail, intermediate := aidSetPkFunc(&messageGood, nil) + stringAsPk, _, fail, intermediate := aidSetPkFunc(&messageGood, nil) - assert.Equal("test-data", stringAsPk.PartitionKey) + assert.Equal("test-data3", stringAsPk.PartitionKey) assert.Equal(spTsv3Parsed, intermediate) assert.Nil(fail) ctstampSetPkFunc := NewSpEnrichedSetPkFunction("collector_tstamp") - tstampAsPk, fail, intermediate := ctstampSetPkFunc(&messageGood, nil) + tstampAsPk, _, fail, intermediate := ctstampSetPkFunc(&messageGood, nil) assert.Equal("2019-05-10 14:40:29.576 +0000 UTC", 
tstampAsPk.PartitionKey) assert.Equal(spTsv3Parsed, intermediate) @@ -45,14 +45,14 @@ func TestNewSpEnrichedSetPkFunction(t *testing.T) { pgurlportSetPkFunc := NewSpEnrichedSetPkFunction("page_urlport") - intAsPk, fail, intermediate := pgurlportSetPkFunc(&messageGood, nil) + intAsPk, _, fail, intermediate := pgurlportSetPkFunc(&messageGood, nil) assert.Equal("80", intAsPk.PartitionKey) assert.Equal(spTsv3Parsed, intermediate) assert.Nil(fail) // Simple failure case - failureCase, fail, intermediate := aidSetPkFunc(&messageBad, nil) + failureCase, _, fail, intermediate := aidSetPkFunc(&messageBad, nil) assert.Nil(failureCase) assert.Nil(intermediate) @@ -68,12 +68,12 @@ func TestNewSpEnrichedSetPkFunction(t *testing.T) { expected := models.Message{ Data: snowplowTsv1, - PartitionKey: "test-data", + PartitionKey: "test-data1", } incompatibleIntermediate := "Incompatible intermediate state" // When we have some incompatible intermediateState, expected behaviour is to replace it with this transformation's intermediateState - stringAsPkIncompat, failIncompat, intermediate := aidSetPkFunc(&incompatibleIntermediateMessage, incompatibleIntermediate) + stringAsPkIncompat, _, failIncompat, intermediate := aidSetPkFunc(&incompatibleIntermediateMessage, incompatibleIntermediate) assert.Equal(&expected, stringAsPkIncompat) assert.Equal(spTsv1Parsed, intermediate) diff --git a/pkg/transform/snowplow_enriched_to_json.go b/pkg/transform/snowplow_enriched_to_json.go index 97ae4d6d..505ba812 100644 --- a/pkg/transform/snowplow_enriched_to_json.go +++ b/pkg/transform/snowplow_enriched_to_json.go @@ -8,30 +8,22 @@ package transform import ( "github.com/snowplow-devops/stream-replicator/pkg/models" - "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" ) // SpEnrichedToJson is a specific transformation implementation to transform good enriched data within a message to Json -func SpEnrichedToJson(message *models.Message, intermediateState interface{}) (*models.Message, 
*models.Message, interface{}) { - // To avoid parsing message multiple times, we check for intermediateState and save the parsed message to it if there is none. - // Note that this will overwrite any differently typed intermediateState - in such a case order of execution matters. - var parsedMessage, ok = intermediateState.(analytics.ParsedEvent) - var parseErr error - if ok { - parsedMessage = intermediateState.(analytics.ParsedEvent) - } else { - parsedMessage, parseErr = analytics.ParseEvent(string(message.Data)) - if parseErr != nil { - message.SetError(parseErr) - return nil, message, nil - } - intermediateState = parsedMessage +func SpEnrichedToJson(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + // Evaluate intermediateState to parsedEvent + parsedMessage, parseErr := intermediateAsSpEnrichedParsed(intermediateState, message) + if parseErr != nil { + message.SetError(parseErr) + return nil, nil, message, nil } + jsonMessage, err := parsedMessage.ToJson() if err != nil { message.SetError(err) - return nil, message, nil + return nil, nil, message, nil } message.Data = jsonMessage - return message, nil, intermediateState + return message, nil, nil, parsedMessage } diff --git a/pkg/transform/snowplow_enriched_to_json_test.go b/pkg/transform/snowplow_enriched_to_json_test.go index de72ed5c..42323b0e 100644 --- a/pkg/transform/snowplow_enriched_to_json_test.go +++ b/pkg/transform/snowplow_enriched_to_json_test.go @@ -32,14 +32,14 @@ func TestSpEnrichedToJson(t *testing.T) { } // Simple success case - transformSuccess, failure, intermediate := SpEnrichedToJson(&messageGood, nil) + transformSuccess, _, failure, intermediate := SpEnrichedToJson(&messageGood, nil) assert.Equal(&expectedGood, transformSuccess) assert.Equal(spTsv1Parsed, intermediate) assert.Nil(failure) // Simple failure case - success, transformFailure, intermediate := SpEnrichedToJson(&messageBad, nil) + success, _,
transformFailure, intermediate := SpEnrichedToJson(&messageBad, nil) // Not matching equivalence of whole object because error stacktrace makes it unfeasible. Doing each component part instead. assert.Equal("Cannot parse tsv event - wrong number of fields provided: 4", transformFailure.GetError().Error()) @@ -62,7 +62,7 @@ func TestSpEnrichedToJson(t *testing.T) { incompatibleIntermediate := "Incompatible intermediate state" // When we have some incompatible IntermediateState, expected behaviour is to replace it with this transformation's IntermediateState - transformSuccess2, failure2, intermediate2 := SpEnrichedToJson(&incompatibleIntermediateMessage, incompatibleIntermediate) + transformSuccess2, _, failure2, intermediate2 := SpEnrichedToJson(&incompatibleIntermediateMessage, incompatibleIntermediate) assert.Equal(&expectedGood, transformSuccess2) assert.Equal(spTsv1Parsed, intermediate2) diff --git a/pkg/transform/snowplow_enriched_util.go b/pkg/transform/snowplow_enriched_util.go new file mode 100644 index 00000000..0ce986ac --- /dev/null +++ b/pkg/transform/snowplow_enriched_util.go @@ -0,0 +1,25 @@ +// PROPRIETARY AND CONFIDENTIAL +// +// Unauthorized copying of this file via any medium is strictly prohibited. +// +// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved. 
+ +package transform + +import ( + "github.com/snowplow-devops/stream-replicator/pkg/models" + "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" +) + +func intermediateAsSpEnrichedParsed(intermediateState interface{}, message *models.Message) (analytics.ParsedEvent, error) { + var parsedMessage, ok = intermediateState.(analytics.ParsedEvent) + var parseErr error + if ok { + return parsedMessage, nil + } + parsedMessage, parseErr = analytics.ParseEvent(string(message.Data)) + if parseErr != nil { + return nil, parseErr + } + return parsedMessage, nil +} diff --git a/pkg/transform/transform.go b/pkg/transform/transform.go index 5d8f2678..6d389c96 100644 --- a/pkg/transform/transform.go +++ b/pkg/transform/transform.go @@ -13,7 +13,7 @@ import ( ) // TransformationFunctions modify their inputs -type TransformationFunction func(*models.Message, interface{}) (*models.Message, *models.Message, interface{}) +type TransformationFunction func(*models.Message, interface{}) (*models.Message, *models.Message, *models.Message, interface{}) // The transformationApplyFunction dereferences messages before running transformations type TransformationApplyFunction func([]*models.Message) *models.TransformationResult @@ -23,35 +23,40 @@ type TransformationGenerator func(...TransformationFunction) TransformationApply // NewTransformation constructs a function which applies all transformations to all messages, returning a TransformationResult. 
func NewTransformation(tranformFunctions ...TransformationFunction) TransformationApplyFunction { return func(messages []*models.Message) *models.TransformationResult { - successes := make([]*models.Message, 0, len(messages)) - failures := make([]*models.Message, 0, len(messages)) + successList := make([]*models.Message, 0, len(messages)) + filteredList := make([]*models.Message, 0, len(messages)) + failureList := make([]*models.Message, 0, len(messages)) // If no transformations, just return the result rather than shuffling data between slices if len(tranformFunctions) == 0 { - return models.NewTransformationResult(messages, failures) + return models.NewTransformationResult(messages, filteredList, failureList) } for _, message := range messages { msg := *message // dereference to avoid amending input success := &msg // success must be both input and output to a TransformationFunction, so we make this pointer. var failure *models.Message + var filtered *models.Message var intermediate interface{} for _, transformFunction := range tranformFunctions { // Overwrite the input for each iteration in sequence of transformations, // since the desired result is a single transformed message with a nil failure, or a nil message with a single failure - success, failure, intermediate = transformFunction(success, intermediate) - if failure != nil { + success, filtered, failure, intermediate = transformFunction(success, intermediate) + if failure != nil || filtered != nil { break } } if success != nil { success.TimeTransformed = time.Now().UTC() - successes = append(successes, success) + successList = append(successList, success) + } + // We don't append TimeTransformed in the failure or filtered cases, as it is less useful, and likely to skew metrics + if filtered != nil { + filteredList = append(filteredList, filtered) } if failure != nil { - // We don't append TimeTransformed in the failure case, as it is less useful, and likely to skew metrics - failures = append(failures, 
failure) + failureList = append(failureList, failure) } } - return models.NewTransformationResult(successes, failures) + return models.NewTransformationResult(successList, filteredList, failureList) } } diff --git a/pkg/transform/transform_test.go b/pkg/transform/transform_test.go index 5f73271a..e8ebf449 100644 --- a/pkg/transform/transform_test.go +++ b/pkg/transform/transform_test.go @@ -14,25 +14,6 @@ import ( "github.com/stretchr/testify/assert" ) -var messages = []*models.Message{ - { - Data: snowplowTsv1, - PartitionKey: "some-key", - }, - { - Data: snowplowTsv2, - PartitionKey: "some-key1", - }, - { - Data: snowplowTsv3, - PartitionKey: "some-key2", - }, - { - Data: nonSnowplowString, - PartitionKey: "some-key4", - }, -} - // To test a function which creates a function, we're creating the function then testing that. Not sure if there's a better way? func TestNewTransformation_Passthrough(t *testing.T) { assert := assert.New(t) @@ -57,7 +38,7 @@ func TestNewTransformation_Passthrough(t *testing.T) { }, } - expectedNoTransformRes := models.NewTransformationResult(expected, make([]*models.Message, 0, 0)) + expectedNoTransformRes := models.NewTransformationResult(expected, make([]*models.Message, 0, 0), make([]*models.Message, 0, 0)) noTransform := NewTransformation(make([]TransformationFunction, 0, 0)...) 
noTransformResult := noTransform(messages) @@ -104,21 +85,39 @@ func TestNewTransformation_EnrichedToJson(t *testing.T) { assert.Equal("some-key4", enrichJsonRes.Invalid[0].PartitionKey) } +func Benchmark_Transform_EnrichToJson(b *testing.B) { + tranformEnrichJson := NewTransformation(SpEnrichedToJson) + for i := 0; i < b.N; i++ { + tranformEnrichJson(messages) + } +} + +func testfunc(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + return message, nil, nil, nil +} + +func Benchmark_Transform_Passthrough(b *testing.B) { + tranformPassthrough := NewTransformation(testfunc) + for i := 0; i < b.N; i++ { + tranformPassthrough(messages) + } +} + func TestNewTransformation_Multiple(t *testing.T) { assert := assert.New(t) var expectedGood = []*models.Message{ { Data: snowplowJson1, - PartitionKey: "test-data", + PartitionKey: "test-data1", }, { Data: snowplowJson2, - PartitionKey: "test-data", + PartitionKey: "test-data2", }, { Data: snowplowJson3, - PartitionKey: "test-data", + PartitionKey: "test-data3", }, } diff --git a/pkg/transform/transform_test_variables.go b/pkg/transform/transform_test_variables.go index afe6fc83..6c3c08b9 100644 --- a/pkg/transform/transform_test_variables.go +++ b/pkg/transform/transform_test_variables.go @@ -1,17 +1,39 @@ package transform -import "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" +import ( + "github.com/snowplow-devops/stream-replicator/pkg/models" + "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" +) -var snowplowTsv1 = []byte(`test-data pc 2019-05-10 14:40:37.436 2019-05-10 14:40:35.972 2019-05-10 14:40:35.551 unstruct e9234345-f042-46ad-b1aa-424464066a33 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 d26822f5-52cc-4292-8f77-14ef6b7a27e2 
{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0","data":{"sku":"item41","quantity":2,"unitPrice":32.4,"currency":"GBP"}}} python-requests/2.21.0 2019-05-10 14:40:35.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:35.972 com.snowplowanalytics.snowplow add_to_cart jsonschema 1-0-0 `) +var snowplowTsv1 = []byte(`test-data1 pc 2019-05-10 14:40:37.436 2019-05-10 14:40:35.972 2019-05-10 14:40:35.551 unstruct e9234345-f042-46ad-b1aa-424464066a33 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 d26822f5-52cc-4292-8f77-14ef6b7a27e2 {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0","data":{"sku":"item41","quantity":2,"unitPrice":32.4,"currency":"GBP"}}} python-requests/2.21.0 2019-05-10 14:40:35.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 
2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:35.972 com.snowplowanalytics.snowplow add_to_cart jsonschema 1-0-0 `) var spTsv1Parsed, _ = analytics.ParseEvent(string(snowplowTsv1)) -var snowplowJson1 = []byte(`{"app_id":"test-data","collector_tstamp":"2019-05-10T14:40:35.972Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:35.972Z","dvce_created_tstamp":"2019-05-10T14:40:35.551Z","dvce_sent_tstamp":"2019-05-10T14:40:35Z","etl_tstamp":"2019-05-10T14:40:37.436Z","event":"unstruct","event_format":"jsonschema","event_id":"e9234345-f042-46ad-b1aa-424464066a33","event_name":"add_to_cart","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"d26822f5-52cc-4292-8f77-14ef6b7a27e2","platform":"pc","unstruct_event_com_snowplowanalytics_snowplow_add_to_cart_1":{"currency":"GBP","quantity":2,"sku":"item41","unitPrice":32.4},"user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) +var snowplowJson1 = 
[]byte(`{"app_id":"test-data1","collector_tstamp":"2019-05-10T14:40:35.972Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:35.972Z","dvce_created_tstamp":"2019-05-10T14:40:35.551Z","dvce_sent_tstamp":"2019-05-10T14:40:35Z","etl_tstamp":"2019-05-10T14:40:37.436Z","event":"unstruct","event_format":"jsonschema","event_id":"e9234345-f042-46ad-b1aa-424464066a33","event_name":"add_to_cart","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"d26822f5-52cc-4292-8f77-14ef6b7a27e2","platform":"pc","unstruct_event_com_snowplowanalytics_snowplow_add_to_cart_1":{"currency":"GBP","quantity":2,"sku":"item41","unitPrice":32.4},"user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) -var snowplowTsv2 = []byte(`test-data pc 2019-05-10 14:40:32.392 2019-05-10 14:40:31.105 2019-05-10 14:40:30.218 transaction_item 5071169f-3050-473f-b03f-9748319b1ef2 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 68220ade-307b-4898-8e25-c4c8ac92f1d7 transaction item58 35.87 1 python-requests/2.21.0 2019-05-10 14:40:30.000 
{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:31.105 com.snowplowanalytics.snowplow transaction_item jsonschema 1-0-0 `) +var snowplowTsv2 = []byte(`test-data2 pc 2019-05-10 14:40:32.392 2019-05-10 14:40:31.105 2019-05-10 14:40:30.218 transaction_item 5071169f-3050-473f-b03f-9748319b1ef2 py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 68220ade-307b-4898-8e25-c4c8ac92f1d7 transaction item58 35.87 1 python-requests/2.21.0 2019-05-10 14:40:30.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:31.105 com.snowplowanalytics.snowplow transaction_item jsonschema 1-0-0 `) var spTsv2Parsed, _ = analytics.ParseEvent(string(snowplowTsv2)) -var snowplowJson2 = 
[]byte(`{"app_id":"test-data","collector_tstamp":"2019-05-10T14:40:31.105Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:31.105Z","dvce_created_tstamp":"2019-05-10T14:40:30.218Z","dvce_sent_tstamp":"2019-05-10T14:40:30Z","etl_tstamp":"2019-05-10T14:40:32.392Z","event":"transaction_item","event_format":"jsonschema","event_id":"5071169f-3050-473f-b03f-9748319b1ef2","event_name":"transaction_item","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"68220ade-307b-4898-8e25-c4c8ac92f1d7","platform":"pc","ti_orderid":"transaction\u003cbuilt-in function input\u003e","ti_price":35.87,"ti_quantity":1,"ti_sku":"item58","user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) +var snowplowJson2 = []byte(`{"app_id":"test-data2","collector_tstamp":"2019-05-10T14:40:31.105Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 
2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:31.105Z","dvce_created_tstamp":"2019-05-10T14:40:30.218Z","dvce_sent_tstamp":"2019-05-10T14:40:30Z","etl_tstamp":"2019-05-10T14:40:32.392Z","event":"transaction_item","event_format":"jsonschema","event_id":"5071169f-3050-473f-b03f-9748319b1ef2","event_name":"transaction_item","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"68220ade-307b-4898-8e25-c4c8ac92f1d7","platform":"pc","ti_orderid":"transaction\u003cbuilt-in function input\u003e","ti_price":35.87,"ti_quantity":1,"ti_sku":"item58","user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) -var snowplowTsv3 = []byte(`test-data pc 2019-05-10 14:40:30.836 2019-05-10 14:40:29.576 2019-05-10 14:40:29.204 page_view e8aef68d-8533-45c6-a672-26a0f01be9bd py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 b66c4a12-8584-4c7a-9a5d-7c96f59e2556 www.demo-site.com/campaign-landing-page landing-page 80 www.demo-site.com/campaign-landing-page python-requests/2.21.0 2019-05-10 14:40:29.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 
2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:29.576 com.snowplowanalytics.snowplow page_view jsonschema 1-0-0 `) +var snowplowTsv3 = []byte(`test-data3 pc 2019-05-10 14:40:30.836 2019-05-10 14:40:29.576 2019-05-10 14:40:29.204 page_view e8aef68d-8533-45c6-a672-26a0f01be9bd py-0.8.2 ssc-0.15.0-googlepubsub beam-enrich-0.2.0-common-0.36.0 user 18.194.133.57 b66c4a12-8584-4c7a-9a5d-7c96f59e2556 www.demo-site.com/campaign-landing-page landing-page 80 www.demo-site.com/campaign-landing-page python-requests/2.21.0 2019-05-10 14:40:29.000 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0","data":{"deviceBrand":"Unknown","deviceName":"Unknown","operatingSystemName":"Unknown","agentVersionMajor":"2","layoutEngineVersionMajor":"??","deviceClass":"Unknown","agentNameVersionMajor":"python-requests 2","operatingSystemClass":"Unknown","layoutEngineName":"Unknown","agentName":"python-requests","agentVersion":"2.21.0","layoutEngineClass":"Unknown","agentNameVersion":"python-requests 2.21.0","operatingSystemVersion":"??","agentClass":"Special","layoutEngineVersion":"??"}}]} 2019-05-10 14:40:29.576 com.snowplowanalytics.snowplow page_view jsonschema 1-0-0 `) var spTsv3Parsed, _ = analytics.ParseEvent(string(snowplowTsv3)) -var snowplowJson3 = []byte(`{"app_id":"test-data","collector_tstamp":"2019-05-10T14:40:29.576Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 
2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:29.576Z","dvce_created_tstamp":"2019-05-10T14:40:29.204Z","dvce_sent_tstamp":"2019-05-10T14:40:29Z","etl_tstamp":"2019-05-10T14:40:30.836Z","event":"page_view","event_format":"jsonschema","event_id":"e8aef68d-8533-45c6-a672-26a0f01be9bd","event_name":"page_view","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"b66c4a12-8584-4c7a-9a5d-7c96f59e2556","page_title":"landing-page","page_url":"www.demo-site.com/campaign-landing-page","page_urlpath":"www.demo-site.com/campaign-landing-page","page_urlport":80,"platform":"pc","user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) +var snowplowJson3 = []byte(`{"app_id":"test-data3","collector_tstamp":"2019-05-10T14:40:29.576Z","contexts_nl_basjes_yauaa_context_1":[{"agentClass":"Special","agentName":"python-requests","agentNameVersion":"python-requests 2.21.0","agentNameVersionMajor":"python-requests 
2","agentVersion":"2.21.0","agentVersionMajor":"2","deviceBrand":"Unknown","deviceClass":"Unknown","deviceName":"Unknown","layoutEngineClass":"Unknown","layoutEngineName":"Unknown","layoutEngineVersion":"??","layoutEngineVersionMajor":"??","operatingSystemClass":"Unknown","operatingSystemName":"Unknown","operatingSystemVersion":"??"}],"derived_tstamp":"2019-05-10T14:40:29.576Z","dvce_created_tstamp":"2019-05-10T14:40:29.204Z","dvce_sent_tstamp":"2019-05-10T14:40:29Z","etl_tstamp":"2019-05-10T14:40:30.836Z","event":"page_view","event_format":"jsonschema","event_id":"e8aef68d-8533-45c6-a672-26a0f01be9bd","event_name":"page_view","event_vendor":"com.snowplowanalytics.snowplow","event_version":"1-0-0","network_userid":"b66c4a12-8584-4c7a-9a5d-7c96f59e2556","page_title":"landing-page","page_url":"www.demo-site.com/campaign-landing-page","page_urlpath":"www.demo-site.com/campaign-landing-page","page_urlport":80,"platform":"pc","user_id":"user\u003cbuilt-in function input\u003e","user_ipaddress":"18.194.133.57","useragent":"python-requests/2.21.0","v_collector":"ssc-0.15.0-googlepubsub","v_etl":"beam-enrich-0.2.0-common-0.36.0","v_tracker":"py-0.8.2"}`) var nonSnowplowString = []byte(`not a snowplow event`) + +var messages = []*models.Message{ + { + Data: snowplowTsv1, + PartitionKey: "some-key", + }, + { + Data: snowplowTsv2, + PartitionKey: "some-key1", + }, + { + Data: snowplowTsv3, + PartitionKey: "some-key2", + }, + { + Data: nonSnowplowString, + PartitionKey: "some-key4", + }, +}