diff --git a/cmd/aws/cli/main.go b/cmd/aws/cli/main.go index e38fec40..f931ca5e 100644 --- a/cmd/aws/cli/main.go +++ b/cmd/aws/cli/main.go @@ -19,7 +19,8 @@ import ( pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub" sqssource "github.com/snowplow/snowbridge/pkg/source/sqs" stdinsource "github.com/snowplow/snowbridge/pkg/source/stdin" - "github.com/snowplow/snowbridge/pkg/transform/transformconfig" + "github.com/snowplow/snowbridge/pkg/transform/batch/batchtransformconfig" + "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" ) func main() { @@ -27,5 +28,5 @@ func main() { sourceConfigPairs := []config.ConfigurationPair{stdinsource.ConfigPair, sqssource.ConfigPair, pubsubsource.ConfigPair, kafkasource.ConfigPair, kinesissource.ConfigPair} - cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations) + cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations, batchtransformconfig.SupportedTransformations) } diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go index 65b4c198..f3994e65 100644 --- a/cmd/cli/cli.go +++ b/cmd/cli/cli.go @@ -35,8 +35,10 @@ import ( "github.com/snowplow/snowbridge/pkg/source/sourceiface" "github.com/snowplow/snowbridge/pkg/target/targetiface" "github.com/snowplow/snowbridge/pkg/telemetry" - "github.com/snowplow/snowbridge/pkg/transform" - "github.com/snowplow/snowbridge/pkg/transform/transformconfig" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" + "github.com/snowplow/snowbridge/pkg/transform/batch/batchtransformconfig" + transform "github.com/snowplow/snowbridge/pkg/transform/single" + "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" ) const ( @@ -47,7 +49,11 @@ const ( ) // RunCli runs the app -func RunCli(supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) { +func RunCli( + supportedSources []config.ConfigurationPair, + supportedTransformations []config.ConfigurationPair, + 
supportedBatchTransformations []config.ConfigurationPair, +) { cfg, sentryEnabled, err := cmd.Init() if err != nil { exitWithError(err, sentryEnabled) @@ -95,6 +101,11 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation return err } + btr, err := batchtransformconfig.GetBatchTransformations(cfg, batchtransformconfig.SupportedTransformations) + if err != nil { + return err + } + t, err := cfg.GetTarget() if err != nil { return err @@ -158,7 +169,7 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation // Callback functions for the source to leverage when writing data sf := sourceiface.SourceFunctions{ - WriteToTarget: sourceWriteFunc(t, ft, tr, o), + WriteToTarget: sourceWriteFunc(t, ft, tr, btr, o), } // Read is a long running process and will only return when the source @@ -189,7 +200,13 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation // 4. Observing these results // // All with retry logic baked in to remove any of this handling from the implementations -func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer) func(messages []*models.Message) error { +func sourceWriteFunc( + t targetiface.Target, + ft failureiface.Failure, + tr transform.TransformationApplyFunction, + btr batchtransform.BatchTransformationApplyFunction, + o *observer.Observer, +) func(messages []*models.Message) error { return func(messages []*models.Message) error { // Apply transformations @@ -211,7 +228,7 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform messagesToSend := transformed.Result res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) { - res, err := t.Write(messagesToSend) + res, err := t.Write(messagesToSend, btr) o.TargetWrite(res) messagesToSend = res.Failed diff --git a/cmd/main/cli/main.go b/cmd/main/cli/main.go index 
26e6a72b..b3f710ca 100644 --- a/cmd/main/cli/main.go +++ b/cmd/main/cli/main.go @@ -18,7 +18,8 @@ import ( pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub" sqssource "github.com/snowplow/snowbridge/pkg/source/sqs" stdinsource "github.com/snowplow/snowbridge/pkg/source/stdin" - "github.com/snowplow/snowbridge/pkg/transform/transformconfig" + "github.com/snowplow/snowbridge/pkg/transform/batch/batchtransformconfig" + "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" ) func main() { @@ -28,5 +29,5 @@ func main() { kafkasource.ConfigPair, pubsubsource.ConfigPair, } - cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations) + cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations, batchtransformconfig.SupportedTransformations) } diff --git a/config/config.go b/config/config.go index 3971a98f..73077fc1 100644 --- a/config/config.go +++ b/config/config.go @@ -46,16 +46,17 @@ type Config struct { // configurationData for holding all configuration options type configurationData struct { - Source *component `hcl:"source,block" envPrefix:"SOURCE_"` - Target *component `hcl:"target,block" envPrefix:"TARGET_"` - FailureTarget *failureConfig `hcl:"failure_target,block"` - Sentry *sentryConfig `hcl:"sentry,block"` - StatsReceiver *statsConfig `hcl:"stats_receiver,block"` - Transformations []*component `hcl:"transform,block"` - LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"` - UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"` - DisableTelemetry bool `hcl:"disable_telemetry,optional" env:"DISABLE_TELEMETRY"` - License *licenseConfig `hcl:"license,block"` + Source *component `hcl:"source,block" envPrefix:"SOURCE_"` + Target *component `hcl:"target,block" envPrefix:"TARGET_"` + FailureTarget *failureConfig `hcl:"failure_target,block"` + Sentry *sentryConfig `hcl:"sentry,block"` + StatsReceiver *statsConfig `hcl:"stats_receiver,block"` + Transformations []*component 
`hcl:"transform,block"` + BatchTransformations []*component `hcl:"batch_transform,block"` + LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"` + UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"` + DisableTelemetry bool `hcl:"disable_telemetry,optional" env:"DISABLE_TELEMETRY"` + License *licenseConfig `hcl:"license,block"` } // component is a type to abstract over configuration blocks. diff --git a/docs/configuration_transformations_docs_test.go b/docs/configuration_transformations_docs_test.go index 01ca9bd1..211a789c 100644 --- a/docs/configuration_transformations_docs_test.go +++ b/docs/configuration_transformations_docs_test.go @@ -21,10 +21,10 @@ import ( "github.com/hashicorp/hcl/v2/gohcl" "github.com/snowplow/snowbridge/assets" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" - "github.com/snowplow/snowbridge/pkg/transform/engine" - "github.com/snowplow/snowbridge/pkg/transform/filter" - "github.com/snowplow/snowbridge/pkg/transform/transformconfig" + transform "github.com/snowplow/snowbridge/pkg/transform/single" + "github.com/snowplow/snowbridge/pkg/transform/single/engine" + "github.com/snowplow/snowbridge/pkg/transform/single/filter" + "github.com/snowplow/snowbridge/pkg/transform/single/transformconfig" "github.com/stretchr/testify/assert" ) diff --git a/pkg/failure/snowplow.go b/pkg/failure/snowplow.go index 39f7d06c..b1d9bb61 100644 --- a/pkg/failure/snowplow.go +++ b/pkg/failure/snowplow.go @@ -79,7 +79,7 @@ func (d *SnowplowFailure) WriteInvalid(invalid []*models.Message) (*models.Targe transformed = append(transformed, tMsg) } - return d.target.Write(transformed) + return d.target.Write(transformed, nil) } // WriteOversized will handle the conversion of oversized messages into failure @@ -114,7 +114,7 @@ func (d *SnowplowFailure) WriteOversized(maximumAllowedSizeBytes int, oversized transformed = append(transformed, tMsg) } - return d.target.Write(transformed) + return 
d.target.Write(transformed, nil) } // Open manages opening the underlying target diff --git a/pkg/failure/snowplow_test.go b/pkg/failure/snowplow_test.go index d8ff5760..dc6f343a 100644 --- a/pkg/failure/snowplow_test.go +++ b/pkg/failure/snowplow_test.go @@ -19,6 +19,7 @@ import ( "github.com/snowplow/snowbridge/pkg/models" "github.com/snowplow/snowbridge/pkg/testutil" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // --- Test FailureTarget @@ -27,7 +28,7 @@ type TestFailureTarget struct { onWrite func(messages []*models.Message) (*models.TargetWriteResult, error) } -func (t *TestFailureTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { +func (t *TestFailureTarget) Write(messages []*models.Message, btf batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) { return t.onWrite(messages) } diff --git a/pkg/models/batch_transformation.go b/pkg/models/batch_transformation.go new file mode 100644 index 00000000..eae4193d --- /dev/null +++ b/pkg/models/batch_transformation.go @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. 
+ */ + +package models + +import "time" + +// MessageBatch houses batches of messages, for batch transformations to operate across +type MessageBatch struct { + OriginalMessages []*Message // Most targets will use the data from here, but where we have a http templating transformation, we would use this to ack batches of messages + BatchData []byte // Where we template http requests, we use this to define the body of the request + HTTPHeaders map[string]string // For dynamic headers feature + TimeRequestStarted time.Time + TimeRequestFinished time.Time +} + +// BatchTransformationResult houses the result of a batch transformation +type BatchTransformationResult struct { + Success []*MessageBatch + Invalid []*Message + Oversized []*Message +} diff --git a/pkg/target/common.go b/pkg/target/common.go new file mode 100644 index 00000000..1c2b62e3 --- /dev/null +++ b/pkg/target/common.go @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package target + +import ( + "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" +) + +// chunkBatcherWithConfig returns a batch transformation which incorporates models.GetChunkedMessages() into the batch transformation model. 
+// It is done this way in order to pass GetChunkedMessages its config within the confines of the BatchTransformation design +func chunkBatcherWithConfig(chunkSize int, maxMessageByteSize int, maxChunkByteSize int) batchtransform.BatchTransformationFunction { + + // chunkBatcher is a batch transformation which incorporates models.GetChunkedMessages() into the batch transformation model, + preserving the original logic and ownership of the function. + chunkBatcher := func(batchesIn []*models.MessageBatch) ([]*models.MessageBatch, []*models.Message, []*models.Message) { + oversizedOut := make([]*models.Message, 0) + chunkedBatches := make([]*models.MessageBatch, 0) + + for _, batch := range batchesIn { + chunks, oversized := models.GetChunkedMessages(batch.OriginalMessages, chunkSize, maxMessageByteSize, maxChunkByteSize) + + oversizedOut = append(oversizedOut, oversized...) + + for _, chunk := range chunks { + asBatch := &models.MessageBatch{ + OriginalMessages: chunk, + } + + chunkedBatches = append(chunkedBatches, asBatch) + } + + } + return chunkedBatches, nil, oversizedOut + } + + return chunkBatcher +} diff --git a/pkg/target/eventhub.go b/pkg/target/eventhub.go index c34ddf7b..318fe844 100644 --- a/pkg/target/eventhub.go +++ b/pkg/target/eventhub.go @@ -23,6 +23,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // EventHubConfig holds a config object for Azure EventHub @@ -147,9 +148,10 @@ func AdaptEventHubTargetFunc(f func(c *EventHubConfig) (*EventHubTarget, error)) } } -func (eht *EventHubTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { +func (eht *EventHubTarget) Write(messages []*models.Message, batchTransformFunc batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) { eht.log.Debugf("Writing %d messages to stream ...", len(messages)) + // TODO: Replace this with the new Chunker 
Batch Transformation - should be a post function. chunks, oversized := models.GetChunkedMessages( messages, eht.chunkMessageLimit, // Max Chunk size (number of messages) diff --git a/pkg/target/http.go b/pkg/target/http.go index 7738f163..b166d020 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -26,6 +26,7 @@ import ( "github.com/snowplow/snowbridge/pkg/common" "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" "golang.org/x/oauth2" ) @@ -212,9 +213,77 @@ func AdaptHTTPTargetFunc(f func(c *HTTPTargetConfig) (*HTTPTarget, error)) HTTPT } } -func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { +// When we have dynamic headers, batching by header must necessarily run first. This is a http specific function, +// so defining it here and fixing it into the Write function avoids complexity in configuration +func (ht *HTTPTarget) groupByDynamicHeaders(batches []*models.MessageBatch) ([]*models.MessageBatch, []*models.Message, []*models.Message) { + if !ht.dynamicHeaders { + // If the feature is disabled just return + return batches, nil, nil + } + + // Make a map of stringified header values + headersFound := make(map[string]*models.MessageBatch) + + for _, batch := range batches { + // Group data by that index + for _, msg := range batch.OriginalMessages { + headerKey := fmt.Sprint(msg.HTTPHeaders) + if headersFound[headerKey] != nil { + // If a key already exists, just add this message + headersFound[headerKey].OriginalMessages = append(headersFound[headerKey].OriginalMessages, msg) + } else { + headersFound[headerKey] = &models.MessageBatch{ + OriginalMessages: []*models.Message{msg}, + HTTPHeaders: ht.retrieveHeaders(msg), + } + } + } + } + + outBatches := []*models.MessageBatch{} + for _, batch := range headersFound { + outBatches = append(outBatches, batch) + } + + return outBatches, nil, nil +} + +// Where no transformation function provides a request 
body, we must provide one - this necessarily must happen last. +// This is a http specific function so we define it here to avoid scope for misconfiguration +func (ht *HTTPTarget) provideRequestBody(batches []*models.MessageBatch) ([]*models.MessageBatch, []*models.Message, []*models.Message) { + + // TODO: Add test for when messagess are just strings & confirm that it all works + + // TODO: Note: This would mean that the GTM client gets arrays of single events instead of single events. + // But we could configure an explicit templater to change that if we wanted + // We should test to be certain that it's still compatible. + invalid := make([]*models.Message, 0) + for _, batch := range batches { + if batch.BatchData != nil { + // TODO: Check if it is nil or zero & adapt accordingly + continue + } + requestData := []string{} + for _, msg := range batch.OriginalMessages { + requestData = append(requestData, string(msg.Data)) + } + // TODO: Add tests to be sure this produces the desired request + requestBody, err := json.Marshal(requestData) + if err != nil { + // TODO: Handle errors here + fmt.Println(err) + } + batch.BatchData = requestBody + } + + return batches, invalid, nil +} + +func (ht *HTTPTarget) Write(messages []*models.Message, batchTransformFunc batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) { ht.log.Debugf("Writing %d messages to endpoint ...", len(messages)) + // TODO: We are changing this target from always operating on single events to now handling batches + // this should therefore be replaced by the new chunking Batch Transformation. 
safeMessages, oversized := models.FilterOversizedMessages( messages, ht.MaximumAllowedMessageSizeBytes(), @@ -225,44 +294,58 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu var sent []*models.Message var errResult error - for _, msg := range safeMessages { - request, err := http.NewRequest("POST", ht.httpURL, bytes.NewBuffer(msg.Data)) + // Run the transformations + // We provide a 'pre' function to group by Dynamic headers (if enabled) - this must necessarily happen first. + // We also provide a 'post' function to create a message Body if none is provided via templater - this must happen last. + batchTransformRes := batchTransformFunc( + safeMessages, + []batchtransform.BatchTransformationFunction{ht.groupByDynamicHeaders}, // runs first + []batchtransform.BatchTransformationFunction{ht.provideRequestBody}) // runs last + + invalid = append(invalid, batchTransformRes.Invalid...) + + for _, batch := range batchTransformRes.Success { + request, err := http.NewRequest("POST", ht.httpURL, bytes.NewBuffer(batch.BatchData)) if err != nil { errResult = multierror.Append(errResult, errors.Wrap(err, "Error creating request")) - failed = append(failed, msg) + failed = append(failed, batch.OriginalMessages...) 
continue } - request.Header.Add("Content-Type", ht.contentType) // Add content type - addHeadersToRequest(request, ht.headers, ht.retrieveHeaders(msg)) // Add headers if there are any - if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set + request.Header.Add("Content-Type", ht.contentType) // Add content type + addHeadersToRequest(request, ht.headers, batch.HTTPHeaders) // Add headers + if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword) } requestStarted := time.Now() resp, err := ht.client.Do(request) // Make request requestFinished := time.Now() - msg.TimeRequestStarted = requestStarted - msg.TimeRequestFinished = requestFinished - if err != nil { errResult = multierror.Append(errResult, err) - failed = append(failed, msg) + // TODO: This means that errored requests won't have request times attached. + // Can iterate to add or rethink how it works. + failed = append(failed, batch.OriginalMessages...) continue } defer resp.Body.Close() if resp.StatusCode >= 200 && resp.StatusCode < 300 { - sent = append(sent, msg) - if msg.AckFunc != nil { // Ack successful messages - msg.AckFunc() + for _, msg := range batch.OriginalMessages { + msg.TimeRequestStarted = requestStarted + msg.TimeRequestFinished = requestFinished + sent = append(sent, msg) + if msg.AckFunc != nil { // Ack successful messages + msg.AckFunc() + } } + } else { errResult = multierror.Append(errResult, errors.New("Got response status: "+resp.Status)) - failed = append(failed, msg) + failed = append(failed, batch.OriginalMessages...) 
continue } } if errResult != nil { - errResult = errors.Wrap(errResult, "Error sending http requests") + errResult = errors.Wrap(errResult, "Error sending http request(s)") } ht.log.Debugf("Successfully wrote %d/%d messages", len(sent), len(messages)) @@ -291,6 +374,7 @@ func (ht *HTTPTarget) GetID() string { return ht.httpURL } +// TODO: Can prob remove func (ht *HTTPTarget) retrieveHeaders(msg *models.Message) map[string]string { if !ht.dynamicHeaders { return nil diff --git a/pkg/target/http_test.go b/pkg/target/http_test.go index c4a83b87..37fc45d7 100644 --- a/pkg/target/http_test.go +++ b/pkg/target/http_test.go @@ -12,21 +12,18 @@ package target import ( - "bytes" - "encoding/json" + "fmt" "io" "net/http" "net/http/httptest" - "reflect" "sync" "sync/atomic" "testing" - "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" - "github.com/snowplow/snowbridge/pkg/models" "github.com/snowplow/snowbridge/pkg/testutil" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) func createTestServerWithResponseCode(results *[][]byte, waitgroup *sync.WaitGroup, responseCode int) *httptest.Server { @@ -49,6 +46,8 @@ func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Se return createTestServerWithResponseCode(results, waitgroup, 200) } +/* + func TestGetHeaders(t *testing.T) { assert := assert.New(t) valid1 := `{"Max Forwards": "10", "Accept-Language": "en-US", "Accept-Datetime": "Thu, 31 May 2007 20:35:00 GMT"}` @@ -326,6 +325,8 @@ func TestNewHTTPTarget(t *testing.T) { assert.Nil(failedHTTPTarget2) } +*/ + func TestHttpWrite_Simple(t *testing.T) { testCases := []struct { Name string @@ -356,8 +357,10 @@ func TestHttpWrite_Simple(t *testing.T) { } messages := testutil.GetTestMessages(501, "Hello Server!!", ackFunc) - wg.Add(501) - writeResult, err1 := target.Write(messages) + wg.Add(1) + + batchTransformFunc := batchtransform.NewBatchTransformation() + writeResult, err1 := target.Write(messages, 
batchTransformFunc) wg.Wait() @@ -368,11 +371,16 @@ func TestHttpWrite_Simple(t *testing.T) { assert.Equal("Hello Server!!", string(result)) } + fmt.Println(writeResult.Invalid) + fmt.Println(results) + assert.Equal(int64(501), ackOps) }) } } +/* + func TestHttpWrite_Concurrent(t *testing.T) { assert := assert.New(t) @@ -678,3 +686,4 @@ func getNgrokAddress() string { } panic("no ngrok https endpoint found") } +*/ diff --git a/pkg/target/kafka.go b/pkg/target/kafka.go index 4e1f60ab..9a2885da 100644 --- a/pkg/target/kafka.go +++ b/pkg/target/kafka.go @@ -22,6 +22,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/snowplow/snowbridge/pkg/common" "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // KafkaConfig contains configurable options for the kafka target @@ -205,72 +206,83 @@ func AdaptKafkaTargetFunc(f func(c *KafkaConfig) (*KafkaTarget, error)) KafkaTar } // Write pushes all messages to the required target -func (kt *KafkaTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { +func (kt *KafkaTarget) Write(messages []*models.Message, batchTransformFunc batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) { kt.log.Debugf("Writing %d messages to topic ...", len(messages)) + // TODO: This can just be implemented safeMessages, oversized := models.FilterOversizedMessages( messages, kt.MaximumAllowedMessageSizeBytes(), ) + var invalid []*models.Message var sent []*models.Message var failed []*models.Message var errResult error - if kt.asyncProducer != nil { - // Not adding request latency metric to async producer for now, since it would complicate the implementation, and delay our debug. 
- for _, msg := range safeMessages { - kt.asyncProducer.Input() <- &sarama.ProducerMessage{ - Topic: kt.topicName, - Key: sarama.StringEncoder(msg.PartitionKey), - Value: sarama.ByteEncoder(msg.Data), - Metadata: msg, - } - } + // Run the transformations + batchTransformRes := batchTransformFunc(safeMessages, nil, nil) - for i := 0; i < len(safeMessages); i++ { + invalid = append(invalid, batchTransformRes.Invalid...) - result := <-kt.asyncResults // Block until result is returned + for _, batch := range batchTransformRes.Success { - if result.Err != nil { - errResult = multierror.Append(errResult, result.Err) - originalMessage := result.Msg.Metadata.(*models.Message) - originalMessage.SetError(result.Err) - failed = append(failed, originalMessage) - } else { - originalMessage := result.Msg.Metadata.(*models.Message) - if originalMessage.AckFunc != nil { - originalMessage.AckFunc() + // TODO: This is too much nesting. Refactor for readability + if kt.asyncProducer != nil { + // Not adding request latency metric to async producer for now, since it would complicate the implementation, and delay our debug. 
+ for _, msg := range batch.OriginalMessages { + kt.asyncProducer.Input() <- &sarama.ProducerMessage{ + Topic: kt.topicName, + Key: sarama.StringEncoder(msg.PartitionKey), + Value: sarama.ByteEncoder(msg.Data), + Metadata: msg, } - sent = append(sent, originalMessage) } - } - } else if kt.syncProducer != nil { - for _, msg := range safeMessages { - requestStarted := time.Now() - _, _, err := kt.syncProducer.SendMessage(&sarama.ProducerMessage{ - Topic: kt.topicName, - Key: sarama.StringEncoder(msg.PartitionKey), - Value: sarama.ByteEncoder(msg.Data), - }) - requestFinished := time.Now() - - msg.TimeRequestStarted = requestStarted - msg.TimeRequestFinished = requestFinished - - if err != nil { - errResult = multierror.Append(errResult, err) - msg.SetError(err) - failed = append(failed, msg) - } else { - if msg.AckFunc != nil { - msg.AckFunc() + + for i := 0; i < len(batch.OriginalMessages); i++ { + + result := <-kt.asyncResults // Block until result is returned + + if result.Err != nil { + errResult = multierror.Append(errResult, result.Err) + originalMessage := result.Msg.Metadata.(*models.Message) + originalMessage.SetError(result.Err) + failed = append(failed, originalMessage) + } else { + originalMessage := result.Msg.Metadata.(*models.Message) + if originalMessage.AckFunc != nil { + originalMessage.AckFunc() + } + sent = append(sent, originalMessage) + } + } + } else if kt.syncProducer != nil { + for _, msg := range batch.OriginalMessages { + requestStarted := time.Now() + _, _, err := kt.syncProducer.SendMessage(&sarama.ProducerMessage{ + Topic: kt.topicName, + Key: sarama.StringEncoder(msg.PartitionKey), + Value: sarama.ByteEncoder(msg.Data), + }) + requestFinished := time.Now() + + msg.TimeRequestStarted = requestStarted + msg.TimeRequestFinished = requestFinished + + if err != nil { + errResult = multierror.Append(errResult, err) + msg.SetError(err) + failed = append(failed, msg) + } else { + if msg.AckFunc != nil { + msg.AckFunc() + } + sent = append(sent, 
msg) } - sent = append(sent, msg) } + } else { + errResult = multierror.Append(errResult, fmt.Errorf("no producer has been configured")) } - } else { - errResult = multierror.Append(errResult, fmt.Errorf("no producer has been configured")) } if errResult != nil { @@ -282,7 +294,7 @@ func (kt *KafkaTarget) Write(messages []*models.Message) (*models.TargetWriteRes sent, failed, oversized, - nil, + invalid, ), errResult } diff --git a/pkg/target/kafka_test.go b/pkg/target/kafka_test.go index a60c089b..19e75657 100644 --- a/pkg/target/kafka_test.go +++ b/pkg/target/kafka_test.go @@ -20,6 +20,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/snowplow/snowbridge/pkg/testutil" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" "github.com/stretchr/testify/assert" ) @@ -79,7 +80,9 @@ func TestKafkaTarget_AsyncWriteFailure(t *testing.T) { messages := testutil.GetTestMessages(1, "Hello Kafka!!", nil) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.NotNil(err) if err != nil { assert.Equal("Error writing messages to Kafka topic: : 1 error occurred:\n\t* kafka: client has run out of available brokers to talk to\n\n", err.Error()) @@ -110,7 +113,9 @@ func TestKafkaTarget_AsyncWriteSuccess(t *testing.T) { messages := testutil.GetTestMessages(501, "Hello Kafka!!", ackFunc) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.Nil(err) assert.NotNil(writeRes) @@ -134,7 +139,9 @@ func TestKafkaTarget_SyncWriteFailure(t *testing.T) { messages := testutil.GetTestMessages(1, "Hello Kafka!!", nil) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.NotNil(err) if err != nil { assert.Equal("Error writing 
messages to Kafka topic: : 1 error occurred:\n\t* kafka: client has run out of available brokers to talk to\n\n", err.Error()) @@ -165,7 +172,9 @@ func TestKafkaTarget_SyncWriteSuccess(t *testing.T) { messages := testutil.GetTestMessages(501, "Hello Kafka!!", ackFunc) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.Nil(err) assert.NotNil(writeRes) @@ -197,7 +206,9 @@ func TestKafkaTarget_WriteSuccess_OversizeBatch(t *testing.T) { messages := testutil.GetTestMessages(10, "Hello Kafka!!", ackFunc) messages = append(messages, testutil.GetTestMessages(10, testutil.GenRandomString(1048576), ackFunc)...) - writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.Nil(err) assert.NotNil(writeRes) @@ -229,7 +240,9 @@ func TestKafkaTarget_WriteSuccess_OversizeRecord(t *testing.T) { messages := testutil.GetTestMessages(10, "Hello Kafka!!", ackFunc) messages = append(messages, testutil.GetTestMessages(1, testutil.GenRandomString(1048577), ackFunc)...) 
- writeRes, err := target.Write(messages) + batchTransform := batchtransform.NewBatchTransformation(nil) + + writeRes, err := target.Write(messages, batchTransform) assert.Nil(err) assert.NotNil(writeRes) diff --git a/pkg/target/kinesis.go b/pkg/target/kinesis.go index cd7fdfdc..70e7aeeb 100644 --- a/pkg/target/kinesis.go +++ b/pkg/target/kinesis.go @@ -127,6 +127,7 @@ func AdaptKinesisTargetFunc(f func(c *KinesisTargetConfig) (*KinesisTarget, erro func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { kt.log.Debugf("Writing %d messages to stream ...", len(messages)) + // TODO: Replace with new batch transformation chunks, oversized := models.GetChunkedMessages( messages, kt.requestMaxMessages, diff --git a/pkg/target/oauth2_test.go b/pkg/target/oauth2_test.go index 21b514f5..8371a35a 100644 --- a/pkg/target/oauth2_test.go +++ b/pkg/target/oauth2_test.go @@ -116,7 +116,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input target := oauth2Target(t, server.URL, inputClientID, inputClientSecret, inputRefreshToken, tokenServer.URL) message := testutil.GetTestMessages(1, "Hello Server!!", func() {}) - return target.Write(message) + return target.Write(message, nil) } func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget { diff --git a/pkg/target/pubsub.go b/pkg/target/pubsub.go index c86e9da3..28a625cb 100644 --- a/pkg/target/pubsub.go +++ b/pkg/target/pubsub.go @@ -125,6 +125,7 @@ func (ps *PubSubTarget) Write(messages []*models.Message) (*models.TargetWriteRe var results []*pubSubPublishResult + // TODO: Refactor for Batch transformation model safeMessages, oversized := models.FilterOversizedMessages( messages, ps.MaximumAllowedMessageSizeBytes(), diff --git a/pkg/target/targetiface/target.go b/pkg/target/targetiface/target.go index 405adc32..601f148c 100644 --- 
a/pkg/target/targetiface/target.go +++ b/pkg/target/targetiface/target.go @@ -13,11 +13,12 @@ package targetiface import ( "github.com/snowplow/snowbridge/pkg/models" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) // Target describes the interface for how to push the data pulled from the source type Target interface { - Write(messages []*models.Message) (*models.TargetWriteResult, error) + Write(messages []*models.Message, batchTransformFunc batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) Open() Close() MaximumAllowedMessageSizeBytes() int diff --git a/pkg/transform/batch/batch_transform.go b/pkg/transform/batch/batch_transform.go new file mode 100644 index 00000000..3459050a --- /dev/null +++ b/pkg/transform/batch/batch_transform.go @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. 
+ */ + +package batchtransform + +import "github.com/snowplow/snowbridge/pkg/models" + +// BatchTransformationFunction is a transformation function which operates across a batch of events +// It takes a batch as an input, and returns a successful batch and a slice of invalid messages +type BatchTransformationFunction func([]*models.MessageBatch) (success []*models.MessageBatch, invalid []*models.Message, oversized []*models.Message) + +// BatchTransformationApplyFunction combines batch transformation functions into one callable function +type BatchTransformationApplyFunction func([]*models.Message, []BatchTransformationFunction, []BatchTransformationFunction) *models.BatchTransformationResult + +// BatchTransformationGenerator returns a BatchTransformationApplyFunction from a provided set of BatchTransformationFunctions +type BatchTransformationGenerator func(...BatchTransformationFunction) BatchTransformationApplyFunction + +// NewBatchTransformation constructs a function which applies all transformations to all messages, returning a BatchTransformationResult. +func NewBatchTransformation(transformFunctions ...BatchTransformationFunction) BatchTransformationApplyFunction { + // pre is a function to be run before the configured ones, post is to be run after. + // This is done because sometimes functions need to _always_ run first or last, depending on the specific target logic. (eg. batching by dynamic headers, if configured) + // pre and post functions are intended for use only in the implementations of targets. + return func(messages []*models.Message, pre []BatchTransformationFunction, post []BatchTransformationFunction) *models.BatchTransformationResult { + // make a batch to begin with + success := []*models.MessageBatch{{OriginalMessages: messages}} + + // Because http will require specific functions to always go first and last, we provide these here + // Compiler gets confused if we don't rename. + functionsToRun := append(pre, transformFunctions...)
+ functionsToRun = append(functionsToRun, post...) + + // If no transformations, just return a result + if len(functionsToRun) == 0 { + return &models.BatchTransformationResult{Success: success} + } + + var invalid []*models.Message + var oversized []*models.Message + invalidList := make([]*models.Message, 0, len(messages)) + oversizedList := make([]*models.Message, 0, len(messages)) + // Run each transformation + for _, transformFunction := range functionsToRun { + // success is recomputed each time into a complete list of batches + success, invalid, oversized = transformFunction(success) + // Invalids are excluded each iteration so must be appended to a permanent list + invalidList = append(invalidList, invalid...) + + oversizedList = append(oversizedList, oversized...) + } + + return &models.BatchTransformationResult{Success: success, Invalid: invalidList, Oversized: oversizedList} + } +} diff --git a/pkg/transform/batch/batchtransformconfig/batch_transform_config.go b/pkg/transform/batch/batchtransformconfig/batch_transform_config.go new file mode 100644 index 00000000..26ffb875 --- /dev/null +++ b/pkg/transform/batch/batchtransformconfig/batch_transform_config.go @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package batchtransformconfig + +import ( + "fmt" + + "github.com/snowplow/snowbridge/config" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" +) + +// SupportedTransformations is a ConfigurationPair slice containing all the officially supported transformations. 
+var SupportedTransformations = []config.ConfigurationPair{ + // TODO: Add config implementations & put them here +} + +// GetBatchTransformations builds and returns transformationApplyFunction +// from the transformations configured. +func GetBatchTransformations(c *config.Config, supportedTransformations []config.ConfigurationPair) (batchtransform.BatchTransformationApplyFunction, error) { + funcs := make([]batchtransform.BatchTransformationFunction, 0) + + for _, transformation := range c.Data.BatchTransformations { + + useTransf := transformation.Use + decoderOpts := &config.DecoderOptions{ + Input: useTransf.Body, + } + + var component interface{} + var err error + for _, pair := range supportedTransformations { + if pair.Name == useTransf.Name { + plug := pair.Handle + component, err = c.CreateComponent(plug, decoderOpts) + if err != nil { + return nil, err + } + } + } + + f, ok := component.(batchtransform.BatchTransformationFunction) + if !ok { + return nil, fmt.Errorf("could not interpret transformation configuration for %q", useTransf.Name) + } + funcs = append(funcs, f) + } + + return batchtransform.NewBatchTransformation(funcs...), nil +} diff --git a/pkg/transform/batch/template.go b/pkg/transform/batch/template.go new file mode 100644 index 00000000..9a294eb0 --- /dev/null +++ b/pkg/transform/batch/template.go @@ -0,0 +1,74 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. 
+ */ + +package batchtransform + +import ( + "bytes" + "encoding/json" + "text/template" + + "github.com/pkg/errors" + "github.com/snowplow/snowbridge/pkg/models" +) + +// TemplaterBatchTransformationFunction is an outline implementation of a batch transformation: it renders each batch's messages through a Go template into a single request body (assigned to the batch's BatchData), returning the batches alongside any messages that failed templating +func TemplaterBatchTransformationFunction(batches []models.MessageBatch) ([]models.MessageBatch, []*models.Message) { + + // This is just an outline implementation of a templater function, to help figure out the design of batch transforms in general + + // The templater would fit here along the following lines: + const templ = `{ + attributes: [ {{$first_1 := true}} + {{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}} + {{printf "%s" .attribute_data}}{{end}} + ], + events: [ {{$first_2 := true}} + {{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}} + {{printf "%s" .event_data}}{{end}} + ] + }` + + invalid := make([]*models.Message, 0) + safe := make([]*models.Message, 0) + + for _, b := range batches { + formatted := []map[string]json.RawMessage{} + for _, msg := range b.OriginalMessages { + // Use json.RawMessage to ensure templating format works (real implementation has a problem to figure out here) + var asMap map[string]json.RawMessage + + if err := json.Unmarshal(msg.Data, &asMap); err != nil { + msg.SetError(errors.Wrap(err, "templater error")) // TODO: Cleanup! + invalid = append(invalid, msg) + continue + } + + formatted = append(formatted, asMap) + } + var buf bytes.Buffer + + t := template.Must(template.New("example").Parse(templ)) + if err := t.Execute(&buf, formatted); err != nil { + for _, msg := range safe { + msg.SetError(errors.Wrap(err, "templater error")) // TODO: Cleanup! 
+ invalid = append(invalid, msg) + } + return nil, invalid + } + + // Assign the templated request to the HTTPRequestBody field + b.BatchData = buf.Bytes() + + } + + return batches, invalid +} diff --git a/pkg/transform/base64Decode.go b/pkg/transform/single/base64Decode.go similarity index 100% rename from pkg/transform/base64Decode.go rename to pkg/transform/single/base64Decode.go diff --git a/pkg/transform/base64Encode.go b/pkg/transform/single/base64Encode.go similarity index 100% rename from pkg/transform/base64Encode.go rename to pkg/transform/single/base64Encode.go diff --git a/pkg/transform/base64_test.go b/pkg/transform/single/base64_test.go similarity index 100% rename from pkg/transform/base64_test.go rename to pkg/transform/single/base64_test.go diff --git a/pkg/transform/engine/engine.go b/pkg/transform/single/engine/engine.go similarity index 95% rename from pkg/transform/engine/engine.go rename to pkg/transform/single/engine/engine.go index 83b42dac..abf6cd88 100644 --- a/pkg/transform/engine/engine.go +++ b/pkg/transform/single/engine/engine.go @@ -13,7 +13,7 @@ package engine import ( jsoniter "github.com/json-iterator/go" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) // make a jsoniter instance that won't escape html diff --git a/pkg/transform/engine/engine_javascript.go b/pkg/transform/single/engine/engine_javascript.go similarity index 99% rename from pkg/transform/engine/engine_javascript.go rename to pkg/transform/single/engine/engine_javascript.go index 62b94fcf..387855a3 100644 --- a/pkg/transform/engine/engine_javascript.go +++ b/pkg/transform/single/engine/engine_javascript.go @@ -24,7 +24,7 @@ import ( "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) // JSEngineConfig configures the JavaScript Engine. 
diff --git a/pkg/transform/engine/engine_javascript_test.go b/pkg/transform/single/engine/engine_javascript_test.go similarity index 100% rename from pkg/transform/engine/engine_javascript_test.go rename to pkg/transform/single/engine/engine_javascript_test.go diff --git a/pkg/transform/engine/engine_lua.go b/pkg/transform/single/engine/engine_lua.go similarity index 99% rename from pkg/transform/engine/engine_lua.go rename to pkg/transform/single/engine/engine_lua.go index 810e3253..8bb906f3 100644 --- a/pkg/transform/engine/engine_lua.go +++ b/pkg/transform/single/engine/engine_lua.go @@ -26,7 +26,7 @@ import ( "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) // LuaEngineConfig configures the Lua Engine. diff --git a/pkg/transform/engine/engine_lua_test.go b/pkg/transform/single/engine/engine_lua_test.go similarity index 99% rename from pkg/transform/engine/engine_lua_test.go rename to pkg/transform/single/engine/engine_lua_test.go index 6979c1c2..e09bbdf3 100644 --- a/pkg/transform/engine/engine_lua_test.go +++ b/pkg/transform/single/engine/engine_lua_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) type LuaTestCase struct { diff --git a/pkg/transform/engine/engine_test_variables.go b/pkg/transform/single/engine/engine_test_variables.go similarity index 100% rename from pkg/transform/engine/engine_test_variables.go rename to pkg/transform/single/engine/engine_test_variables.go diff --git a/pkg/transform/filter/snowplow_enriched_filter_atomic.go b/pkg/transform/single/filter/snowplow_enriched_filter_atomic.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_atomic.go rename to 
pkg/transform/single/filter/snowplow_enriched_filter_atomic.go index 769ec7e8..115041c2 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_atomic.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_atomic.go @@ -14,7 +14,7 @@ package filter import ( "github.com/pkg/errors" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" ) diff --git a/pkg/transform/filter/snowplow_enriched_filter_atomic_test.go b/pkg/transform/single/filter/snowplow_enriched_filter_atomic_test.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_atomic_test.go rename to pkg/transform/single/filter/snowplow_enriched_filter_atomic_test.go index ca2edab6..3c801071 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_atomic_test.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_atomic_test.go @@ -18,7 +18,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) func TestMakeBaseValueGetter(t *testing.T) { diff --git a/pkg/transform/filter/snowplow_enriched_filter_common.go b/pkg/transform/single/filter/snowplow_enriched_filter_common.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_common.go rename to pkg/transform/single/filter/snowplow_enriched_filter_common.go index 9f43e158..6c79df70 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_common.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_common.go @@ -21,7 +21,7 @@ import ( "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) // 
evaluateSpEnrichedfilter takes a regex and a slice of values, and returns whether or not a value has been matched diff --git a/pkg/transform/filter/snowplow_enriched_filter_common_test.go b/pkg/transform/single/filter/snowplow_enriched_filter_common_test.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_common_test.go rename to pkg/transform/single/filter/snowplow_enriched_filter_common_test.go index 2cda07a2..27e8f322 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_common_test.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_common_test.go @@ -19,7 +19,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" ) var messageGood = models.Message{ diff --git a/pkg/transform/filter/snowplow_enriched_filter_context.go b/pkg/transform/single/filter/snowplow_enriched_filter_context.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_context.go rename to pkg/transform/single/filter/snowplow_enriched_filter_context.go index 2e2c008c..7b095b56 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_context.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_context.go @@ -16,7 +16,7 @@ import ( "github.com/pkg/errors" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" ) diff --git a/pkg/transform/filter/snowplow_enriched_filter_context_test.go b/pkg/transform/single/filter/snowplow_enriched_filter_context_test.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_context_test.go rename to pkg/transform/single/filter/snowplow_enriched_filter_context_test.go index 384cec46..5c75c673 100644 --- 
a/pkg/transform/filter/snowplow_enriched_filter_context_test.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_context_test.go @@ -16,7 +16,7 @@ import ( "testing" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/stretchr/testify/assert" ) diff --git a/pkg/transform/filter/snowplow_enriched_filter_unstruct.go b/pkg/transform/single/filter/snowplow_enriched_filter_unstruct.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_unstruct.go rename to pkg/transform/single/filter/snowplow_enriched_filter_unstruct.go index 35e714bf..459cdba4 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_unstruct.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_unstruct.go @@ -18,7 +18,7 @@ import ( "github.com/pkg/errors" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/snowplow/snowplow-golang-analytics-sdk/analytics" ) diff --git a/pkg/transform/filter/snowplow_enriched_filter_unstruct_test.go b/pkg/transform/single/filter/snowplow_enriched_filter_unstruct_test.go similarity index 98% rename from pkg/transform/filter/snowplow_enriched_filter_unstruct_test.go rename to pkg/transform/single/filter/snowplow_enriched_filter_unstruct_test.go index abec4af6..84a46746 100644 --- a/pkg/transform/filter/snowplow_enriched_filter_unstruct_test.go +++ b/pkg/transform/single/filter/snowplow_enriched_filter_unstruct_test.go @@ -17,7 +17,7 @@ import ( "testing" "github.com/snowplow/snowbridge/pkg/models" - "github.com/snowplow/snowbridge/pkg/transform" + transform "github.com/snowplow/snowbridge/pkg/transform/single" "github.com/stretchr/testify/assert" ) diff --git a/pkg/transform/setup_test.go b/pkg/transform/single/setup_test.go similarity index 100% rename from pkg/transform/setup_test.go rename 
to pkg/transform/single/setup_test.go diff --git a/pkg/transform/snowplow_enriched_set_pk.go b/pkg/transform/single/snowplow_enriched_set_pk.go similarity index 100% rename from pkg/transform/snowplow_enriched_set_pk.go rename to pkg/transform/single/snowplow_enriched_set_pk.go diff --git a/pkg/transform/snowplow_enriched_set_pk_test.go b/pkg/transform/single/snowplow_enriched_set_pk_test.go similarity index 100% rename from pkg/transform/snowplow_enriched_set_pk_test.go rename to pkg/transform/single/snowplow_enriched_set_pk_test.go diff --git a/pkg/transform/snowplow_enriched_to_json.go b/pkg/transform/single/snowplow_enriched_to_json.go similarity index 100% rename from pkg/transform/snowplow_enriched_to_json.go rename to pkg/transform/single/snowplow_enriched_to_json.go diff --git a/pkg/transform/snowplow_enriched_to_json_test.go b/pkg/transform/single/snowplow_enriched_to_json_test.go similarity index 100% rename from pkg/transform/snowplow_enriched_to_json_test.go rename to pkg/transform/single/snowplow_enriched_to_json_test.go diff --git a/pkg/transform/snowplow_enriched_util.go b/pkg/transform/single/snowplow_enriched_util.go similarity index 100% rename from pkg/transform/snowplow_enriched_util.go rename to pkg/transform/single/snowplow_enriched_util.go diff --git a/pkg/transform/snowplow_enriched_util_test.go b/pkg/transform/single/snowplow_enriched_util_test.go similarity index 100% rename from pkg/transform/snowplow_enriched_util_test.go rename to pkg/transform/single/snowplow_enriched_util_test.go diff --git a/pkg/transform/snowplow_gtmss_preview.go b/pkg/transform/single/snowplow_gtmss_preview.go similarity index 100% rename from pkg/transform/snowplow_gtmss_preview.go rename to pkg/transform/single/snowplow_gtmss_preview.go diff --git a/pkg/transform/snowplow_gtmss_preview_test.go b/pkg/transform/single/snowplow_gtmss_preview_test.go similarity index 100% rename from pkg/transform/snowplow_gtmss_preview_test.go rename to 
pkg/transform/single/snowplow_gtmss_preview_test.go diff --git a/pkg/transform/transform.go b/pkg/transform/single/transform.go similarity index 100% rename from pkg/transform/transform.go rename to pkg/transform/single/transform.go diff --git a/pkg/transform/transform_test.go b/pkg/transform/single/transform_test.go similarity index 100% rename from pkg/transform/transform_test.go rename to pkg/transform/single/transform_test.go diff --git a/pkg/transform/transform_test_variables.go b/pkg/transform/single/transform_test_variables.go similarity index 100% rename from pkg/transform/transform_test_variables.go rename to pkg/transform/single/transform_test_variables.go diff --git a/pkg/transform/transformconfig/transform_config.go b/pkg/transform/single/transformconfig/transform_config.go similarity index 91% rename from pkg/transform/transformconfig/transform_config.go rename to pkg/transform/single/transformconfig/transform_config.go index bff5cec9..dbc54a0a 100644 --- a/pkg/transform/transformconfig/transform_config.go +++ b/pkg/transform/single/transformconfig/transform_config.go @@ -15,9 +15,9 @@ import ( "fmt" "github.com/snowplow/snowbridge/config" - "github.com/snowplow/snowbridge/pkg/transform" - "github.com/snowplow/snowbridge/pkg/transform/engine" - "github.com/snowplow/snowbridge/pkg/transform/filter" + transform "github.com/snowplow/snowbridge/pkg/transform/single" + "github.com/snowplow/snowbridge/pkg/transform/single/engine" + "github.com/snowplow/snowbridge/pkg/transform/single/filter" ) // SupportedTransformations is a ConfigurationPair slice containing all the officially supported transformations. diff --git a/pkg/transform/transformconfig/transform_config_test.go b/pkg/transform/single/transformconfig/transform_config_test.go similarity index 100% rename from pkg/transform/transformconfig/transform_config_test.go rename to pkg/transform/single/transformconfig/transform_config_test.go