Cross event transformation #331

Closed
5 changes: 3 additions & 2 deletions cmd/aws/cli/main.go
@@ -19,13 +19,14 @@ import (
pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub"
sqssource "github.com/snowplow/snowbridge/pkg/source/sqs"
stdinsource "github.com/snowplow/snowbridge/pkg/source/stdin"
"github.com/snowplow/snowbridge/pkg/transform/transformconfig"
"github.com/snowplow/snowbridge/pkg/transform/batch/batchtransformconfig"
"github.com/snowplow/snowbridge/pkg/transform/single/transformconfig"
)

func main() {
// Make a slice of SourceConfigPairs supported for this build
sourceConfigPairs := []config.ConfigurationPair{stdinsource.ConfigPair, sqssource.ConfigPair,
pubsubsource.ConfigPair, kafkasource.ConfigPair, kinesissource.ConfigPair}

cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations)
cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations, batchtransformconfig.SupportedTransformations)
}
29 changes: 23 additions & 6 deletions cmd/cli/cli.go
@@ -35,8 +35,10 @@ import (
"github.com/snowplow/snowbridge/pkg/source/sourceiface"
"github.com/snowplow/snowbridge/pkg/target/targetiface"
"github.com/snowplow/snowbridge/pkg/telemetry"
"github.com/snowplow/snowbridge/pkg/transform"
"github.com/snowplow/snowbridge/pkg/transform/transformconfig"
batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch"
"github.com/snowplow/snowbridge/pkg/transform/batch/batchtransformconfig"
transform "github.com/snowplow/snowbridge/pkg/transform/single"
"github.com/snowplow/snowbridge/pkg/transform/single/transformconfig"
)

const (
@@ -47,7 +49,11 @@ const (
)

// RunCli runs the app
func RunCli(supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) {
func RunCli(
supportedSources []config.ConfigurationPair,
supportedTransformations []config.ConfigurationPair,
supportedBatchTransformations []config.ConfigurationPair,
) {
cfg, sentryEnabled, err := cmd.Init()
if err != nil {
exitWithError(err, sentryEnabled)
@@ -95,6 +101,11 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
return err
}

btr, err := batchtransformconfig.GetBatchTransformations(cfg, batchtransformconfig.SupportedTransformations)
if err != nil {
return err
}

t, err := cfg.GetTarget()
if err != nil {
return err
@@ -158,7 +169,7 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
WriteToTarget: sourceWriteFunc(t, ft, tr, btr, o),
}

// Read is a long running process and will only return when the source
@@ -189,7 +200,13 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
// 4. Observing these results
//
// All with retry logic baked in to remove any of this handling from the implementations
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer) func(messages []*models.Message) error {
func sourceWriteFunc(
t targetiface.Target,
ft failureiface.Failure,
tr transform.TransformationApplyFunction,
btr batchtransform.BatchTransformationApplyFunction,
o *observer.Observer,
) func(messages []*models.Message) error {
return func(messages []*models.Message) error {

// Apply transformations
@@ -211,7 +228,7 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
messagesToSend := transformed.Result

res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
res, err := t.Write(messagesToSend)
res, err := t.Write(messagesToSend, btr)
Contributor:

I wonder whether there is an alternative so that the target does not need to know about batch transformations. Are there other reasons for this besides the chunking and possible group-by?

Collaborator (author):

I started with a design that segmented them, but it left things very messy because the target needs to be aware of the transformation in order to decide how to send the data.

Similarly, the dynamic headers feature leaves us with a challenge here: it must necessarily group data by headers before a request template is created.

From a configuration perspective, if this logic is upstream of the target, it seems very easy to break the target by configuring a separate feature.

I don't know if it's the best design, but it's what I came up with as an attempt to reconcile this with the concept of solving for batch transformations more generically.
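The signature change under discussion can be sketched in a minimal, self-contained form. The type names mirror the diff, but the simplified struct shapes and the toy target below are assumptions for illustration, not snowbridge's real implementation:

```go
package main

import "fmt"

// Simplified stand-ins for snowbridge's models (assumed shapes).
type Message struct{ Data []byte }

type MessageBatch struct{ OriginalMessages []*Message }

// BatchTransformationApplyFunction groups messages into batches;
// the real signature in snowbridge may differ.
type BatchTransformationApplyFunction func([]*Message) []*MessageBatch

// Target now receives the batch transformation in Write, so the target
// controls when grouping happens relative to chunking or templating.
type Target interface {
	Write(messages []*Message, btr BatchTransformationApplyFunction) (int, error)
}

type printTarget struct{}

func (printTarget) Write(messages []*Message, btr BatchTransformationApplyFunction) (int, error) {
	// Default: one batch containing everything.
	batches := []*MessageBatch{{OriginalMessages: messages}}
	if btr != nil { // failure targets pass nil, as in pkg/failure/snowplow.go
		batches = btr(messages)
	}
	sent := 0
	for _, b := range batches {
		sent += len(b.OriginalMessages)
	}
	return sent, nil
}

func main() {
	msgs := []*Message{{Data: []byte("a")}, {Data: []byte("b")}, {Data: []byte("c")}}
	// A trivial batch transformation: one batch per message.
	perMessage := func(in []*Message) []*MessageBatch {
		out := make([]*MessageBatch, 0, len(in))
		for _, m := range in {
			out = append(out, &MessageBatch{OriginalMessages: []*Message{m}})
		}
		return out
	}
	var tgt printTarget
	n, _ := tgt.Write(msgs, perMessage)
	fmt.Println(n) // 3
	n, _ = tgt.Write(msgs, nil)
	fmt.Println(n) // 3
}
```

Passing the function into Write is what lets the target sequence grouping relative to its own send logic, which is the trade-off the thread is weighing.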


o.TargetWrite(res)
messagesToSend = res.Failed
5 changes: 3 additions & 2 deletions cmd/main/cli/main.go
@@ -18,7 +18,8 @@ import (
pubsubsource "github.com/snowplow/snowbridge/pkg/source/pubsub"
sqssource "github.com/snowplow/snowbridge/pkg/source/sqs"
stdinsource "github.com/snowplow/snowbridge/pkg/source/stdin"
"github.com/snowplow/snowbridge/pkg/transform/transformconfig"
"github.com/snowplow/snowbridge/pkg/transform/batch/batchtransformconfig"
"github.com/snowplow/snowbridge/pkg/transform/single/transformconfig"
)

func main() {
@@ -28,5 +29,5 @@ func main() {
kafkasource.ConfigPair, pubsubsource.ConfigPair,
}

cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations)
cli.RunCli(sourceConfigPairs, transformconfig.SupportedTransformations, batchtransformconfig.SupportedTransformations)
}
21 changes: 11 additions & 10 deletions config/config.go
@@ -46,16 +46,17 @@ type Config struct {

// configurationData for holding all configuration options
type configurationData struct {
Source *component `hcl:"source,block" envPrefix:"SOURCE_"`
Target *component `hcl:"target,block" envPrefix:"TARGET_"`
FailureTarget *failureConfig `hcl:"failure_target,block"`
Sentry *sentryConfig `hcl:"sentry,block"`
StatsReceiver *statsConfig `hcl:"stats_receiver,block"`
Transformations []*component `hcl:"transform,block"`
LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"`
UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"`
DisableTelemetry bool `hcl:"disable_telemetry,optional" env:"DISABLE_TELEMETRY"`
License *licenseConfig `hcl:"license,block"`
Source *component `hcl:"source,block" envPrefix:"SOURCE_"`
Target *component `hcl:"target,block" envPrefix:"TARGET_"`
FailureTarget *failureConfig `hcl:"failure_target,block"`
Sentry *sentryConfig `hcl:"sentry,block"`
StatsReceiver *statsConfig `hcl:"stats_receiver,block"`
Transformations []*component `hcl:"transform,block"`
BatchTransformations []*component `hcl:"batch_transform,block"`
LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"`
UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"`
DisableTelemetry bool `hcl:"disable_telemetry,optional" env:"DISABLE_TELEMETRY"`
License *licenseConfig `hcl:"license,block"`
}
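Based on the `batch_transform,block` HCL tag added above, a user-facing configuration block would presumably look like the following. The block name comes from the struct tag; the inner `use` layout mirrors how snowbridge's other component blocks are configured, and the transformation name shown is hypothetical:

```hcl
# Hypothetical sketch only: "batch_transform" is taken from the HCL tag in
# configurationData; the inner block name and options are illustrative.
batch_transform {
  use "example_batch_transformation" {
    # options for the specific batch transformation would go here
  }
}
```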

// component is a type to abstract over configuration blocks.
8 changes: 4 additions & 4 deletions docs/configuration_transformations_docs_test.go
@@ -21,10 +21,10 @@ import (
"github.com/hashicorp/hcl/v2/gohcl"
"github.com/snowplow/snowbridge/assets"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/transform"
"github.com/snowplow/snowbridge/pkg/transform/engine"
"github.com/snowplow/snowbridge/pkg/transform/filter"
"github.com/snowplow/snowbridge/pkg/transform/transformconfig"
transform "github.com/snowplow/snowbridge/pkg/transform/single"
"github.com/snowplow/snowbridge/pkg/transform/single/engine"
"github.com/snowplow/snowbridge/pkg/transform/single/filter"
"github.com/snowplow/snowbridge/pkg/transform/single/transformconfig"
"github.com/stretchr/testify/assert"
)

4 changes: 2 additions & 2 deletions pkg/failure/snowplow.go
@@ -79,7 +79,7 @@ func (d *SnowplowFailure) WriteInvalid(invalid []*models.Message) (*models.Targe
transformed = append(transformed, tMsg)
}

return d.target.Write(transformed)
return d.target.Write(transformed, nil)
}

// WriteOversized will handle the conversion of oversized messages into failure
@@ -114,7 +114,7 @@ func (d *SnowplowFailure) WriteOversized(maximumAllowedSizeBytes int, oversized
transformed = append(transformed, tMsg)
}

return d.target.Write(transformed)
return d.target.Write(transformed, nil)
}

// Open manages opening the underlying target
3 changes: 2 additions & 1 deletion pkg/failure/snowplow_test.go
@@ -19,6 +19,7 @@ import (

"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/testutil"
batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch"
)

// --- Test FailureTarget
@@ -27,7 +28,7 @@ type TestFailureTarget struct {
onWrite func(messages []*models.Message) (*models.TargetWriteResult, error)
}

func (t *TestFailureTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) {
func (t *TestFailureTarget) Write(messages []*models.Message, btf batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) {
return t.onWrite(messages)
}

30 changes: 30 additions & 0 deletions pkg/models/batch_transformation.go
@@ -0,0 +1,30 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package models

import "time"

// MessageBatch houses batches of messages, for batch transformations to operate across
type MessageBatch struct {
OriginalMessages []*Message // Most targets will use the data from here, but where we have an HTTP templating transformation, we would use this to ack batches of messages
jbeemster marked this conversation as resolved.
BatchData []byte // Where we template http requests, we use this to define the body of the request
HTTPHeaders map[string]string // For dynamic headers feature
Member:

I am not a huge fan of having a target-specific implementation mixed in with a generic model here. Is there another way to carry this data through without it being specifically mapped to HTTP?

Would something like Parameters map[string]interface{}, where in HTTP you can then access an HTTPHeaders key within the Parameters, work to abstract this away?

Collaborator (author):

I don't love it either. This actually comes from having implemented the dynamic header transformation - which has the same problem for the message model.

I considered that way of doing things but it's a trade-off - the downside of that approach is that you have a more obscure API and the target's logic depends on that specific key, but the API defines it as being anything.

I experimented with other things we could do but didn't find an elegant solution (yet), and it didn't feel like it serves the project well to labour on it for too long.

Right now we only have one thing that needs to do this, so my thinking was that this will do for the moment but when we need to design for further similar things we should revisit the api design.

I'm not massively opposed to doing what you suggest either - I just haven't given up on finding something better.

Member:

Fair point! As long as the seed of "maybe we should change this" is planted I am fine with it staying where it is.

Collaborator (author):

Not even that much of a maybe to be honest! Just needs to percolate a bit. Perhaps we will even see the answer when the rest of this refactor falls into place.

TimeRequestStarted time.Time
TimeRequestFinished time.Time
}

// BatchTransformationResult houses the result of a batch transformation
type BatchTransformationResult struct {
Success []*MessageBatch
Invalid []*Message
Oversized []*Message
}
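The group-by-headers behaviour discussed in the thread above can be sketched against these models. This is a hypothetical batch transformation, not code from the PR: the simplified structs stand in for the real models, and the header-grouping logic is an illustration of why HTTPHeaders lives on MessageBatch (messages sharing headers must land in the same batch before a request template is applied):

```go
package main

import "fmt"

// Simplified stand-ins for the snowbridge models (assumed shapes).
type Message struct {
	Data        []byte
	HTTPHeaders map[string]string
}

type MessageBatch struct {
	OriginalMessages []*Message
	HTTPHeaders      map[string]string
}

// BatchTransformationFunction mirrors the signature used in pkg/target/common.go:
// batches in; successful batches, invalid messages, oversized messages out.
type BatchTransformationFunction func([]*MessageBatch) ([]*MessageBatch, []*Message, []*Message)

// groupByHeader splits each incoming batch so that every outgoing batch
// contains only messages with identical headers.
func groupByHeader(in []*MessageBatch) ([]*MessageBatch, []*Message, []*Message) {
	out := []*MessageBatch{}
	for _, batch := range in {
		groups := map[string]*MessageBatch{}
		order := []string{} // preserve first-seen order of header sets
		for _, m := range batch.OriginalMessages {
			// fmt.Sprint prints map keys in sorted order, so equal header
			// sets produce equal keys; crude, but fine for a sketch.
			key := fmt.Sprint(m.HTTPHeaders)
			g, ok := groups[key]
			if !ok {
				g = &MessageBatch{HTTPHeaders: m.HTTPHeaders}
				groups[key] = g
				order = append(order, key)
			}
			g.OriginalMessages = append(g.OriginalMessages, m)
		}
		for _, key := range order {
			out = append(out, groups[key])
		}
	}
	return out, nil, nil
}

func main() {
	a := map[string]string{"X-Id": "1"}
	b := map[string]string{"X-Id": "2"}
	in := []*MessageBatch{{OriginalMessages: []*Message{
		{Data: []byte("m1"), HTTPHeaders: a},
		{Data: []byte("m2"), HTTPHeaders: b},
		{Data: []byte("m3"), HTTPHeaders: a},
	}}}
	batches, _, _ := groupByHeader(in)
	fmt.Println(len(batches))                     // 2
	fmt.Println(len(batches[0].OriginalMessages)) // 2
}
```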
47 changes: 47 additions & 0 deletions pkg/target/common.go
@@ -0,0 +1,47 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package target

import (
"github.com/snowplow/snowbridge/pkg/models"
batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch"
)

// chunkBatcherWithConfig returns a batch transformation which incorporates models.GetChunkedMessages() into the batch transformation model.
// It is done this way in order to pass GetChunkedMessages its config within the confines of the BatchTransformation design
func chunkBatcherWithConfig(chunkSize int, maxMessageByteSize int, maxChunkByteSize int) batchtransform.BatchTransformationFunction {

// chunkBatcher is a batch transformation which incorporates models.GetChunkedMessages() into the batch transformation model,
// preserving the original logic and ownership of the function.
chunkBatcher := func(batchesIn []*models.MessageBatch) ([]*models.MessageBatch, []*models.Message, []*models.Message) {
oversizedOut := make([]*models.Message, 0)
chunkedBatches := make([]*models.MessageBatch, 0)

for _, batch := range batchesIn {
chunks, oversized := models.GetChunkedMessages(batch.OriginalMessages, chunkSize, maxMessageByteSize, maxChunkByteSize)
Contributor:

GetChunkedMessages now feels more like it belongs to the target interface. Do you think this (it actually becoming a target method) could help move the batch transformation logic upstream of target.Write?

Collaborator (author):

I don't think so - the problem is that where we have templating, this logic must occur before the templater.


oversizedOut = append(oversizedOut, oversized...)

for _, chunk := range chunks {
asBatch := &models.MessageBatch{
OriginalMessages: chunk,
}

chunkedBatches = append(chunkedBatches, asBatch)
}

}
return chunkedBatches, nil, oversizedOut
}

return chunkBatcher
}
4 changes: 3 additions & 1 deletion pkg/target/eventhub.go
@@ -23,6 +23,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/snowplow/snowbridge/pkg/models"
batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch"
)

// EventHubConfig holds a config object for Azure EventHub
@@ -147,9 +148,10 @@ func AdaptEventHubTargetFunc(f func(c *EventHubConfig) (*EventHubTarget, error))
}
}

func (eht *EventHubTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) {
func (eht *EventHubTarget) Write(messages []*models.Message, batchTransformFunc batchtransform.BatchTransformationApplyFunction) (*models.TargetWriteResult, error) {
eht.log.Debugf("Writing %d messages to stream ...", len(messages))

// TODO: Replace this with the new Chunker Batch Transformation - should be a post function.
chunks, oversized := models.GetChunkedMessages(
messages,
eht.chunkMessageLimit, // Max Chunk size (number of messages)