Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add spEnrichedFilter #60

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ba71200
Add spEnrichedFilter
colmsnowplow Jun 16, 2021
fc495fd
Address type coercion in valueFound and add test case for it
colmsnowplow Jun 23, 2021
d9cd827
Remove unnecessary note
colmsnowplow Jun 23, 2021
cd5e8a4
Move initial evaluation of filterConfig outside returned function and…
colmsnowplow Jun 28, 2021
39c4c8c
Add missing copyright notice
colmsnowplow Jun 28, 2021
d09af8a
Move interpretation of intermediateState into utility function
colmsnowplow Jun 28, 2021
1469b99
Use utility function to handle intermediateState for legacy implement…
colmsnowplow Jun 28, 2021
5a5002a
Refactor filtering
colmsnowplow Jul 7, 2021
03793bf
Cleanup
colmsnowplow Jul 7, 2021
c5f7064
Break transformation loop if message is filtered
colmsnowplow Jul 8, 2021
0e019f7
Update observer model to accommodate filtered messages
colmsnowplow Jul 8, 2021
e98d953
Fix bug which fails messages where the requested field is empty
colmsnowplow Jul 8, 2021
8428a31
Cleanup
colmsnowplow Jul 8, 2021
3b10912
Improve readability of filtering logic
colmsnowplow Jul 13, 2021
2d46288
Improve invalid filter config error message
colmsnowplow Jul 13, 2021
79ebc62
Fix doc for observer Filtered function
colmsnowplow Jul 13, 2021
09a8e0d
Patch broken config test
colmsnowplow Jul 13, 2021
9e49dfd
Rename intermediateAsParsed to intermediateAsSpEnrichedParsed
colmsnowplow Jul 13, 2021
83f36db
Fix lint issues
colmsnowplow Jul 13, 2021
1c40689
Remove unnecessary else block in intermediateAsSpEnrichedParsed
colmsnowplow Jul 13, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
transformed := tr(messages)
// no error as errors should be returned in the failures array of TransformationResult

// Ack filtered messages with no further action
messagesToFilter := transformed.Filtered
for _, msg := range messagesToFilter {
if msg.AckFunc != nil {
msg.AckFunc()
}
}
// Push filter result to observer
filterRes := models.NewFilterResult(messagesToFilter)
o.Filtered(filterRes)

// Send message buffer
messagesToSend := transformed.Result

Expand Down
8 changes: 7 additions & 1 deletion cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,15 @@ func (c *Config) GetTransformations() (transform.TransformationApplyFunction, er
funcs = append(funcs, transform.SpEnrichedToJson)
case "spEnrichedSetPk":
funcs = append(funcs, transform.NewSpEnrichedSetPkFunction(funcOpts[1]))
case "spEnrichedFilter":
colmsnowplow marked this conversation as resolved.
Show resolved Hide resolved
filterFunc, err := transform.NewSpEnrichedFilterFunction(funcOpts[1])
if err != nil {
return nil, err
}
funcs = append(funcs, filterFunc)
case "none":
default:
return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}' and got '%s'", c.Transformation))
return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got '%s'", c.Transformation))
}
}
return transform.NewTransformation(funcs...), nil
Expand Down
19 changes: 18 additions & 1 deletion cmd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,24 @@ func TestNewConfig_InvalidTransformation(t *testing.T) {
transformation, err := c.GetTransformations()
assert.Nil(transformation)
assert.NotNil(err)
assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}' and got 'fake'", err.Error())
assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got 'fake'", err.Error())
}

