Skip to content

Commit

Permalink
Add basic filtering (closes #64)
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Jul 13, 2021
1 parent a4affba commit bf7aab8
Show file tree
Hide file tree
Showing 23 changed files with 688 additions and 103 deletions.
11 changes: 11 additions & 0 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
transformed := tr(messages)
// no error as errors should be returned in the failures array of TransformationResult

// Ack filtered messages with no further action
messagesToFilter := transformed.Filtered
for _, msg := range messagesToFilter {
if msg.AckFunc != nil {
msg.AckFunc()
}
}
// Push filter result to observer
filterRes := models.NewFilterResult(messagesToFilter)
o.Filtered(filterRes)

// Send message buffer
messagesToSend := transformed.Result

Expand Down
8 changes: 7 additions & 1 deletion cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,15 @@ func (c *Config) GetTransformations() (transform.TransformationApplyFunction, er
funcs = append(funcs, transform.SpEnrichedToJson)
case "spEnrichedSetPk":
funcs = append(funcs, transform.NewSpEnrichedSetPkFunction(funcOpts[1]))
case "spEnrichedFilter":
filterFunc, err := transform.NewSpEnrichedFilterFunction(funcOpts[1])
if err != nil {
return nil, err
}
funcs = append(funcs, filterFunc)
case "none":
default:
return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}' and got '%s'", c.Transformation))
return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got '%s'", c.Transformation))
}
}
return transform.NewTransformation(funcs...), nil
Expand Down
19 changes: 18 additions & 1 deletion cmd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,24 @@ func TestNewConfig_InvalidTransformation(t *testing.T) {
transformation, err := c.GetTransformations()
assert.Nil(transformation)
assert.NotNil(err)
assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}' and got 'fake'", err.Error())
assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got 'fake'", err.Error())
}

func TestNewConfig_FilterFailure(t *testing.T) {
assert := assert.New(t)

defer os.Unsetenv("MESSAGE_TRANSFORMATION")

os.Setenv("MESSAGE_TRANSFORMATION", "spEnrichedFilter:incompatibleArg")

c, err := NewConfig()
assert.NotNil(c)
assert.Nil(err)

transformation, err := c.GetTransformations()
assert.Nil(transformation)
assert.NotNil(err)
assert.Equal(`Invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]`, err.Error())
}

