From 3c8e39144674393ecddffb258c416b6362d2814f Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Wed, 5 Jun 2024 18:04:44 +0100 Subject: [PATCH 1/4] Refactor structure to allow Batch Transformations --- cmd/aws/cli/main.go | 3 +- cmd/cli/cli.go | 25 +++- cmd/main/cli/main.go | 3 +- config/config.go | 21 ++-- pkg/batchtransform/batch_transform.go | 61 ++++++++++ .../batch_transform_config.go | 58 +++++++++ pkg/batchtransform/template.go | 74 ++++++++++++ pkg/failure/snowplow.go | 4 +- pkg/failure/snowplow_test.go | 3 +- pkg/models/batch_transformation.go | 30 +++++ pkg/target/common.go | 47 ++++++++ pkg/target/eventhub.go | 4 +- pkg/target/http.go | 113 +++++++++++++++--- pkg/target/http_test.go | 23 ++-- pkg/target/kafka.go | 108 +++++++++-------- pkg/target/kafka_test.go | 25 +++- pkg/target/kinesis.go | 1 + pkg/target/oauth2_test.go | 2 +- pkg/target/pubsub.go | 1 + pkg/target/targetiface/target.go | 3 +- 20 files changed, 510 insertions(+), 99 deletions(-) create mode 100644 pkg/batchtransform/batch_transform.go create mode 100644 pkg/batchtransform/batchtransformconfig/batch_transform_config.go create mode 100644 pkg/batchtransform/template.go create mode 100644 pkg/models/batch_transformation.go create mode 100644 pkg/target/common.go diff --git a/cmd/aws/cli/main.go b/cmd/aws/cli/main.go index e38fec40..5947677b 100644 --- a/cmd/aws/cli/main.go +++ b/cmd/aws/cli/main.go @@ -14,6 +14,7 @@ package main import ( "github.com/snowplow/snowbridge/cmd/cli" "github.com/snowplow/snowbridge/config" + "github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig" kafkasource "github.com/snowplow/snowbridge/pkg/source/kafka" kinesissource "github.com/snowplow/snowbridge/pkg/source/kinesis" pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub" @@ -27,5 +28,5 @@ func main() { sourceConfigPairs := []config.ConfigurationPair{stdinsource.ConfigPair, sqssource.ConfigPair, pubsubsource.ConfigPair, kafkasource.ConfigPair, kinesissource.ConfigPair} - cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations) + cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations, batchtransformconfig.SupportedTransformations) } diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go index 65b4c198..9108e955 100644 --- a/cmd/cli/cli.go +++ b/cmd/cli/cli.go @@ -28,6 +28,8 @@ import ( "github.com/snowplow/snowbridge/cmd" "github.com/snowplow/snowbridge/config" + "github.com/snowplow/snowbridge/pkg/batchtransform" + "github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig" "github.com/snowplow/snowbridge/pkg/failure/failureiface" "github.com/snowplow/snowbridge/pkg/models" "github.com/snowplow/snowbridge/pkg/observer" @@ -47,7 +49,11 @@ const ( ) // RunCli runs the app -func RunCli(supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) { +func RunCli( + supportedSources []config.ConfigurationPair, + supportedTransformations []config.ConfigurationPair, + supportedBatchTransformations []config.ConfigurationPair, +) { cfg, sentryEnabled, err := cmd.Init() if err != nil { exitWithError(err, sentryEnabled) @@ -95,6 +101,11 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation return err } + btr, err := batchtransformconfig.GetBatchTransformations(cfg, batchtransformconfig.SupportedTransformations) + if err != nil { + return err + } + t, err := cfg.GetTarget() if err != nil { return err @@ -158,7 +169,7 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation // Callback functions for the source to leverage when writing data sf := sourceiface.SourceFunctions{ - WriteToTarget: sourceWriteFunc(t, ft, tr, o), + WriteToTarget: sourceWriteFunc(t, ft, tr, btr, o), } // Read is a long running process and will only return when the source @@ -189,7 +200,13 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation // 4. Observing these results // // All with retry logic baked in to remove any of this handling from the implementations -func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer) func(messages []*models.Message) error { +func sourceWriteFunc( + t targetiface.Target, + ft failureiface.Failure, + tr transform.TransformationApplyFunction, + btr batchtransform.BatchTransformationApplyFunction, + o *observer.Observer, +) func(messages []*models.Message) error { return func(messages []*models.Message) error { // Apply transformations @@ -211,7 +228,7 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform messagesToSend := transformed.Result res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) { - res, err := t.Write(messagesToSend) + res, err := t.Write(messagesToSend, btr) o.TargetWrite(res) messagesToSend = res.Failed diff --git a/cmd/main/cli/main.go b/cmd/main/cli/main.go index 26e6a72b..21068085 100644 --- a/cmd/main/cli/main.go +++ b/cmd/main/cli/main.go @@ -14,6 +14,7 @@ package main import ( "github.com/snowplow/snowbridge/cmd/cli" "github.com/snowplow/snowbridge/config" + "github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig" kafkasource "github.com/snowplow/snowbridge/pkg/source/kafka" pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub" sqssource "github.com/snowplow/snowbridge/pkg/source/sqs" @@ -28,5 +29,5 @@ func main() { kafkasource.ConfigPair, pubsubsource.ConfigPair, } - cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations) + cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations, batchtransformconfig.SupportedTransformations) } diff --git a/config/config.go b/config/config.go index 3971a98f..73077fc1 100644 --- a/config/config.go +++ b/config/config.go @@ -46,16 +46,17 @@ type Config struct { // configurationData for holding all configuration options type configurationData struct { - Source *component `hcl:"source,block" envPrefix:"SOURCE_"` - Target *component `hcl:"target,block" envPrefix:"TARGET_"` - FailureTarget *failureConfig `hcl:"failure_target,block"` - Sentry *sentryConfig `hcl:"sentry,block"` - StatsReceiver *statsConfig `hcl:"stats_receiver,block"` - Transformations []*component `hcl:"transform,block"` - LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"` - UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"` - DisableTelemetry bool `hcl:"disable_telemetry,optional" env:"DISABLE_TELEMETRY"` - License *licenseConfig `hcl:"license,block"` + Source *component `hcl:"source,block" envPrefix:"SOURCE_"` + Target *component `hcl:"target,block" envPrefix:"TARGET_"` + FailureTarget *failureConfig `hcl:"failure_target,block"` + Sentry *sentryConfig `hcl:"sentry,block"` + StatsReceiver *statsConfig `hcl:"stats_receiver,block"` + Transformations []*component `hcl:"transform,block"` + BatchTransformations []*component `hcl:"batch_transform,block"` + LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"` + UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"` + DisableTelemetry bool `hcl:"disable_telemetry,optional" env:"DISABLE_TELEMETRY"` + License *licenseConfig `hcl:"license,block"` } // component is a type to abstract over configuration blocks. diff --git a/pkg/batchtransform/batch_transform.go b/pkg/batchtransform/batch_transform.go new file mode 100644 index 00000000..a9d2c9fa --- /dev/null +++ b/pkg/batchtransform/batch_transform.go @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package batchtransform + +import "github.com/snowplow/snowbridge/pkg/models" + +// BatchTransformationFunction is a transformation function which operates across a batch of events +// It takes a batch as an input, and returns a successful batch and a slice of invalid messages +type BatchTransformationFunction func([]models.MessageBatch) (success []models.MessageBatch, invalid []*models.Message, oversized []*models.Message) + +// BatchTransformationApplyFunction combines batch into one callable function +type BatchTransformationApplyFunction func([]*models.Message, []BatchTransformationFunction, []BatchTransformationFunction) models.BatchTransformationResult + +// BatchTransformationGenerator returns a BatchTransformationApplyFunction from a provided set of BatchTransformationFunctions +type BatchTransformationGenerator func(...BatchTransformationFunction) BatchTransformationApplyFunction + +// NewBatchTransformation constructs a function which applies all transformations to all messages, returning a TransformationResult. +func NewBatchTransformation(tranformFunctions ...BatchTransformationFunction) BatchTransformationApplyFunction { + // pre is a function to be run before the configured ones, post is to be run after. + // This is done because sometimes functions need to _always_ run first or last, depending on the specific target logic. (eg. batching by dynamic headers, if configured) + // pre and post functions are intended for use only in the implementations of targets. + return func(messages []*models.Message, pre []BatchTransformationFunction, post []BatchTransformationFunction) models.BatchTransformationResult { + // make a batch to begin with + success := []models.MessageBatch{{OriginalMessages: messages}} + + // Because http will require specific functions to always go first and last, we provide these here + // Compiler gets confused if we don't rename. + functionsToRun := append(pre, tranformFunctions...) + functionsToRun = append(functionsToRun, post...) + + // If no transformations, just return a result + if len(functionsToRun) == 0 { + return models.BatchTransformationResult{Success: success} + } + + var invalid []*models.Message + var oversized []*models.Message + invalidList := make([]*models.Message, 0, len(messages)) + oversizedList := make([]*models.Message, 0, len(messages)) + // Run each transformation + for _, transformFunction := range functionsToRun { + // success is recomputed each time into a complete list of batches + success, invalid, oversized = transformFunction(success) + // Invalids are excluded each iteration so must be appended to a permanent list + invalidList = append(invalidList, invalid...) + + oversizedList = append(oversizedList, oversized...) + } + + return models.BatchTransformationResult{Success: success, Invalid: invalidList, Oversized: oversizedList} + } +} diff --git a/pkg/batchtransform/batchtransformconfig/batch_transform_config.go b/pkg/batchtransform/batchtransformconfig/batch_transform_config.go new file mode 100644 index 00000000..1173ed90 --- /dev/null +++ b/pkg/batchtransform/batchtransformconfig/batch_transform_config.go @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package batchtransformconfig + +import ( + "fmt" + + "github.com/snowplow/snowbridge/config" + "github.com/snowplow/snowbridge/pkg/batchtransform" +) + +// SupportedTransformations is a ConfigurationPair slice containing all the officially supported transformations. +var SupportedTransformations = []config.ConfigurationPair{ + // TODO: Add config implementations & put them here +} + +// GetBatchTransformations builds and returns transformationApplyFunction +// from the transformations configured. +func GetBatchTransformations(c *config.Config, supportedTransformations []config.ConfigurationPair) (batchtransform.BatchTransformationApplyFunction, error) { + funcs := make([]batchtransform.BatchTransformationFunction, 0) + + for _, transformation := range c.Data.BatchTransformations { + + useTransf := transformation.Use + decoderOpts := &config.DecoderOptions{ + Input: useTransf.Body, + } + + var component interface{} + var err error + for _, pair := range supportedTransformations { + if pair.Name == useTransf.Name { + plug := pair.Handle + component, err = c.CreateComponent(plug, decoderOpts) + if err != nil { + return nil, err + } + } + } + + f, ok := component.(batchtransform.BatchTransformationFunction) + if !ok { + return nil, fmt.Errorf("could not interpret transformation configuration for %q", useTransf.Name) + } + funcs = append(funcs, f) + } + + return batchtransform.NewBatchTransformation(funcs...), nil +} diff --git a/pkg/batchtransform/template.go b/pkg/batchtransform/template.go new file mode 100644 index 00000000..9a294eb0 --- /dev/null +++ b/pkg/batchtransform/template.go @@ -0,0 +1,74 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package batchtransform + +import ( + "bytes" + "encoding/json" + "text/template" + + "github.com/pkg/errors" + "github.com/snowplow/snowbridge/pkg/models" +) + +// TemplaterBatchTransformationFunction is a thing TODO add desc +func TemplaterBatchTransformationFunction(batches []models.MessageBatch) ([]models.MessageBatch, []*models.Message) { + + // This is just an outline implementation of a templater function, to help figure out the design of batch transforms in general + + // The templater would fit here along the following lines: + const templ = `{ + attributes: [ {{$first_1 := true}} + {{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}} + {{printf "%s" .attribute_data}}{{end}} + ], + events: [ {{$first_2 := true}} + {{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}} + {{printf "%s" .event_data}}{{end}} + ] + }` + + invalid := make([]*models.Message, 0) + safe := make([]*models.Message, 0) + + for _, b := range batches { + formatted := []map[string]json.RawMessage{} + for _, msg := range b.OriginalMessages { + // Use json.RawMessage to ensure templating format works (real implementation has a problem to figure out here) + var asMap map[string]json.RawMessage + + if err := json.Unmarshal(msg.Data, &asMap); err != nil { + msg.SetError(errors.Wrap(err, "templater error")) // TODO: Cleanup! + invalid = append(invalid, msg) + continue + } + + formatted = append(formatted, asMap) + } + var buf bytes.Buffer + + t := template.Must(template.New("example").Parse(templ)) + if err := t.Execute(&buf, formatted); err != nil { + for _, msg := range safe { + msg.SetError(errors.Wrap(err, "templater error")) // TODO: Cleanup! + invalid = append(invalid, msg) + } + return nil, invalid + } + + // Assign the templated request to the HTTPRequestBody field + b.BatchData = buf.Bytes() + + } + + return batches, invalid +} diff --git a/pkg/failure/snowplow.go b/pkg/failure/snowplow.go index 39f7d06c..b1d9bb61 100644 --- a/pkg/failure/snowplow.go +++ b/pkg/failure/snowplow.go @@ -79,7 +79,7 @@ func (d *SnowplowFailure) WriteInvalid(invalid []*models.Message) (*models.Targe transformed = append(transformed, tMsg) } - return d.target.Write(transformed) + return d.target.Write(transformed, nil) } // WriteOversized will handle the conversion of oversized messages into failure @@ -114,7 +114,7 @@ func (d *SnowplowFailure) WriteOversized(maximumAllowedSizeBytes int, oversized transformed = append(transformed, tMsg) } - return d.target.Write(transformed) + return d.target.Write(transformed, nil) } // Open manages opening the underlying target diff --git a/pkg/failure/snowplow_test.go b/pkg/failure/snowplow_test.go index d8ff5760..2bc832a4 100644 --- a/pkg/failure/snowplow_test.go +++ b/pkg/failure/snowplow_test.go @@ -17,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/models" "github.com/snowplow/snowbridge/pkg/testutil" ) @@ -27,7 +28,7 @@ type TestFailureTarget struct { onWrite func(messages []*models.Message) (*models.TargetWriteResult, error) } -func (t *TestFailureTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { +func (t *TestFailureTarget) Write(messages []*models.Message, btf batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) { return t.onWrite(messages) } diff --git a/pkg/models/batch_transformation.go b/pkg/models/batch_transformation.go new file mode 100644 index 00000000..e1dd1ada --- /dev/null +++ b/pkg/models/batch_transformation.go @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package models + +import "time" + +// MessageBatch houses batches of messages, for batch transformations to operate across +type MessageBatch struct { + OriginalMessages []*Message // Most targets will use the data from here, but where we have a http templating transformation, we would use this to ack batches of messages + BatchData []byte // Where we template http requests, we use this to define the body of the request + HTTPHeaders map[string]string // For dynamic headers feature + TimeRequestStarted time.Time + TimeRequestFinished time.Time +} + +// BatchTransformationResult houses the result of a batch transformation +type BatchTransformationResult struct { + Success []MessageBatch + Invalid []*Message + Oversized []*Message +} diff --git a/pkg/target/common.go b/pkg/target/common.go new file mode 100644 index 00000000..81330a5e --- /dev/null +++ b/pkg/target/common.go @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package target + +import ( + "github.com/snowplow/snowbridge/pkg/batchtransform" + "github.com/snowplow/snowbridge/pkg/models" +) + +// chunkBatcherWithConfig returns a batch transformation which incorporates models.GetChunkedMessages() into the batch transformation model. +// It is done this way in order to pass GetChunkedMessages its config within the confines of the BatchTransfomration design +func chunkBatcherWithConfig(chunkSize int, maxMessageByteSize int, maxChunkByteSize int) batchtransform.BatchTransformationFunction { + + // chunkBatcher is a batch transformation which incorporates models.GetChunkedMessages() into the batch transformation model, + // preserving the original logic and ownership of the function. + chunkBatcher := func(batchesIn []models.MessageBatch) ([]models.MessageBatch, []*models.Message, []*models.Message) { + oversizedOut := make([]*models.Message, 0) + chunkedBatches := make([]models.MessageBatch, 0) + + for _, batch := range batchesIn { + chunks, oversized := models.GetChunkedMessages(batch.OriginalMessages, chunkSize, maxMessageByteSize, maxChunkByteSize) + + oversizedOut = append(oversizedOut, oversized...) + + for _, chunk := range chunks { + asBatch := models.MessageBatch{ + OriginalMessages: chunk, + } + + chunkedBatches = append(chunkedBatches, asBatch) + } + + } + return chunkedBatches, nil, oversizedOut + } + + return chunkBatcher +} diff --git a/pkg/target/eventhub.go b/pkg/target/eventhub.go index c34ddf7b..42eed48f 100644 --- a/pkg/target/eventhub.go +++ b/pkg/target/eventhub.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/models" ) @@ -147,9 +148,10 @@ func AdaptEventHubTargetFunc(f func(c *EventHubConfig) (*EventHubTarget, error)) } } -func (eht *EventHubTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { +func (eht *EventHubTarget) Write(messages []*models.Message, batchTransformFunc batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) { eht.log.Debugf("Writing %d messages to stream ...", len(messages)) + // TODO: Replace this with the new Chunker Batch Transformation - should be a post function. chunks, oversized := models.GetChunkedMessages( messages, eht.chunkMessageLimit, // Max Chunk size (number of messages) diff --git a/pkg/target/http.go b/pkg/target/http.go index 7738f163..8c77a74f 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -24,6 +24,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/common" "github.com/snowplow/snowbridge/pkg/models" @@ -212,9 +213,77 @@ func AdaptHTTPTargetFunc(f func(c *HTTPTargetConfig) (*HTTPTarget, error)) HTTPT } } -func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { +// When we have dynamic headers, batching by header must necessarily run first. This is a http specific function, +// so defining it here and fixing it into the Write function avoids complexity in configuration +func (ht *HTTPTarget) groupByDynamicHeaders(batches []models.MessageBatch) ([]models.MessageBatch, []*models.Message, []*models.Message) { + if !ht.dynamicHeaders { + // If the feature is disabled just return + return batches, nil, nil + } + + // Make a map of stringified header values + headersFound := make(map[string]*models.MessageBatch) + + for _, batch := range batches { + // Group data by that index + for _, msg := range batch.OriginalMessages { + headerKey := fmt.Sprint(msg.HTTPHeaders) + if headersFound[headerKey] != nil { + // If a key already exists, just add this message + headersFound[headerKey].OriginalMessages = append(headersFound[headerKey].OriginalMessages, msg) + } else { + headersFound[headerKey] = &models.MessageBatch{ + OriginalMessages: []*models.Message{msg}, + HTTPHeaders: ht.retrieveHeaders(msg), + } + } + } + } + + outBatches := []models.MessageBatch{} + for _, batch := range headersFound { + outBatches = append(outBatches, *batch) + } + + return outBatches, nil, nil +} + +// Where no transformation function provides a request body, we must provide one - this necessarily must happen last. +// This is a http specific function so we define it here to avoid scope for misconfiguration +func (ht *HTTPTarget) provideRequestBody(batches []models.MessageBatch) ([]models.MessageBatch, []*models.Message, []*models.Message) { + + // TODO: Add test for when messagess are just strings & confirm that it all works + + // TODO: Note: This would mean that the GTM client gets arrays of single events instead of single events. + // But we could configure an explicit templater to change that if we wanted + // We should test to be certain that it's still compatible. + invalid := make([]*models.Message, 0) + for _, batch := range batches { + if batch.BatchData != nil { + // TODO: Check if it is nil or zero & adapt accordingly + continue + } + requestData := []string{} + for _, msg := range batch.OriginalMessages { + requestData = append(requestData, string(msg.Data)) + } + // TODO: Add tests to be sure this produces the desired request + requestBody, err := json.Marshal(requestData) + if err != nil { + // TODO: Handle errors here + fmt.Println(err) + } + batch.BatchData = requestBody + } + + return batches, invalid, nil +} + +func (ht *HTTPTarget) Write(messages []*models.Message, batchTransformFunc batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) { ht.log.Debugf("Writing %d messages to endpoint ...", len(messages)) + // TODO: We are changing this target from always operating on single events to now handling batches + // this should therefore be replaced by the new chunking Batch Transformation. safeMessages, oversized := models.FilterOversizedMessages( messages, ht.MaximumAllowedMessageSizeBytes(), @@ -225,44 +294,55 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu var sent []*models.Message var errResult error - for _, msg := range safeMessages { - request, err := http.NewRequest("POST", ht.httpURL, bytes.NewBuffer(msg.Data)) + // Run the transformations + // We provide a 'pre' function to group by Dynamic headers (if enabled) - this must necessarily happen first. + // We also provide a 'post' function to create a message Body if none is provided via templater - this must happen last. + batchTransformRes := batchTransformFunc(safeMessages, []batchtransform.BatchTransformationFunction{ht.groupByDynamicHeaders}, []batchtransform.BatchTransformationFunction{ht.provideRequestBody}) + + invalid = append(invalid, batchTransformRes.Invalid...) + + for _, batch := range batchTransformRes.Success { + request, err := http.NewRequest("POST", ht.httpURL, bytes.NewBuffer(batch.BatchData)) if err != nil { errResult = multierror.Append(errResult, errors.Wrap(err, "Error creating request")) - failed = append(failed, msg) + failed = append(failed, batch.OriginalMessages...) continue } - request.Header.Add("Content-Type", ht.contentType) // Add content type - addHeadersToRequest(request, ht.headers, ht.retrieveHeaders(msg)) // Add headers if there are any - if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set + request.Header.Add("Content-Type", ht.contentType) // Add content type + addHeadersToRequest(request, ht.headers, batch.HTTPHeaders) // Add headers + if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword) } requestStarted := time.Now() resp, err := ht.client.Do(request) // Make request requestFinished := time.Now() - msg.TimeRequestStarted = requestStarted - msg.TimeRequestFinished = requestFinished - if err != nil { errResult = multierror.Append(errResult, err) - failed = append(failed, msg) + // TODO: This means that errored requests won't have request times attached. + // Can iterate to add or rethink how it works. + failed = append(failed, batch.OriginalMessages...) continue } defer resp.Body.Close() if resp.StatusCode >= 200 && resp.StatusCode < 300 { - sent = append(sent, msg) - if msg.AckFunc != nil { // Ack successful messages - msg.AckFunc() + for _, msg := range batch.OriginalMessages { + msg.TimeRequestStarted = requestStarted + msg.TimeRequestFinished = requestFinished + sent = append(sent, msg) + if msg.AckFunc != nil { // Ack successful messages + msg.AckFunc() + } } + } else { errResult = multierror.Append(errResult, errors.New("Got response status: "+resp.Status)) - failed = append(failed, msg) + failed = append(failed, batch.OriginalMessages...) continue } } if errResult != nil { - errResult = errors.Wrap(errResult, "Error sending http requests") + errResult = errors.Wrap(errResult, "Error sending http request(s)") } ht.log.Debugf("Successfully wrote %d/%d messages", len(sent), len(messages)) @@ -291,6 +371,7 @@ func (ht *HTTPTarget) GetID() string { return ht.httpURL } +// TODO: Can prob remove func (ht *HTTPTarget) retrieveHeaders(msg *models.Message) map[string]string { if !ht.dynamicHeaders { return nil diff --git a/pkg/target/http_test.go b/pkg/target/http_test.go index c4a83b87..5989ab37 100644 --- a/pkg/target/http_test.go +++ b/pkg/target/http_test.go @@ -12,20 +12,17 @@ package target import ( - "bytes" - "encoding/json" + "fmt" "io" "net/http" "net/http/httptest" - "reflect" "sync" "sync/atomic" "testing" - "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" - "github.com/snowplow/snowbridge/pkg/models" + "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/testutil" ) @@ -49,6 +46,8 @@ func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Se return createTestServerWithResponseCode(results, waitgroup, 200) } +/* + func TestGetHeaders(t *testing.T) { assert := assert.New(t) valid1 := `{"Max Forwards": "10", "Accept-Language": "en-US", "Accept-Datetime": "Thu, 31 May 2007 20:35:00 GMT"}` @@ -326,6 +325,8 @@ func TestNewHTTPTarget(t *testing.T) { assert.Nil(failedHTTPTarget2) } +*/ + func TestHttpWrite_Simple(t *testing.T) { testCases := []struct { Name string @@ -356,8 +357,10 @@ func TestHttpWrite_Simple(t *testing.T) { } messages := testutil.GetTestMessages(501, "Hello Server!!", ackFunc) - wg.Add(501) - writeResult, err1 := target.Write(messages) + wg.Add(1) + + batchTransformFunc := batchtransform.NewBatchTransformation() + writeResult, err1 := target.Write(messages, batchTransformFunc) wg.Wait() @@ -368,11 +371,16 @@ func TestHttpWrite_Simple(t *testing.T) { assert.Equal("Hello Server!!", string(result)) } + fmt.Println(writeResult.Invalid) + fmt.Println(results) + assert.Equal(int64(501), ackOps) }) } } +/* + func TestHttpWrite_Concurrent(t *testing.T) { assert := assert.New(t) @@ -678,3 +686,4 @@ func getNgrokAddress() string { } panic("no ngrok https endpoint found") } +*/ diff --git a/pkg/target/kafka.go b/pkg/target/kafka.go index 4e1f60ab..17cbb112 100644 --- a/pkg/target/kafka.go +++ b/pkg/target/kafka.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/common" "github.com/snowplow/snowbridge/pkg/models" ) @@ -205,72 +206,83 @@ func AdaptKafkaTargetFunc(f func(c *KafkaConfig) (*KafkaTarget, error)) KafkaTar } // Write pushes all messages to the required target -func (kt *KafkaTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { +func (kt *KafkaTarget) Write(messages []*models.Message, batchTransformFunc batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) { kt.log.Debugf("Writing %d messages to topic ...", len(messages)) + // TODO: This can just be implemented safeMessages, oversized := models.FilterOversizedMessages( messages, kt.MaximumAllowedMessageSizeBytes(), ) + var invalid []*models.Message var sent []*models.Message var failed []*models.Message var errResult error - if kt.asyncProducer != nil { - // Not adding request latency metric to async producer for now, since it would complicate the implementation, and delay our debug. - for _, msg := range safeMessages { - kt.asyncProducer.Input() <- &sarama.ProducerMessage{ - Topic: kt.topicName, - Key: sarama.StringEncoder(msg.PartitionKey), - Value: sarama.ByteEncoder(msg.Data), - Metadata: msg, - } - } + // Run the transformations + batchTransformRes := batchTransformFunc(safeMessages, nil, nil) - for i := 0; i < len(safeMessages); i++ { + invalid = append(invalid, batchTransformRes.Invalid...) - result := <-kt.asyncResults // Block until result is returned + for _, batch := range batchTransformRes.Success { - if result.Err != nil { - errResult = multierror.Append(errResult, result.Err) - originalMessage := result.Msg.Metadata.(*models.Message) - originalMessage.SetError(result.Err) - failed = append(failed, originalMessage) - } else { - originalMessage := result.Msg.Metadata.(*models.Message) - if originalMessage.AckFunc != nil { - originalMessage.AckFunc() + // TODO: This is too much nesting. Refactor for readability + if kt.asyncProducer != nil { + // Not adding request latency metric to async producer for now, since it would complicate the implementation, and delay our debug. + for _, msg := range batch.OriginalMessages { + kt.asyncProducer.Input() <- &sarama.ProducerMessage{ + Topic: kt.topicName, + Key: sarama.StringEncoder(msg.PartitionKey), + Value: sarama.ByteEncoder(msg.Data), + Metadata: msg, } - sent = append(sent, originalMessage) } - } - } else if kt.syncProducer != nil { - for _, msg := range safeMessages { - requestStarted := time.Now() - _, _, err := kt.syncProducer.SendMessage(&sarama.ProducerMessage{ - Topic: kt.topicName, - Key: sarama.StringEncoder(msg.PartitionKey), - Value: sarama.ByteEncoder(msg.Data), - }) - requestFinished := time.Now() - - msg.TimeRequestStarted = requestStarted - msg.TimeRequestFinished = requestFinished - - if err != nil { - errResult = multierror.Append(errResult, err) - msg.SetError(err) - failed = append(failed, msg) - } else { - if msg.AckFunc != nil { - msg.AckFunc() + + for i := 0; i < len(batch.OriginalMessages); i++ { + + result := <-kt.asyncResults // Block until result is returned + + if result.Err != nil { + errResult = multierror.Append(errResult, result.Err) + originalMessage := result.Msg.Metadata.(*models.Message) + originalMessage.SetError(result.Err) + failed = append(failed, originalMessage) + } else { + originalMessage := result.Msg.Metadata.(*models.Message) + if originalMessage.AckFunc != nil { + originalMessage.AckFunc() + } + sent = append(sent, originalMessage) + } + } + } else if kt.syncProducer != nil { + for _, msg := range batch.OriginalMessages { + requestStarted := time.Now() + _, _, err := kt.syncProducer.SendMessage(&sarama.ProducerMessage{ + Topic: kt.topicName, + Key: sarama.StringEncoder(msg.PartitionKey), + Value: sarama.ByteEncoder(msg.Data), + }) + requestFinished := time.Now() + + msg.TimeRequestStarted = requestStarted + msg.TimeRequestFinished = requestFinished + + if err != nil { + errResult = multierror.Append(errResult, err) + msg.SetError(err) + failed = append(failed, msg) + } else { + if msg.AckFunc != nil { + msg.AckFunc() + } + sent = append(sent, msg) } - sent = append(sent, msg) } + } else { + errResult = multierror.Append(errResult, fmt.Errorf("no producer has been configured")) } - } else { - errResult = multierror.Append(errResult, fmt.Errorf("no producer has been configured")) } if errResult != nil { @@ -282,7 +294,7 @@ func (kt *KafkaTarget) Write(messages []*models.Message) (*models.TargetWriteRes sent, failed, oversized, - nil, + invalid, ), errResult } diff --git a/pkg/target/kafka_test.go b/pkg/target/kafka_test.go index a60c089b..78b88523 100644 --- a/pkg/target/kafka_test.go +++ b/pkg/target/kafka_test.go @@ -19,6 +19,7 @@ import ( "github.com/IBM/sarama/mocks" log "github.com/sirupsen/logrus" + "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/testutil" "github.com/stretchr/testify/assert" ) @@ -79,7 +80,9 @@ func TestKafkaTarget_AsyncWriteFailure(t *testing.T) { messages := testutil.GetTestMessages(1, "Hello Kafka!!", nil) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.NotNil(err) if err != nil { assert.Equal("Error writing messages to Kafka topic: : 1 error occurred:\n\t* kafka: client has run out of available brokers to talk to\n\n", err.Error()) @@ -110,7 +113,9 @@ func TestKafkaTarget_AsyncWriteSuccess(t *testing.T) { messages := testutil.GetTestMessages(501, "Hello Kafka!!", ackFunc) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.Nil(err) assert.NotNil(writeRes) @@ -134,7 +139,9 @@ func TestKafkaTarget_SyncWriteFailure(t *testing.T) { messages := testutil.GetTestMessages(1, "Hello Kafka!!", nil) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.NotNil(err) if err != nil { assert.Equal("Error writing messages to Kafka topic: : 1 error occurred:\n\t* kafka: client has run out of available brokers to talk to\n\n", err.Error()) @@ -165,7 +172,9 @@ func TestKafkaTarget_SyncWriteSuccess(t *testing.T) { messages := testutil.GetTestMessages(501, "Hello Kafka!!", ackFunc) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.Nil(err) assert.NotNil(writeRes) @@ -197,7 +206,9 @@ func TestKafkaTarget_WriteSuccess_OversizeBatch(t *testing.T) { messages := testutil.GetTestMessages(10, "Hello Kafka!!", ackFunc) messages = append(messages, testutil.GetTestMessages(10, testutil.GenRandomString(1048576), ackFunc)...) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.Nil(err) assert.NotNil(writeRes) @@ -229,7 +240,9 @@ func TestKafkaTarget_WriteSuccess_OversizeRecord(t *testing.T) { messages := testutil.GetTestMessages(10, "Hello Kafka!!", ackFunc) messages = append(messages, testutil.GetTestMessages(1, testutil.GenRandomString(1048577), ackFunc)...) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.Nil(err) assert.NotNil(writeRes) diff --git a/pkg/target/kinesis.go b/pkg/target/kinesis.go index cd7fdfdc..70e7aeeb 100644 --- a/pkg/target/kinesis.go +++ b/pkg/target/kinesis.go @@ -127,6 +127,7 @@ func AdaptKinesisTargetFunc(f func(c *KinesisTargetConfig) (*KinesisTarget, erro func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { kt.log.Debugf("Writing %d messages to stream ...", len(messages)) + // TODO: Replace with new batch transformation chunks, oversized := models.GetChunkedMessages( messages, kt.requestMaxMessages, diff --git a/pkg/target/oauth2_test.go b/pkg/target/oauth2_test.go index 21b514f5..8371a35a 100644 --- a/pkg/target/oauth2_test.go +++ b/pkg/target/oauth2_test.go @@ -116,7 +116,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input target := oauth2Target(t, server.URL, inputClientID, inputClientSecret, inputRefreshToken, tokenServer.URL) message := testutil.GetTestMessages(1, "Hello Server!!", func() {}) - return target.Write(message) + return target.Write(message, nil) } func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget { diff --git a/pkg/target/pubsub.go b/pkg/target/pubsub.go index c86e9da3..28a625cb 100644 --- a/pkg/target/pubsub.go +++ b/pkg/target/pubsub.go @@ -125,6 +125,7 @@ func (ps *PubSubTarget) Write(messages []*models.Message) (*models.TargetWriteRe var results []*pubSubPublishResult + // TODO: Refactor for Batch transformation model safeMessages, oversized := models.FilterOversizedMessages( messages, ps.MaximumAllowedMessageSizeBytes(), diff --git a/pkg/target/targetiface/target.go b/pkg/target/targetiface/target.go index 405adc32..a2bc73ac 100644 --- a/pkg/target/targetiface/target.go +++ b/pkg/target/targetiface/target.go @@ -12,12 +12,13 @@ package targetiface import ( + "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/models" ) // Target describes the interface for how to push the data pulled from the source type Target interface { - Write(messages []*models.Message) (*models.TargetWriteResult, error) + Write(messages []*models.Message, batchTransformFunc batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) Open() Close() MaximumAllowedMessageSizeBytes() int From 3c31b74b65ddbd6d24fa3b4a40ff7ab5910e2229 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Wed, 19 Jun 2024 16:13:01 +0100 Subject: [PATCH 2/4] Josh's pointer to use pointers --- pkg/batchtransform/batch_transform.go | 12 ++++++------ pkg/models/batch_transformation.go | 2 +- pkg/target/common.go | 6 +++--- pkg/target/http.go | 13 ++++++++----- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/pkg/batchtransform/batch_transform.go b/pkg/batchtransform/batch_transform.go index a9d2c9fa..3459050a 100644 --- a/pkg/batchtransform/batch_transform.go +++ b/pkg/batchtransform/batch_transform.go @@ -15,10 +15,10 @@ import "github.com/snowplow/snowbridge/pkg/models" // BatchTransformationFunction is a transformation function which operates across a batch of events // It takes a batch as an input, and returns a successful batch and a slice of invalid messages -type BatchTransformationFunction func([]models.MessageBatch) (success []models.MessageBatch, invalid []*models.Message, oversized []*models.Message) +type BatchTransformationFunction func([]*models.MessageBatch) (success []*models.MessageBatch, invalid []*models.Message, oversized []*models.Message) // BatchTransformationApplyFunction combines batch into one callable function -type BatchTransformationApplyFunction func([]*models.Message, []BatchTransformationFunction, []BatchTransformationFunction) models.BatchTransformationResult +type BatchTransformationApplyFunction func([]*models.Message, []BatchTransformationFunction, []BatchTransformationFunction) *models.BatchTransformationResult // BatchTransformationGenerator returns a BatchTransformationApplyFunction from a provided set of BatchTransformationFunctions type BatchTransformationGenerator func(...BatchTransformationFunction) BatchTransformationApplyFunction @@ -28,9 +28,9 @@ func NewBatchTransformation(tranformFunctions ...BatchTransformationFunction) Ba // pre is a function to be run before the configured ones, post is to be run after. // This is done because sometimes functions need to _always_ run first or last, depending on the specific target logic. (eg. batching by dynamic headers, if configured) // pre and post functions are intended for use only in the implementations of targets. - return func(messages []*models.Message, pre []BatchTransformationFunction, post []BatchTransformationFunction) models.BatchTransformationResult { + return func(messages []*models.Message, pre []BatchTransformationFunction, post []BatchTransformationFunction) *models.BatchTransformationResult { // make a batch to begin with - success := []models.MessageBatch{{OriginalMessages: messages}} + success := []*models.MessageBatch{{OriginalMessages: messages}} // Because http will require specific functions to always go first and last, we provide these here // Compiler gets confused if we don't rename. @@ -39,7 +39,7 @@ func NewBatchTransformation(tranformFunctions ...BatchTransformationFunction) Ba // If no transformations, just return a result if len(functionsToRun) == 0 { - return models.BatchTransformationResult{Success: success} + return &models.BatchTransformationResult{Success: success} } var invalid []*models.Message @@ -56,6 +56,6 @@ func NewBatchTransformation(tranformFunctions ...BatchTransformationFunction) Ba oversizedList = append(oversizedList, oversized...) } - return models.BatchTransformationResult{Success: success, Invalid: invalidList, Oversized: oversizedList} + return &models.BatchTransformationResult{Success: success, Invalid: invalidList, Oversized: oversizedList} } } diff --git a/pkg/models/batch_transformation.go b/pkg/models/batch_transformation.go index e1dd1ada..eae4193d 100644 --- a/pkg/models/batch_transformation.go +++ b/pkg/models/batch_transformation.go @@ -24,7 +24,7 @@ type MessageBatch struct { // BatchTransformationResult houses the result of a batch transformation type BatchTransformationResult struct { - Success []MessageBatch + Success []*MessageBatch Invalid []*Message Oversized []*Message } diff --git a/pkg/target/common.go b/pkg/target/common.go index 81330a5e..223d2359 100644 --- a/pkg/target/common.go +++ b/pkg/target/common.go @@ -22,9 +22,9 @@ func chunkBatcherWithConfig(chunkSize int, maxMessageByteSize int, maxChunkByteS // chunkBatcher is a batch transformation which incorporates models.GetChunkedMessages() into the batch transformation model, // preserving the original logic and ownership of the function. - chunkBatcher := func(batchesIn []models.MessageBatch) ([]models.MessageBatch, []*models.Message, []*models.Message) { + chunkBatcher := func(batchesIn []*models.MessageBatch) ([]*models.MessageBatch, []*models.Message, []*models.Message) { oversizedOut := make([]*models.Message, 0) - chunkedBatches := make([]models.MessageBatch, 0) + chunkedBatches := make([]*models.MessageBatch, 0) for _, batch := range batchesIn { chunks, oversized := models.GetChunkedMessages(batch.OriginalMessages, chunkSize, maxMessageByteSize, maxChunkByteSize) @@ -32,7 +32,7 @@ func chunkBatcherWithConfig(chunkSize int, maxMessageByteSize int, maxChunkByteS oversizedOut = append(oversizedOut, oversized...) for _, chunk := range chunks { - asBatch := models.MessageBatch{ + asBatch := &models.MessageBatch{ OriginalMessages: chunk, } diff --git a/pkg/target/http.go b/pkg/target/http.go index 8c77a74f..8c82d863 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -215,7 +215,7 @@ func AdaptHTTPTargetFunc(f func(c *HTTPTargetConfig) (*HTTPTarget, error)) HTTPT // When we have dynamic headers, batching by header must necessarily run first. This is a http specific function, // so defining it here and fixing it into the Write function avoids complexity in configuration -func (ht *HTTPTarget) groupByDynamicHeaders(batches []models.MessageBatch) ([]models.MessageBatch, []*models.Message, []*models.Message) { +func (ht *HTTPTarget) groupByDynamicHeaders(batches []*models.MessageBatch) ([]*models.MessageBatch, []*models.Message, []*models.Message) { if !ht.dynamicHeaders { // If the feature is disabled just return return batches, nil, nil @@ -240,9 +240,9 @@ func (ht *HTTPTarget) groupByDynamicHeaders(batches []models.MessageBatch) ([]mo } } - outBatches := []models.MessageBatch{} + outBatches := []*models.MessageBatch{} for _, batch := range headersFound { - outBatches = append(outBatches, *batch) + outBatches = append(outBatches, batch) } return outBatches, nil, nil @@ -250,7 +250,7 @@ func (ht *HTTPTarget) groupByDynamicHeaders(batches []models.MessageBatch) ([]mo // Where no transformation function provides a request body, we must provide one - this necessarily must happen last. // This is a http specific function so we define it here to avoid scope for misconfiguration -func (ht *HTTPTarget) provideRequestBody(batches []models.MessageBatch) ([]models.MessageBatch, []*models.Message, []*models.Message) { +func (ht *HTTPTarget) provideRequestBody(batches []*models.MessageBatch) ([]*models.MessageBatch, []*models.Message, []*models.Message) { // TODO: Add test for when messagess are just strings & confirm that it all works @@ -297,7 +297,10 @@ func (ht *HTTPTarget) Write(messages []*models.Message, batchTransformFunc batch // Run the transformations // We provide a 'pre' function to group by Dynamic headers (if enabled) - this must necessarily happen first. // We also provide a 'post' function to create a message Body if none is provided via templater - this must happen last. - batchTransformRes := batchTransformFunc(safeMessages, []batchtransform.BatchTransformationFunction{ht.groupByDynamicHeaders}, []batchtransform.BatchTransformationFunction{ht.provideRequestBody}) + batchTransformRes := batchTransformFunc( + safeMessages, + []batchtransform.BatchTransformationFunction{ht.groupByDynamicHeaders}, // runs first + []batchtransform.BatchTransformationFunction{ht.provideRequestBody}) // runs last invalid = append(invalid, batchTransformRes.Invalid...) From 869441b4c3498c40adfc7fbf565c47e4adbf7440 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Wed, 19 Jun 2024 16:18:25 +0100 Subject: [PATCH 3/4] Move existing transforms to 'single' directory --- cmd/aws/cli/main.go | 2 +- cmd/cli/cli.go | 4 ++-- cmd/main/cli/main.go | 2 +- docs/configuration_transformations_docs_test.go | 8 ++++---- pkg/transform/{ => single}/base64Decode.go | 0 pkg/transform/{ => single}/base64Encode.go | 0 pkg/transform/{ => single}/base64_test.go | 0 pkg/transform/{ => single}/engine/engine.go | 2 +- pkg/transform/{ => single}/engine/engine_javascript.go | 2 +- .../{ => single}/engine/engine_javascript_test.go | 0 pkg/transform/{ => single}/engine/engine_lua.go | 2 +- pkg/transform/{ => single}/engine/engine_lua_test.go | 2 +- .../{ => single}/engine/engine_test_variables.go | 0 .../filter/snowplow_enriched_filter_atomic.go | 2 +- .../filter/snowplow_enriched_filter_atomic_test.go | 2 +- .../filter/snowplow_enriched_filter_common.go | 2 +- .../filter/snowplow_enriched_filter_common_test.go | 2 +- .../filter/snowplow_enriched_filter_context.go | 2 +- .../filter/snowplow_enriched_filter_context_test.go | 2 +- .../filter/snowplow_enriched_filter_unstruct.go | 2 +- .../filter/snowplow_enriched_filter_unstruct_test.go | 2 +- pkg/transform/{ => single}/setup_test.go | 0 pkg/transform/{ => single}/snowplow_enriched_set_pk.go | 0 .../{ => single}/snowplow_enriched_set_pk_test.go | 0 pkg/transform/{ => single}/snowplow_enriched_to_json.go | 0 .../{ => single}/snowplow_enriched_to_json_test.go | 0 pkg/transform/{ => single}/snowplow_enriched_util.go | 0 pkg/transform/{ => single}/snowplow_enriched_util_test.go | 0 pkg/transform/{ => single}/snowplow_gtmss_preview.go | 0 pkg/transform/{ => single}/snowplow_gtmss_preview_test.go | 0 pkg/transform/{ => single}/transform.go | 0 pkg/transform/{ => single}/transform_test.go | 0 pkg/transform/{ => single}/transform_test_variables.go | 0 .../{ => single}/transformconfig/transform_config.go | 6 +++--- .../{ => single}/transformconfig/transform_config_test.go | 0 35 files changed, 23 insertions(+), 23 deletions(-) rename pkg/transform/{ => single}/base64Decode.go (100%) rename pkg/transform/{ => single}/base64Encode.go (100%) rename pkg/transform/{ => single}/base64_test.go (100%) rename pkg/transform/{ => single}/engine/engine.go (95%) rename pkg/transform/{ => single}/engine/engine_javascript.go (99%) rename pkg/transform/{ => single}/engine/engine_javascript_test.go (100%) rename pkg/transform/{ => single}/engine/engine_lua.go (99%) rename pkg/transform/{ => single}/engine/engine_lua_test.go (99%) rename pkg/transform/{ => single}/engine/engine_test_variables.go (100%) rename pkg/transform/{ => single}/filter/snowplow_enriched_filter_atomic.go (98%) rename pkg/transform/{ => single}/filter/snowplow_enriched_filter_atomic_test.go (98%) rename pkg/transform/{ => single}/filter/snowplow_enriched_filter_common.go (98%) rename pkg/transform/{ => single}/filter/snowplow_enriched_filter_common_test.go (98%) rename pkg/transform/{ => single}/filter/snowplow_enriched_filter_context.go (98%) rename pkg/transform/{ => single}/filter/snowplow_enriched_filter_context_test.go (98%) rename pkg/transform/{ => single}/filter/snowplow_enriched_filter_unstruct.go (98%) rename pkg/transform/{ => single}/filter/snowplow_enriched_filter_unstruct_test.go (98%) rename pkg/transform/{ => single}/setup_test.go (100%) rename pkg/transform/{ => single}/snowplow_enriched_set_pk.go (100%) rename pkg/transform/{ => single}/snowplow_enriched_set_pk_test.go (100%) rename pkg/transform/{ => single}/snowplow_enriched_to_json.go (100%) rename pkg/transform/{ => single}/snowplow_enriched_to_json_test.go (100%) rename pkg/transform/{ => single}/snowplow_enriched_util.go (100%) rename pkg/transform/{ => single}/snowplow_enriched_util_test.go (100%) rename pkg/transform/{ => single}/snowplow_gtmss_preview.go (100%) rename pkg/transform/{ => single}/snowplow_gtmss_preview_test.go (100%) rename pkg/transform/{ => single}/transform.go (100%) rename pkg/transform/{ => single}/transform_test.go (100%) rename pkg/transform/{ => single}/transform_test_variables.go (100%) rename pkg/transform/{ => single}/transformconfig/transform_config.go (91%) rename pkg/transform/{ => single}/transformconfig/transform_config_test.go (100%) diff --git a/cmd/aws/cli/main.go b/cmd/aws/cli/main.go index 5947677b..e49600c3 100644 --- a/cmd/aws/cli/main.go +++ b/cmd/aws/cli/main.go @@ -20,7 +20,7 @@ import ( pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub" sqssource "github.com/snowplow/snowbridge/pkg/source/sqs" stdinsource "github.com/snowplow/snowbridge/pkg/source/stdin" - "github.com/snowplow/snowbridge/pkg/transform/transformconfig" + "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" ) func main() { diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go index 9108e955..aa80fae2 100644 --- a/cmd/cli/cli.go +++ b/cmd/cli/cli.go @@ -37,8 +37,8 @@ import ( "github.com/snowplow/snowbridge/pkg/source/sourceiface" "github.com/snowplow/snowbridge/pkg/target/targetiface" "github.com/snowplow/snowbridge/pkg/telemetry" - "github.com/snowplow/snowbridge/pkg/transform" - "github.com/snowplow/snowbridge/pkg/transform/transformconfig" + transform "github.com/snowplow/snowbridge/pkg/transform/single" + "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" ) const ( diff --git a/cmd/main/cli/main.go b/cmd/main/cli/main.go index 21068085..84d285ac 100644 --- a/cmd/main/cli/main.go +++ b/cmd/main/cli/main.go @@ -19,7 +19,7 @@ import ( pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub" sqssource "github.com/snowplow/snowbridge/pkg/source/sqs" stdinsource "github.com/snowplow/snowbridge/pkg/source/stdin" - "github.com/snowplow/snowbridge/pkg/transform/transformconfig" + "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" ) func main() { diff --git a/docs/configuration_transformations_docs_test.go b/docs/configuration_transformations_docs_test.go index 01ca9bd1..211a789c 100644 --- a/docs/configuration_transformations_docs_test.go +++ b/docs/configuration_transformations_docs_test.go @@ -21,10 +21,10 @@ import ( "github.com/hashicorp/hcl/v2/gohcl" "github.com/snowplow/snowbridge/assets" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" - "github.com/snowplow/snowbridge/pkg/transform/engine" - "github.com/snowplow/snowbridge/pkg/transform/filter" - "github.com/snowplow/snowbridge/pkg/transform/transformconfig" + transform "github.com/snowplow/snowbridge/pkg/transform/single" + "github.com/snowplow/snowbridge/pkg/transform/single/engine" + "github.com/snowplow/snowbridge/pkg/transform/single/filter" + "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" "github.com/stretchr/testify/assert" ) diff --git a/pkg/transform/base64Decode.go b/pkg/transform/single/base64Decode.go similarity index 100% rename from pkg/transform/base64Decode.go rename to pkg/transform/single/base64Decode.go diff --git a/pkg/transform/base64Encode.go b/pkg/transform/single/base64Encode.go similarity index 100% rename from pkg/transform/base64Encode.go rename to pkg/transform/single/base64Encode.go diff --git a/pkg/transform/base64_test.go b/pkg/transform/single/base64_test.go similarity index 100% rename from pkg/transform/base64_test.go rename to pkg/transform/single/base64_test.go diff --git a/pkg/transform/engine/engine.go b/pkg/transform/single/engine/engine.go similarity index 95% rename from pkg/transform/engine/engine.go rename to pkg/transform/single/engine/engine.go index 83b42dac..abf6cd88 100644 --- a/pkg/transform/engine/engine.go +++ b/pkg/transform/single/engine/engine.go @@ -13,7 +13,7 @@ package engine import ( jsoniter "github.com/json-iterator/go" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) // make a jsoniter instance that won't escape html diff --git a/pkg/transform/engine/engine_javascript.go b/pkg/transform/single/engine/engine_javascript.go similarity index 99% rename from pkg/transform/engine/engine_javascript.go rename to pkg/transform/single/engine/engine_javascript.go index 62b94fcf..387855a3 100644 --- a/pkg/transform/engine/engine_javascript.go +++ b/pkg/transform/single/engine/engine_javascript.go @@ -24,7 +24,7 @@ import ( "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) // JSEngineConfig configures the JavaScript Engine. diff --git a/pkg/transform/engine/engine_javascript_test.go b/pkg/transform/single/engine/engine_javascript_test.go similarity index 100% rename from pkg/transform/engine/engine_javascript_test.go rename to pkg/transform/single/engine/engine_javascript_test.go diff --git a/pkg/transform/engine/engine_lua.go b/pkg/transform/single/engine/engine_lua.go similarity index 99% rename from pkg/transform/engine/engine_lua.go rename to pkg/transform/single/engine/engine_lua.go index 810e3253..8bb906f3 100644 --- a/pkg/transform/engine/engine_lua.go +++ b/pkg/transform/single/engine/engine_lua.go @@ -26,7 +26,7 @@ import ( "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) // LuaEngineConfig configures the Lua Engine. diff --git a/pkg/transform/engine/engine_lua_test.go b/pkg/transform/single/engine/engine_lua_test.go similarity index 99% rename from pkg/transform/engine/engine_lua_test.go rename to pkg/transform/single/engine/engine_lua_test.go index 6979c1c2..e09bbdf3 100644 --- a/pkg/transform/engine/engine_lua_test.go +++ b/pkg/transform/single/engine/engine_lua_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) type LuaTestCase struct { diff --git a/pkg/transform/engine/engine_test_variables.go b/pkg/transform/single/engine/engine_test_variables.go similarity index 100% rename from pkg/transform/engine/engine_test_variables.go rename to pkg/transform/single/engine/engine_test_variables.go diff --git a/pkg/transform/filter/snowplow_enriched_filter_atomic.go b/pkg/transform/single/filter/snowplow_enriched_filter_atomic.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_atomic.go rename to pkg/transform/single/filter/snowplow_enriched_filter_atomic.go index 769ec7e8..115041c2 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_atomic.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_atomic.go @@ -14,7 +14,7 @@ package filter import ( "github.com/pkg/errors" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" ) diff --git a/pkg/transform/filter/snowplow_enriched_filter_atomic_test.go b/pkg/transform/single/filter/snowplow_enriched_filter_atomic_test.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_atomic_test.go rename to pkg/transform/single/filter/snowplow_enriched_filter_atomic_test.go index ca2edab6..3c801071 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_atomic_test.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_atomic_test.go @@ -18,7 +18,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) func TestMakeBaseValueGetter(t *testing.T) { diff --git a/pkg/transform/filter/snowplow_enriched_filter_common.go b/pkg/transform/single/filter/snowplow_enriched_filter_common.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_common.go rename to pkg/transform/single/filter/snowplow_enriched_filter_common.go index 9f43e158..6c79df70 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_common.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_common.go @@ -21,7 +21,7 @@ import ( "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) // evaluateSpEnrichedfilter takes a regex and a slice of values, and returns whether or not a value has been matched diff --git a/pkg/transform/filter/snowplow_enriched_filter_common_test.go b/pkg/transform/single/filter/snowplow_enriched_filter_common_test.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_common_test.go rename to pkg/transform/single/filter/snowplow_enriched_filter_common_test.go index 2cda07a2..27e8f322 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_common_test.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_common_test.go @@ -19,7 +19,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) var messageGood = models.Message{ diff --git a/pkg/transform/filter/snowplow_enriched_filter_context.go b/pkg/transform/single/filter/snowplow_enriched_filter_context.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_context.go rename to pkg/transform/single/filter/snowplow_enriched_filter_context.go index 2e2c008c..7b095b56 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_context.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_context.go @@ -16,7 +16,7 @@ import ( "github.com/pkg/errors" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" ) diff --git a/pkg/transform/filter/snowplow_enriched_filter_context_test.go b/pkg/transform/single/filter/snowplow_enriched_filter_context_test.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_context_test.go rename to pkg/transform/single/filter/snowplow_enriched_filter_context_test.go index 384cec46..5c75c673 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_context_test.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_context_test.go @@ -16,7 +16,7 @@ import ( "testing" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/stretchr/testify/assert" ) diff --git a/pkg/transform/filter/snowplow_enriched_filter_unstruct.go b/pkg/transform/single/filter/snowplow_enriched_filter_unstruct.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_unstruct.go rename to pkg/transform/single/filter/snowplow_enriched_filter_unstruct.go index 35e714bf..459cdba4 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_unstruct.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_unstruct.go @@ -18,7 +18,7 @@ import ( "github.com/pkg/errors" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" ) diff --git a/pkg/transform/filter/snowplow_enriched_filter_unstruct_test.go b/pkg/transform/single/filter/snowplow_enriched_filter_unstruct_test.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_unstruct_test.go rename to pkg/transform/single/filter/snowplow_enriched_filter_unstruct_test.go index abec4af6..84a46746 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_unstruct_test.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_unstruct_test.go @@ -17,7 +17,7 @@ import ( "testing" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/stretchr/testify/assert" ) diff --git a/pkg/transform/setup_test.go b/pkg/transform/single/setup_test.go similarity index 100% rename from pkg/transform/setup_test.go rename to pkg/transform/single/setup_test.go diff --git a/pkg/transform/snowplow_enriched_set_pk.go b/pkg/transform/single/snowplow_enriched_set_pk.go similarity index 100% rename from pkg/transform/snowplow_enriched_set_pk.go rename to pkg/transform/single/snowplow_enriched_set_pk.go diff --git a/pkg/transform/snowplow_enriched_set_pk_test.go b/pkg/transform/single/snowplow_enriched_set_pk_test.go similarity index 100% rename from pkg/transform/snowplow_enriched_set_pk_test.go rename to pkg/transform/single/snowplow_enriched_set_pk_test.go diff --git a/pkg/transform/snowplow_enriched_to_json.go b/pkg/transform/single/snowplow_enriched_to_json.go similarity index 100% rename from pkg/transform/snowplow_enriched_to_json.go rename to pkg/transform/single/snowplow_enriched_to_json.go diff --git a/pkg/transform/snowplow_enriched_to_json_test.go b/pkg/transform/single/snowplow_enriched_to_json_test.go similarity index 100% rename from pkg/transform/snowplow_enriched_to_json_test.go rename to pkg/transform/single/snowplow_enriched_to_json_test.go diff --git a/pkg/transform/snowplow_enriched_util.go b/pkg/transform/single/snowplow_enriched_util.go similarity index 100% rename from pkg/transform/snowplow_enriched_util.go rename to pkg/transform/single/snowplow_enriched_util.go diff --git a/pkg/transform/snowplow_enriched_util_test.go b/pkg/transform/single/snowplow_enriched_util_test.go similarity index 100% rename from pkg/transform/snowplow_enriched_util_test.go rename to pkg/transform/single/snowplow_enriched_util_test.go diff --git a/pkg/transform/snowplow_gtmss_preview.go b/pkg/transform/single/snowplow_gtmss_preview.go similarity index 100% rename from pkg/transform/snowplow_gtmss_preview.go rename to pkg/transform/single/snowplow_gtmss_preview.go diff --git a/pkg/transform/snowplow_gtmss_preview_test.go b/pkg/transform/single/snowplow_gtmss_preview_test.go similarity index 100% rename from pkg/transform/snowplow_gtmss_preview_test.go rename to pkg/transform/single/snowplow_gtmss_preview_test.go diff --git a/pkg/transform/transform.go b/pkg/transform/single/transform.go similarity index 100% rename from pkg/transform/transform.go rename to pkg/transform/single/transform.go diff --git a/pkg/transform/transform_test.go b/pkg/transform/single/transform_test.go similarity index 100% rename from pkg/transform/transform_test.go rename to pkg/transform/single/transform_test.go diff --git a/pkg/transform/transform_test_variables.go b/pkg/transform/single/transform_test_variables.go similarity index 100% rename from pkg/transform/transform_test_variables.go rename to pkg/transform/single/transform_test_variables.go diff --git a/pkg/transform/transformconfig/transform_config.go b/pkg/transform/single/transformconfig/transform_config.go similarity index 91% rename from pkg/transform/transformconfig/transform_config.go rename to pkg/transform/single/transformconfig/transform_config.go index bff5cec9..dbc54a0a 100644 --- a/pkg/transform/transformconfig/transform_config.go +++ b/pkg/transform/single/transformconfig/transform_config.go @@ -15,9 +15,9 @@ import ( "fmt" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" - "github.com/snowplow/snowbridge/pkg/transform/engine" - "github.com/snowplow/snowbridge/pkg/transform/filter" + transform "github.com/snowplow/snowbridge/pkg/transform/single" + "github.com/snowplow/snowbridge/pkg/transform/single/engine" + "github.com/snowplow/snowbridge/pkg/transform/single/filter" ) // SupportedTransformations is a ConfigurationPair slice containing all the officially supported transformations. diff --git a/pkg/transform/transformconfig/transform_config_test.go b/pkg/transform/single/transformconfig/transform_config_test.go similarity index 100% rename from pkg/transform/transformconfig/transform_config_test.go rename to pkg/transform/single/transformconfig/transform_config_test.go From f46f39820669f275de014e94b9d066b0a9628b03 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Wed, 19 Jun 2024 16:26:53 +0100 Subject: [PATCH 4/4] Move batch transformations under the transform umbrella --- cmd/aws/cli/main.go | 2 +- cmd/cli/cli.go | 4 ++-- cmd/main/cli/main.go | 2 +- pkg/failure/snowplow_test.go | 2 +- pkg/target/common.go | 2 +- pkg/target/eventhub.go | 2 +- pkg/target/http.go | 2 +- pkg/target/http_test.go | 2 +- pkg/target/kafka.go | 2 +- pkg/target/kafka_test.go | 2 +- pkg/target/targetiface/target.go | 2 +- pkg/{batchtransform => transform/batch}/batch_transform.go | 0 .../batch}/batchtransformconfig/batch_transform_config.go | 2 +- pkg/{batchtransform => transform/batch}/template.go | 0 14 files changed, 13 insertions(+), 13 deletions(-) rename pkg/{batchtransform => transform/batch}/batch_transform.go (100%) rename pkg/{batchtransform => transform/batch}/batchtransformconfig/batch_transform_config.go (96%) rename pkg/{batchtransform => transform/batch}/template.go (100%) diff --git a/cmd/aws/cli/main.go b/cmd/aws/cli/main.go index e49600c3..f931ca5e 100644 --- a/cmd/aws/cli/main.go +++ b/cmd/aws/cli/main.go @@ -14,12 +14,12 @@ package main import ( "github.com/snowplow/snowbridge/cmd/cli" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig" kafkasource "github.com/snowplow/snowbridge/pkg/source/kafka" kinesissource "github.com/snowplow/snowbridge/pkg/source/kinesis" pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub" sqssource "github.com/snowplow/snowbridge/pkg/source/sqs" stdinsource "github.com/snowplow/snowbridge/pkg/source/stdin" + "github.com/snowplow/snowbridge/pkg/transform/batch/batchtransformconfig" "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" ) diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go index aa80fae2..f3994e65 100644 --- a/cmd/cli/cli.go +++ b/cmd/cli/cli.go @@ -28,8 +28,6 @@ import ( "github.com/snowplow/snowbridge/cmd" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/batchtransform" - "github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig" "github.com/snowplow/snowbridge/pkg/failure/failureiface" "github.com/snowplow/snowbridge/pkg/models" "github.com/snowplow/snowbridge/pkg/observer" @@ -37,6 +35,8 @@ import ( "github.com/snowplow/snowbridge/pkg/source/sourceiface" "github.com/snowplow/snowbridge/pkg/target/targetiface" "github.com/snowplow/snowbridge/pkg/telemetry" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" + "github.com/snowplow/snowbridge/pkg/transform/batch/batchtransformconfig" transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" ) diff --git a/cmd/main/cli/main.go b/cmd/main/cli/main.go index 84d285ac..b3f710ca 100644 --- a/cmd/main/cli/main.go +++ b/cmd/main/cli/main.go @@ -14,11 +14,11 @@ package main import ( "github.com/snowplow/snowbridge/cmd/cli" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig" kafkasource "github.com/snowplow/snowbridge/pkg/source/kafka" pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub" sqssource "github.com/snowplow/snowbridge/pkg/source/sqs" stdinsource "github.com/snowplow/snowbridge/pkg/source/stdin" + "github.com/snowplow/snowbridge/pkg/transform/batch/batchtransformconfig" "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" ) diff --git a/pkg/failure/snowplow_test.go b/pkg/failure/snowplow_test.go index 2bc832a4..dc6f343a 100644 --- a/pkg/failure/snowplow_test.go +++ b/pkg/failure/snowplow_test.go @@ -17,9 +17,9 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/models" "github.com/snowplow/snowbridge/pkg/testutil" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // --- Test FailureTarget diff --git a/pkg/target/common.go b/pkg/target/common.go index 223d2359..1c2b62e3 100644 --- a/pkg/target/common.go +++ b/pkg/target/common.go @@ -12,8 +12,8 @@ package target import ( - "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // chunkBatcherWithConfig returns a batch transformation which incorporates models.GetChunkedMessages() into the batch transformation model. diff --git a/pkg/target/eventhub.go b/pkg/target/eventhub.go index 42eed48f..318fe844 100644 --- a/pkg/target/eventhub.go +++ b/pkg/target/eventhub.go @@ -22,8 +22,8 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // EventHubConfig holds a config object for Azure EventHub diff --git a/pkg/target/http.go b/pkg/target/http.go index 8c82d863..b166d020 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -24,9 +24,9 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/common" "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" "golang.org/x/oauth2" ) diff --git a/pkg/target/http_test.go b/pkg/target/http_test.go index 5989ab37..37fc45d7 100644 --- a/pkg/target/http_test.go +++ b/pkg/target/http_test.go @@ -22,8 +22,8 @@ import ( "github.com/stretchr/testify/assert" - "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/testutil" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) func createTestServerWithResponseCode(results *[][]byte, waitgroup *sync.WaitGroup, responseCode int) *httptest.Server { diff --git a/pkg/target/kafka.go b/pkg/target/kafka.go index 17cbb112..9a2885da 100644 --- a/pkg/target/kafka.go +++ b/pkg/target/kafka.go @@ -20,9 +20,9 @@ import ( "github.com/hashicorp/go-multierror" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/common" "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // KafkaConfig contains configurable options for the kafka target diff --git a/pkg/target/kafka_test.go b/pkg/target/kafka_test.go index 78b88523..19e75657 100644 --- a/pkg/target/kafka_test.go +++ b/pkg/target/kafka_test.go @@ -19,8 +19,8 @@ import ( "github.com/IBM/sarama/mocks" log "github.com/sirupsen/logrus" - "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/testutil" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" "github.com/stretchr/testify/assert" ) diff --git a/pkg/target/targetiface/target.go b/pkg/target/targetiface/target.go index a2bc73ac..601f148c 100644 --- a/pkg/target/targetiface/target.go +++ b/pkg/target/targetiface/target.go @@ -12,8 +12,8 @@ package targetiface import ( - "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // Target describes the interface for how to push the data pulled from the source diff --git a/pkg/batchtransform/batch_transform.go b/pkg/transform/batch/batch_transform.go similarity index 100% rename from pkg/batchtransform/batch_transform.go rename to pkg/transform/batch/batch_transform.go diff --git a/pkg/batchtransform/batchtransformconfig/batch_transform_config.go b/pkg/transform/batch/batchtransformconfig/batch_transform_config.go similarity index 96% rename from pkg/batchtransform/batchtransformconfig/batch_transform_config.go rename to pkg/transform/batch/batchtransformconfig/batch_transform_config.go index 1173ed90..26ffb875 100644 --- a/pkg/batchtransform/batchtransformconfig/batch_transform_config.go +++ b/pkg/transform/batch/batchtransformconfig/batch_transform_config.go @@ -15,7 +15,7 @@ import ( "fmt" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/batchtransform" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // SupportedTransformations is a ConfigurationPair slice containing all the officially supported transformations. diff --git a/pkg/batchtransform/template.go b/pkg/transform/batch/template.go similarity index 100% rename from pkg/batchtransform/template.go rename to pkg/transform/batch/template.go