func TestNewConfig_FilterFailure(t *testing.T) {
assert := assert.New(t)

defer os.Unsetenv("MESSAGE_TRANSFORMATION")

os.Setenv("MESSAGE_TRANSFORMATION", "spEnrichedFilter:incompatibleArg")

c, err := NewConfig()
assert.NotNil(c)
assert.Nil(err)

transformation, err := c.GetTransformations()
assert.Nil(transformation)
assert.NotNil(err)
assert.Equal(`Filter Function Config does not match regex \S+(!=|==)[^\s\|]+((?:\|[^\s|]+)*)$`, err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be nice having something more meaningful in the error message than an enormous regex to help in actually solving the problem - if it's too much information to put in an error message maybe a link back to some documentation would help instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I wrote this same comment during my review when I was trying to understand what the regex did but that comment seems to have vanished 🤷. This regex is hard enough to understand and I agree, doesn't make it clear what valid examples would be from my point of view. I think one or two, short, valid examples would be better in the error.

}

func TestNewConfig_InvalidTarget(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions cmd/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ func ServerlessRequestHandler(messages []*models.Message) error {
transformed := tr(messages)
// no error as errors should be returned in the failures array of TransformationResult

// Ack filtered messages with no further action
messagesToFilter := transformed.Filtered
for _, msg := range messagesToFilter {
if msg.AckFunc != nil {
msg.AckFunc()
}
}

res, err := t.Write(transformed.Result)
if err != nil {
log.WithFields(log.Fields{"error": err}).Error(err)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ require (
github.com/twitchscience/kinsumer v0.0.0-00010101000000-000000000000
github.com/urfave/cli v1.22.5
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect
golang.org/x/tools v0.1.3 // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/tools v0.1.4 // indirect
gopkg.in/stretchr/testify.v1 v1.2.2 // indirect
)

Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3 h1:kzM6+9dur93BcC2kVlYl34cHU
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 h1:RqytpXGR1iVNX7psjB3ff8y7sNFinVFvkx1c8SjBkio=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -585,8 +585,8 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.3 h1:L69ShwSZEyCsLKoAxDKeMvLDZkumEe8gXUZAjab0tX8=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
62 changes: 62 additions & 0 deletions pkg/models/filter_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// PROPRIETARY AND CONFIDENTIAL
//
// Unauthorized copying of this file via any medium is strictly prohibited.
//
// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.

package models

import (
"time"

"github.com/snowplow-devops/stream-replicator/pkg/common"
)

// FilterResult contains the results from a filter operation: how many
// messages were filtered out, the messages themselves, and latency measures
// describing how long they spent in the application before being filtered.
type FilterResult struct {
	// FilteredCount is the number of messages that were filtered out.
	FilteredCount int64

	// Filtered holds all the messages that were filtered out and acked without sending to the target
	Filtered []*Message

	// Delta between a message's TimePulled and the time of filtering tells us
	// how well the application is processing filtered data internally.
	MaxFilterLatency time.Duration
	MinFilterLatency time.Duration
	AvgFilterLatency time.Duration
}

// NewFilterResult uses the current UTC time as the timeOfFilter and then
// calls NewFilterResultWithTime.
func NewFilterResult(filtered []*Message) *FilterResult {
	return NewFilterResultWithTime(filtered, time.Now().UTC())
}

// NewFilterResultWithTime builds a result structure from a slice of filtered
// messages. The result carries the filtered messages, their count, and
// several latency measures derived from the gap between each message's
// TimePulled and timeOfFilter. For a nil or empty slice the count and all
// latency measures are left at zero.
func NewFilterResultWithTime(filtered []*Message, timeOfFilter time.Time) *FilterResult {
	filteredLen := int64(len(filtered))

	r := FilterResult{
		FilteredCount: filteredLen,
		Filtered:      filtered,
	}

	var sumFilterLatency time.Duration

	for i, msg := range filtered {
		filterLatency := timeOfFilter.Sub(msg.TimePulled)
		if r.MaxFilterLatency < filterLatency {
			r.MaxFilterLatency = filterLatency
		}
		// Seed the minimum from the first message instead of treating zero as
		// "unset": a genuine zero latency must not be replaced by a later,
		// larger value.
		if i == 0 || filterLatency < r.MinFilterLatency {
			r.MinFilterLatency = filterLatency
		}
		sumFilterLatency += filterLatency
	}

	// Only compute an average when there is at least one message.
	if filteredLen > 0 {
		r.AvgFilterLatency = common.GetAverageFromDuration(sumFilterLatency, filteredLen)
	}

	return &r
}
72 changes: 72 additions & 0 deletions pkg/models/filter_result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// PROPRIETARY AND CONFIDENTIAL
//
// Unauthorized copying of this file via any medium is strictly prohibited.
//
// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.

package models

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewFilterResult_EmptyWithoutTime(t *testing.T) {
assert := assert.New(t)

r := NewFilterResult(nil)
assert.NotNil(r)

assert.Equal(int64(0), r.FilteredCount)

assert.Equal(time.Duration(0), r.MaxFilterLatency)
assert.Equal(time.Duration(0), r.MinFilterLatency)
assert.Equal(time.Duration(0), r.AvgFilterLatency)
}

func TestNewFilterResult_EmptyWithTime(t *testing.T) {
assert := assert.New(t)

r := NewFilterResultWithTime(nil, time.Now().UTC())
assert.NotNil(r)

assert.Equal(int64(0), r.FilteredCount)

assert.Equal(time.Duration(0), r.MaxFilterLatency)
assert.Equal(time.Duration(0), r.MinFilterLatency)
assert.Equal(time.Duration(0), r.AvgFilterLatency)
}

func TestNewFilterResult_WithMessages(t *testing.T) {
assert := assert.New(t)

timeNow := time.Now().UTC()

filtered := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
},
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-8) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
},
}

r := NewFilterResultWithTime(filtered, timeNow)
assert.NotNil(r)

assert.Equal(int64(2), r.FilteredCount)

assert.Equal(time.Duration(8)*time.Minute, r.MaxFilterLatency)
assert.Equal(time.Duration(4)*time.Minute, r.MinFilterLatency)
assert.Equal(time.Duration(6)*time.Minute, r.AvgFilterLatency)
}
42 changes: 37 additions & 5 deletions pkg/models/observer_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type ObserverBuffer struct {
MsgFailed int64
MsgTotal int64

MsgFiltered int64

OversizedTargetResults int64
OversizedMsgSent int64
OversizedMsgFailed int64
Expand All @@ -39,6 +41,9 @@ type ObserverBuffer struct {
MaxTransformLatency time.Duration
MinTransformLatency time.Duration
SumTransformLatency time.Duration
MaxFilterLatency time.Duration
MinFilterLatency time.Duration
SumFilterLatency time.Duration
}

// AppendWrite adds a normal TargetWriteResult onto the buffer and stores the result
Expand All @@ -52,7 +57,7 @@ func (b *ObserverBuffer) AppendWrite(res *TargetWriteResult) {
b.MsgFailed += res.FailedCount
b.MsgTotal += res.Total()

b.append(res)
b.appendWriteResult(res)
}

// AppendWriteOversized adds an oversized TargetWriteResult onto the buffer and stores the result
Expand All @@ -66,7 +71,7 @@ func (b *ObserverBuffer) AppendWriteOversized(res *TargetWriteResult) {
b.OversizedMsgFailed += res.FailedCount
b.OversizedMsgTotal += res.Total()

b.append(res)
b.appendWriteResult(res)
}

// AppendWriteInvalid adds an invalid TargetWriteResult onto the buffer and stores the result
Expand All @@ -80,10 +85,10 @@ func (b *ObserverBuffer) AppendWriteInvalid(res *TargetWriteResult) {
b.InvalidMsgFailed += res.FailedCount
b.InvalidMsgTotal += res.Total()

b.append(res)
b.appendWriteResult(res)
}

func (b *ObserverBuffer) append(res *TargetWriteResult) {
func (b *ObserverBuffer) appendWriteResult(res *TargetWriteResult) {
if b.MaxProcLatency < res.MaxProcLatency {
b.MaxProcLatency = res.MaxProcLatency
}
Expand All @@ -109,6 +114,26 @@ func (b *ObserverBuffer) append(res *TargetWriteResult) {
b.SumTransformLatency += res.AvgTransformLatency
}

// AppendFiltered records a FilterResult in the buffer: it adds the result's
// filtered count to MsgFiltered and merges its latency measures. A nil
// result is ignored.
func (b *ObserverBuffer) AppendFiltered(res *FilterResult) {
	if res != nil {
		b.MsgFiltered += res.FilteredCount
		b.appendFilterResult(res)
	}
}

// appendFilterResult merges the latency measures of res into the buffer's
// running maximum, minimum and sum of filter latencies.
func (b *ObserverBuffer) appendFilterResult(res *FilterResult) {
	// A result with no filtered messages carries no latency information; its
	// zero-valued fields must not overwrite the running minimum.
	if res.FilteredCount == 0 {
		return
	}

	if b.MaxFilterLatency < res.MaxFilterLatency {
		b.MaxFilterLatency = res.MaxFilterLatency
	}
	// A zero running minimum is treated as "not yet set".
	if b.MinFilterLatency > res.MinFilterLatency || b.MinFilterLatency == time.Duration(0) {
		b.MinFilterLatency = res.MinFilterLatency
	}
	// GetAvgFilterLatency divides this sum by MsgFiltered (a message count),
	// so weight each result's average by the number of messages it covers
	// rather than adding one average per result.
	b.SumFilterLatency += res.AvgFilterLatency * time.Duration(res.FilteredCount)
}

// GetSumResults returns the total number of results logged in the buffer
func (b *ObserverBuffer) GetSumResults() int64 {
return b.TargetResults + b.OversizedTargetResults + b.InvalidTargetResults
Expand All @@ -129,10 +154,16 @@ func (b *ObserverBuffer) GetAvgTransformLatency() time.Duration {
return common.GetAverageFromDuration(b.SumTransformLatency, b.MsgTotal)
}

// GetAvgFilterLatency returns the average filter latency, obtained by passing
// the buffer's SumFilterLatency and MsgFiltered to
// common.GetAverageFromDuration.
func (b *ObserverBuffer) GetAvgFilterLatency() time.Duration {
	total, count := b.SumFilterLatency, b.MsgFiltered
	return common.GetAverageFromDuration(total, count)
}

func (b *ObserverBuffer) String() string {
return fmt.Sprintf(
"TargetResults:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxTransformLatency:%d",
"TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d",
b.TargetResults,
b.MsgFiltered,
b.MsgSent,
b.MsgFailed,
b.OversizedTargetResults,
Expand All @@ -143,6 +174,7 @@ func (b *ObserverBuffer) String() string {
b.InvalidMsgFailed,
b.MaxProcLatency.Milliseconds(),
b.MaxMsgLatency.Milliseconds(),
b.MaxFilterLatency.Milliseconds(),
b.MaxTransformLatency.Milliseconds(),
)
}
21 changes: 20 additions & 1 deletion pkg/models/observer_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ func TestObserverBuffer(t *testing.T) {
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
},
}
filtered := []*Message{
{
Data: []byte("FooBar"),
PartitionKey: "partition4",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
},
}

r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)

Expand All @@ -59,11 +68,17 @@ func TestObserverBuffer(t *testing.T) {
b.AppendWriteInvalid(r)
b.AppendWriteInvalid(nil)

fr := NewFilterResultWithTime(filtered, timeNow)

b.AppendFiltered(fr)

assert.Equal(int64(2), b.TargetResults)
assert.Equal(int64(4), b.MsgSent)
assert.Equal(int64(2), b.MsgFailed)
assert.Equal(int64(6), b.MsgTotal)

assert.Equal(int64(1), b.MsgFiltered)

assert.Equal(int64(2), b.OversizedTargetResults)
assert.Equal(int64(4), b.OversizedMsgSent)
assert.Equal(int64(2), b.OversizedMsgFailed)
Expand All @@ -84,5 +99,9 @@ func TestObserverBuffer(t *testing.T) {
assert.Equal(time.Duration(1)*time.Minute, b.MinTransformLatency)
assert.Equal(time.Duration(2)*time.Minute, b.GetAvgTransformLatency())

assert.Equal("TargetResults:2,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxTransformLatency:180000", b.String())
assert.Equal(time.Duration(10)*time.Minute, b.MaxFilterLatency)
assert.Equal(time.Duration(10)*time.Minute, b.MinFilterLatency)
assert.Equal(time.Duration(10)*time.Minute, b.GetAvgFilterLatency())

assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000", b.String())
}
Loading