func TestNewConfig_InvalidTarget(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions cmd/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ func ServerlessRequestHandler(messages []*models.Message) error {
transformed := tr(messages)
// no error as errors should be returned in the failures array of TransformationResult

// Ack filtered messages with no further action
messagesToFilter := transformed.Filtered
for _, msg := range messagesToFilter {
if msg.AckFunc != nil {
msg.AckFunc()
}
}

res, err := t.Write(transformed.Result)
if err != nil {
log.WithFields(log.Fields{"error": err}).Error(err)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ require (
github.com/twitchscience/kinsumer v0.0.0-00010101000000-000000000000
github.com/urfave/cli v1.22.5
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect
golang.org/x/tools v0.1.3 // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/tools v0.1.4 // indirect
gopkg.in/stretchr/testify.v1 v1.2.2 // indirect
)

Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3 h1:kzM6+9dur93BcC2kVlYl34cHU
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 h1:RqytpXGR1iVNX7psjB3ff8y7sNFinVFvkx1c8SjBkio=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -585,8 +585,8 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.3 h1:L69ShwSZEyCsLKoAxDKeMvLDZkumEe8gXUZAjab0tX8=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
62 changes: 62 additions & 0 deletions pkg/models/filter_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// PROPRIETARY AND CONFIDENTIAL
//
// Unauthorized copying of this file via any medium is strictly prohibited.
//
// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.

package models

import (
"time"

"github.com/snowplow-devops/stream-replicator/pkg/common"
)

// FilterResult contains the results from a target write operation
type FilterResult struct {
FilteredCount int64

// Filtered holds all the messages that were filtered out and acked without sending to the target
Filtered []*Message

// Delta between TimePulled and TimeOfAck tells us how well the
// application is at processing filtered data internally
MaxFilterLatency time.Duration
MinFilterLatency time.Duration
AvgFilterLatency time.Duration
}

// NewFilterResult uses the current time as the timeOfFilter and then calls NewFilterResultWithTime
func NewFilterResult(filtered []*Message) *FilterResult {
return NewFilterResultWithTime(filtered, time.Now().UTC())
}

// NewFilterResultWithTime builds a result structure to return from a filtered message slice
// attempt which contains the sfiltered message count as well as several
// derived latency measures.
func NewFilterResultWithTime(filtered []*Message, timeOfFilter time.Time) *FilterResult {
r := FilterResult{
FilteredCount: int64(len(filtered)),
}

filteredLen := int64(len(filtered))

var sumFilterLatency time.Duration

for _, msg := range filtered {
filterLatency := timeOfFilter.Sub(msg.TimePulled)
if r.MaxFilterLatency < filterLatency {
r.MaxFilterLatency = filterLatency
}
if r.MinFilterLatency > filterLatency || r.MinFilterLatency == time.Duration(0) {
r.MinFilterLatency = filterLatency
}
sumFilterLatency += filterLatency
}

if filteredLen > 0 {
r.AvgFilterLatency = common.GetAverageFromDuration(sumFilterLatency, filteredLen)
}

return &r
}
72 changes: 72 additions & 0 deletions pkg/models/filter_result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// PROPRIETARY AND CONFIDENTIAL
//
// Unauthorized copying of this file via any medium is strictly prohibited.
//
// Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.

package models

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewFilterResult_EmptyWithoutTime(t *testing.T) {
assert := assert.New(t)

r := NewFilterResult(nil)
assert.NotNil(r)

assert.Equal(int64(0), r.FilteredCount)

assert.Equal(time.Duration(0), r.MaxFilterLatency)
assert.Equal(time.Duration(0), r.MinFilterLatency)
assert.Equal(time.Duration(0), r.AvgFilterLatency)
}

func TestNewFilterResult_EmptyWithTime(t *testing.T) {
assert := assert.New(t)

r := NewFilterResultWithTime(nil, time.Now().UTC())
assert.NotNil(r)

assert.Equal(int64(0), r.FilteredCount)

assert.Equal(time.Duration(0), r.MaxFilterLatency)
assert.Equal(time.Duration(0), r.MinFilterLatency)
assert.Equal(time.Duration(0), r.AvgFilterLatency)
}

func TestNewFilterResult_WithMessages(t *testing.T) {
assert := assert.New(t)

timeNow := time.Now().UTC()

filtered := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
},
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-8) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
},
}

r := NewFilterResultWithTime(filtered, timeNow)
assert.NotNil(r)

assert.Equal(int64(2), r.FilteredCount)

