Skip to content

Commit

Permalink
Refactor structure to allow Batch Transformations
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Jun 17, 2024
1 parent 31a4b09 commit 3c8e391
Show file tree
Hide file tree
Showing 20 changed files with 510 additions and 99 deletions.
3 changes: 2 additions & 1 deletion cmd/aws/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package main
import (
"github.com/snowplow/snowbridge/cmd/cli"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig"
kafkasource "github.com/snowplow/snowbridge/pkg/source/kafka"
kinesissource "github.com/snowplow/snowbridge/pkg/source/kinesis"
pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub"
Expand All @@ -27,5 +28,5 @@ func main() {
sourceConfigPairs := []config.ConfigurationPair{stdinsource.ConfigPair, sqssource.ConfigPair,
pubsubsource.ConfigPair, kafkasource.ConfigPair, kinesissource.ConfigPair}

cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations)
cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations, batchtransformconfig.SupportedTransformations)
}
25 changes: 21 additions & 4 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

"github.com/snowplow/snowbridge/cmd"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/batchtransform"
"github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig"
"github.com/snowplow/snowbridge/pkg/failure/failureiface"
"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/observer"
Expand All @@ -47,7 +49,11 @@ const (
)

// RunCli runs the app
func RunCli(supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) {
func RunCli(
supportedSources []config.ConfigurationPair,
supportedTransformations []config.ConfigurationPair,
supportedBatchTransformations []config.ConfigurationPair,
) {
cfg, sentryEnabled, err := cmd.Init()
if err != nil {
exitWithError(err, sentryEnabled)
Expand Down Expand Up @@ -95,6 +101,11 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
return err
}

btr, err := batchtransformconfig.GetBatchTransformations(cfg, batchtransformconfig.SupportedTransformations)
if err != nil {
return err
}

t, err := cfg.GetTarget()
if err != nil {
return err
Expand Down Expand Up @@ -158,7 +169,7 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
WriteToTarget: sourceWriteFunc(t, ft, tr, btr, o),
}

// Read is a long running process and will only return when the source
Expand Down Expand Up @@ -189,7 +200,13 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
// 4. Observing these results
//
// All with retry logic baked in to remove any of this handling from the implementations
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer) func(messages []*models.Message) error {
func sourceWriteFunc(
t targetiface.Target,
ft failureiface.Failure,
tr transform.TransformationApplyFunction,
btr batchtransform.BatchTransformationApplyFunction,
o *observer.Observer,
) func(messages []*models.Message) error {
return func(messages []*models.Message) error {

// Apply transformations
Expand All @@ -211,7 +228,7 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
messagesToSend := transformed.Result

res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
res, err := t.Write(messagesToSend)
res, err := t.Write(messagesToSend, btr)

o.TargetWrite(res)
messagesToSend = res.Failed
Expand Down
3 changes: 2 additions & 1 deletion cmd/main/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package main
import (
"github.com/snowplow/snowbridge/cmd/cli"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig"
kafkasource "github.com/snowplow/snowbridge/pkg/source/kafka"
pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub"
sqssource "github.com/snowplow/snowbridge/pkg/source/sqs"
Expand All @@ -28,5 +29,5 @@ func main() {
kafkasource.ConfigPair, pubsubsource.ConfigPair,
}

cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations)
cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations, batchtransformconfig.SupportedTransformations)
}
21 changes: 11 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ type Config struct {

// configurationData for holding all configuration options
type configurationData struct {
Source *component `hcl:"source,block" envPrefix:"SOURCE_"`
Target *component `hcl:"target,block" envPrefix:"TARGET_"`
FailureTarget *failureConfig `hcl:"failure_target,block"`
Sentry *sentryConfig `hcl:"sentry,block"`
StatsReceiver *statsConfig `hcl:"stats_receiver,block"`
Transformations []*component `hcl:"transform,block"`
LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"`
UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"`
DisableTelemetry bool `hcl:"disable_telemetry,optional" env:"DISABLE_TELEMETRY"`
License *licenseConfig `hcl:"license,block"`
Source *component `hcl:"source,block" envPrefix:"SOURCE_"`
Target *component `hcl:"target,block" envPrefix:"TARGET_"`
FailureTarget *failureConfig `hcl:"failure_target,block"`
Sentry *sentryConfig `hcl:"sentry,block"`
StatsReceiver *statsConfig `hcl:"stats_receiver,block"`
Transformations []*component `hcl:"transform,block"`
BatchTransformations []*component `hcl:"batch_transform,block"`
LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"`
UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"`
DisableTelemetry bool `hcl:"disable_telemetry,optional" env:"DISABLE_TELEMETRY"`
License *licenseConfig `hcl:"license,block"`
}

// component is a type to abstract over configuration blocks.
Expand Down
61 changes: 61 additions & 0 deletions pkg/batchtransform/batch_transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package batchtransform

import "github.com/snowplow/snowbridge/pkg/models"

// BatchTransformationFunction is a transformation function which operates across a batch of events
// It takes a batch as an input, and returns a successful batch and a slice of invalid messages
type BatchTransformationFunction func([]models.MessageBatch) (success []models.MessageBatch, invalid []*models.Message, oversized []*models.Message)

// BatchTransformationApplyFunction combines batch into one callable function
type BatchTransformationApplyFunction func([]*models.Message, []BatchTransformationFunction, []BatchTransformationFunction) models.BatchTransformationResult

// BatchTransformationGenerator returns a BatchTransformationApplyFunction from a provided set of BatchTransformationFunctions
type BatchTransformationGenerator func(...BatchTransformationFunction) BatchTransformationApplyFunction

// NewBatchTransformation constructs a function which applies all transformations to all messages, returning a TransformationResult.
func NewBatchTransformation(tranformFunctions ...BatchTransformationFunction) BatchTransformationApplyFunction {
// pre is a function to be run before the configured ones, post is to be run after.
// This is done because sometimes functions need to _always_ run first or last, depending on the specific target logic. (eg. batching by dynamic headers, if configured)
// pre and post functions are intended for use only in the implementations of targets.
return func(messages []*models.Message, pre []BatchTransformationFunction, post []BatchTransformationFunction) models.BatchTransformationResult {
// make a batch to begin with
success := []models.MessageBatch{{OriginalMessages: messages}}

// Because http will require specific functions to always go first and last, we provide these here
// Compiler gets confused if we don't rename.
functionsToRun := append(pre, tranformFunctions...)
functionsToRun = append(functionsToRun, post...)

// If no transformations, just return a result
if len(functionsToRun) == 0 {
return models.BatchTransformationResult{Success: success}
}

var invalid []*models.Message
var oversized []*models.Message
invalidList := make([]*models.Message, 0, len(messages))
oversizedList := make([]*models.Message, 0, len(messages))
// Run each transformation
for _, transformFunction := range functionsToRun {
// success is recomputed each time into a complete list of batches
success, invalid, oversized = transformFunction(success)
// Invalids are excluded each iteration so must be appended to a permanent list
invalidList = append(invalidList, invalid...)

oversizedList = append(oversizedList, oversized...)
}

return models.BatchTransformationResult{Success: success, Invalid: invalidList, Oversized: oversizedList}
}
}
58 changes: 58 additions & 0 deletions pkg/batchtransform/batchtransformconfig/batch_transform_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package batchtransformconfig

import (
"fmt"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/batchtransform"
)

// SupportedTransformations is a ConfigurationPair slice containing all the officially supported transformations.
var SupportedTransformations = []config.ConfigurationPair{
// TODO: Add config implementations & put them here
}

// GetBatchTransformations builds and returns transformationApplyFunction
// from the transformations configured.
func GetBatchTransformations(c *config.Config, supportedTransformations []config.ConfigurationPair) (batchtransform.BatchTransformationApplyFunction, error) {
funcs := make([]batchtransform.BatchTransformationFunction, 0)

for _, transformation := range c.Data.BatchTransformations {

useTransf := transformation.Use
decoderOpts := &config.DecoderOptions{
Input: useTransf.Body,
}

var component interface{}
var err error
for _, pair := range supportedTransformations {
if pair.Name == useTransf.Name {
plug := pair.Handle
component, err = c.CreateComponent(plug, decoderOpts)
if err != nil {
return nil, err
}
}
}

f, ok := component.(batchtransform.BatchTransformationFunction)
if !ok {
return nil, fmt.Errorf("could not interpret transformation configuration for %q", useTransf.Name)
}
funcs = append(funcs, f)
}

return batchtransform.NewBatchTransformation(funcs...), nil
}
74 changes: 74 additions & 0 deletions pkg/batchtransform/template.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package batchtransform

import (
"bytes"
"encoding/json"
"text/template"

"github.com/pkg/errors"
"github.com/snowplow/snowbridge/pkg/models"
)

// TemplaterBatchTransformationFunction is a thing TODO add desc
func TemplaterBatchTransformationFunction(batches []models.MessageBatch) ([]models.MessageBatch, []*models.Message) {

// This is just an outline implementation of a templater function, to help figure out the design of batch transforms in general

// The templater would fit here along the following lines:
const templ = `{
attributes: [ {{$first_1 := true}}
{{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}}
{{printf "%s" .attribute_data}}{{end}}
],
events: [ {{$first_2 := true}}
{{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}}
{{printf "%s" .event_data}}{{end}}
]
}`

invalid := make([]*models.Message, 0)
safe := make([]*models.Message, 0)

for _, b := range batches {
formatted := []map[string]json.RawMessage{}
for _, msg := range b.OriginalMessages {
// Use json.RawMessage to ensure templating format works (real implementation has a problem to figure out here)
var asMap map[string]json.RawMessage

if err := json.Unmarshal(msg.Data, &asMap); err != nil {
msg.SetError(errors.Wrap(err, "templater error")) // TODO: Cleanup!
invalid = append(invalid, msg)
continue
}

formatted = append(formatted, asMap)
}
var buf bytes.Buffer

t := template.Must(template.New("example").Parse(templ))
if err := t.Execute(&buf, formatted); err != nil {
for _, msg := range safe {
msg.SetError(errors.Wrap(err, "templater error")) // TODO: Cleanup!
invalid = append(invalid, msg)
}
return nil, invalid
}

// Assign the templated request to the HTTPRequestBody field
b.BatchData = buf.Bytes()

}

return batches, invalid
}
4 changes: 2 additions & 2 deletions pkg/failure/snowplow.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (d *SnowplowFailure) WriteInvalid(invalid []*models.Message) (*models.Targe
transformed = append(transformed, tMsg)
}

return d.target.Write(transformed)
return d.target.Write(transformed, nil)
}

// WriteOversized will handle the conversion of oversized messages into failure
Expand Down Expand Up @@ -114,7 +114,7 @@ func (d *SnowplowFailure) WriteOversized(maximumAllowedSizeBytes int, oversized
transformed = append(transformed, tMsg)
}

return d.target.Write(transformed)
return d.target.Write(transformed, nil)
}

// Open manages opening the underlying target
Expand Down
3 changes: 2 additions & 1 deletion pkg/failure/snowplow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/snowplow/snowbridge/pkg/batchtransform"
"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/testutil"
)
Expand All @@ -27,7 +28,7 @@ type TestFailureTarget struct {
onWrite func(messages []*models.Message) (*models.TargetWriteResult, error)
}

func (t *TestFailureTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) {
func (t *TestFailureTarget) Write(messages []*models.Message, btf batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) {
return t.onWrite(messages)
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/models/batch_transformation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package models

import "time"

// MessageBatch houses batches of messages, for batch transformations to operate across
type MessageBatch struct {
OriginalMessages []*Message // Most targets will use the data from here, but where we have a http templating transformation, we would use this to ack batches of messages
BatchData []byte // Where we template http requests, we use this to define the body of the request
HTTPHeaders map[string]string // For dynamic headers feature
TimeRequestStarted time.Time
TimeRequestFinished time.Time
}

// BatchTransformationResult houses the result of a batch transformation
type BatchTransformationResult struct {
Success []MessageBatch
Invalid []*Message
Oversized []*Message
}
Loading

0 comments on commit 3c8e391

Please sign in to comment.