assert.Equal(time.Duration(8)*time.Minute, r.MaxFilterLatency)
assert.Equal(time.Duration(4)*time.Minute, r.MinFilterLatency)
assert.Equal(time.Duration(6)*time.Minute, r.AvgFilterLatency)
}
42 changes: 37 additions & 5 deletions pkg/models/observer_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type ObserverBuffer struct {
MsgFailed int64
MsgTotal int64

MsgFiltered int64

OversizedTargetResults int64
OversizedMsgSent int64
OversizedMsgFailed int64
Expand All @@ -39,6 +41,9 @@ type ObserverBuffer struct {
MaxTransformLatency time.Duration
MinTransformLatency time.Duration
SumTransformLatency time.Duration
MaxFilterLatency time.Duration
MinFilterLatency time.Duration
SumFilterLatency time.Duration
}

// AppendWrite adds a normal TargetWriteResult onto the buffer and stores the result
Expand All @@ -52,7 +57,7 @@ func (b *ObserverBuffer) AppendWrite(res *TargetWriteResult) {
b.MsgFailed += res.FailedCount
b.MsgTotal += res.Total()

b.append(res)
b.appendWriteResult(res)
}

// AppendWriteOversized adds an oversized TargetWriteResult onto the buffer and stores the result
Expand All @@ -66,7 +71,7 @@ func (b *ObserverBuffer) AppendWriteOversized(res *TargetWriteResult) {
b.OversizedMsgFailed += res.FailedCount
b.OversizedMsgTotal += res.Total()

b.append(res)
b.appendWriteResult(res)
}

// AppendWriteInvalid adds an invalid TargetWriteResult onto the buffer and stores the result
Expand All @@ -80,10 +85,10 @@ func (b *ObserverBuffer) AppendWriteInvalid(res *TargetWriteResult) {
b.InvalidMsgFailed += res.FailedCount
b.InvalidMsgTotal += res.Total()

b.append(res)
b.appendWriteResult(res)
}

func (b *ObserverBuffer) append(res *TargetWriteResult) {
func (b *ObserverBuffer) appendWriteResult(res *TargetWriteResult) {
if b.MaxProcLatency < res.MaxProcLatency {
b.MaxProcLatency = res.MaxProcLatency
}
Expand All @@ -109,6 +114,26 @@ func (b *ObserverBuffer) append(res *TargetWriteResult) {
b.SumTransformLatency += res.AvgTransformLatency
}

// AppendFiltered adds a FilterResult onto the buffer and stores the result
func (b *ObserverBuffer) AppendFiltered(res *FilterResult) {
if res == nil {
return
}

b.MsgFiltered += res.FilteredCount
b.appendFilterResult(res)
}

func (b *ObserverBuffer) appendFilterResult(res *FilterResult) {
if b.MaxFilterLatency < res.MaxFilterLatency {
b.MaxFilterLatency = res.MaxFilterLatency
}
if b.MinFilterLatency > res.MinFilterLatency || b.MinFilterLatency == time.Duration(0) {
b.MinFilterLatency = res.MinFilterLatency
}
b.SumFilterLatency += res.AvgFilterLatency
}

// GetSumResults returns the total number of results logged in the buffer
func (b *ObserverBuffer) GetSumResults() int64 {
return b.TargetResults + b.OversizedTargetResults + b.InvalidTargetResults
Expand All @@ -129,10 +154,16 @@ func (b *ObserverBuffer) GetAvgTransformLatency() time.Duration {
return common.GetAverageFromDuration(b.SumTransformLatency, b.MsgTotal)
}

// GetAvgFilterLatency calculates average filter latency
func (b *ObserverBuffer) GetAvgFilterLatency() time.Duration {
return common.GetAverageFromDuration(b.SumFilterLatency, b.MsgFiltered)
}

func (b *ObserverBuffer) String() string {
return fmt.Sprintf(
"TargetResults:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxTransformLatency:%d",
"TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d",
b.TargetResults,
b.MsgFiltered,
b.MsgSent,
b.MsgFailed,
b.OversizedTargetResults,
Expand All @@ -143,6 +174,7 @@ func (b *ObserverBuffer) String() string {
b.InvalidMsgFailed,
b.MaxProcLatency.Milliseconds(),
b.MaxMsgLatency.Milliseconds(),
b.MaxFilterLatency.Milliseconds(),
b.MaxTransformLatency.Milliseconds(),
)
}
21 changes: 20 additions & 1 deletion pkg/models/observer_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ func TestObserverBuffer(t *testing.T) {
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
},
}
filtered := []*Message{
{
Data: []byte("FooBar"),
PartitionKey: "partition4",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
},
}

r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)

Expand All @@ -59,11 +68,17 @@ func TestObserverBuffer(t *testing.T) {
b.AppendWriteInvalid(r)
b.AppendWriteInvalid(nil)

fr := NewFilterResultWithTime(filtered, timeNow)

b.AppendFiltered(fr)

assert.Equal(int64(2), b.TargetResults)
assert.Equal(int64(4), b.MsgSent)
assert.Equal(int64(2), b.MsgFailed)
assert.Equal(int64(6), b.MsgTotal)

assert.Equal(int64(1), b.MsgFiltered)

assert.Equal(int64(2), b.OversizedTargetResults)
assert.Equal(int64(4), b.OversizedMsgSent)
assert.Equal(int64(2), b.OversizedMsgFailed)
Expand All @@ -84,5 +99,9 @@ func TestObserverBuffer(t *testing.T) {
assert.Equal(time.Duration(1)*time.Minute, b.MinTransformLatency)
assert.Equal(time.Duration(2)*time.Minute, b.GetAvgTransformLatency())

assert.Equal("TargetResults:2,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxTransformLatency:180000", b.String())
assert.Equal(time.Duration(10)*time.Minute, b.MaxFilterLatency)
assert.Equal(time.Duration(10)*time.Minute, b.MinFilterLatency)
assert.Equal(time.Duration(10)*time.Minute, b.GetAvgFilterLatency())

assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000", b.String())
}
Loading

0 comments on commit bf7aab8

Please sign in to comment.