Add generic plugin transformation ability #56

Open: wants to merge 31 commits into base: master

Changes from all commits (31 commits):
4708aec  Add kafka support (Apr 19, 2021)
ed68e90  Add transformation support (colmsnowplow, Apr 23, 2021)
34bd8cf  Fix go.mod and go.sum (colmsnowplow, May 5, 2021)
d6692db  Refactor transformations to have each function operate on a single event (colmsnowplow, May 6, 2021)
7c61cac  Introduce IntermediateState to avoid parsing event multiple times (colmsnowplow, May 10, 2021)
6ae77ac  Switch Kafka Target to use sarama.AsyncProducer (May 10, 2021)
2b70967  Add Tests for Writing to Kafka Producers (May 12, 2021)
e9f1701  Update go.mod and go.sum (colmsnowplow, May 12, 2021)
e4ef84e  Update analytics sdk version (colmsnowplow, May 12, 2021)
7a2b26f  Update Message.TimeTransformed once, in TransformationFunction (colmsnowplow, May 12, 2021)
7461ff1  Improve and cleanup transform tests (colmsnowplow, May 12, 2021)
4519618  Fix broken CI/CD after pushing file early by mistake (colmsnowplow, May 12, 2021)
85e19ac  Update comments for clarity (colmsnowplow, May 12, 2021)
50d657d  Refactor and cleanup transform tests (colmsnowplow, May 13, 2021)
72c39e4  Dereference in TransformationApplyFunction rather than Transformation… (colmsnowplow, May 13, 2021)
8426666  Add inline comment for clarity (colmsnowplow, May 13, 2021)
d6d265c  make tidy (colmsnowplow, May 13, 2021)
51b14b7  Add comments to coerce indentation consistency (colmsnowplow, May 13, 2021)
a80c312  Add version check to github workflow (colmsnowplow, May 17, 2021)
eb08fd0  Update VERSION file (colmsnowplow, May 17, 2021)
acdab61  Pass intermediateState between transformations rather than amending m… (colmsnowplow, May 19, 2021)
a323427  Tidy up go.mod (colmsnowplow, May 19, 2021)
35c81e2  Patch Kafka target - use the provided version if supported (colmsnowplow, May 21, 2021)
2fed522  Pass log to sarama for improved config validation (May 21, 2021)
0009211  Bump version to rc3 (colmsnowplow, May 24, 2021)
2cfb616  Update analytics sdk to v0.1.0 (colmsnowplow, May 26, 2021)
0f1d8fd  Always log time values in ms (colmsnowplow, May 28, 2021)
08d2efb  Bump version to rc4 (colmsnowplow, May 28, 2021)
71601ab  Add generic plugin transformation ability (jbeemster, Jun 4, 2021)
c1f6118  Fix long.sh example to add newline (lukeindykiewicz, Jun 4, 2021)
1856783  Add long running example in Go by Joe (lukeindykiewicz, Jun 4, 2021)
16 changes: 12 additions & 4 deletions .github/workflows/cd.yml
@@ -34,13 +34,21 @@ jobs:
restore-keys: |
${{ runner.os }}-go-

- name: Extract tag version from ref, and project version from file
id: version
run: |
echo ::set-output name=VERSION_FILE::"refs/tags/$(cat VERSION)"

- name: Fail if version mismatch
id: check_version
if: ${{ github.ref != steps.version.outputs.VERSION_FILE }}
run: |
echo "VERSION file ${{steps.version.outputs.VERSION_FILE}} does not match tagged version ${{ github.ref }}"
exit 1

- name: Compile
run: make all

- name: Extract tag version from ref
id: get_version
run: echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//}

- name: Create Release
uses: actions/create-release@v1
env:
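
The two new steps above abort the release when the pushed tag and the VERSION file disagree. A minimal sketch of the equivalent check outside Actions, assuming GITHUB_REF carries the same refs/tags/... value the workflow sees:

GITHUB_REF="refs/tags/0.3.0-rc4"          # example value: the tag that triggered the workflow
VERSION_FILE="refs/tags/$(cat VERSION)"   # contents of the VERSION file, prefixed the same way
if [ "${GITHUB_REF}" != "${VERSION_FILE}" ]; then
  echo "VERSION file ${VERSION_FILE} does not match tagged version ${GITHUB_REF}"
  exit 1
fi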
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.2.3
0.3.0-rc4
29 changes: 21 additions & 8 deletions cmd/cli/main.go
@@ -7,15 +7,16 @@
package main

import (
"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
retry "github.com/snowplow-devops/go-retry"
"github.com/urfave/cli"
"os"
"os/signal"
"syscall"
"time"

"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
retry "github.com/snowplow-devops/go-retry"
"github.com/urfave/cli"

"net/http"
_ "net/http/pprof"

@@ -25,6 +26,7 @@ import (
"github.com/snowplow-devops/stream-replicator/pkg/observer"
"github.com/snowplow-devops/stream-replicator/pkg/source/sourceiface"
"github.com/snowplow-devops/stream-replicator/pkg/target/targetiface"
"github.com/snowplow-devops/stream-replicator/pkg/transform"
)

const (
@@ -75,6 +77,11 @@ func main() {
return err
}

tr, err := cfg.GetTransformations()
if err != nil {
return err
}

t, err := cfg.GetTarget()
if err != nil {
return err
@@ -126,7 +133,7 @@ func main() {

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, o),
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
}

// Read is a long running process and will only return when the source
@@ -156,10 +163,16 @@
// 4. Observing these results
//
// All with retry logic baked in to remove any of this handling from the implementations
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, o *observer.Observer) func(messages []*models.Message) error {
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer) func(messages []*models.Message) error {
return func(messages []*models.Message) error {

// Apply transformations
transformed := tr(messages)
// No error to handle here; failed transformations are returned in the Invalid field of the TransformationResult

// Send message buffer
messagesToSend := messages
messagesToSend := transformed.Result

res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
res, err := t.Write(messagesToSend)

@@ -191,7 +204,7 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, o *observer.
}

// Send invalid message buffer
messagesToSend = resCast.Invalid
messagesToSend = append(resCast.Invalid, transformed.Invalid...)
if len(messagesToSend) > 0 {
err3 := retry.Exponential(5, time.Second, "failureTarget.WriteInvalid", func() error {
res, err := ft.WriteInvalid(messagesToSend)
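
For context, sourceWriteFunc above leans on the transform package returning a partitioned result rather than an error. A minimal sketch of the shapes it relies on, inferred from the calls in this diff (tr(messages), transformed.Result, transformed.Invalid) and not copied from the package's actual source:

// Sketch only; the real definitions live in pkg/transform and pkg/models.
package transform

import "github.com/snowplow-devops/stream-replicator/pkg/models"

// TransformationResult partitions a batch after transformation: Result holds
// messages to forward to the target, Invalid holds messages whose
// transformation failed and which are routed to the failure target instead.
type TransformationResult struct {
	Result  []*models.Message
	Invalid []*models.Message
}

// TransformationApplyFunction applies every configured TransformationFunction
// to a batch and returns the partitioned outcome; per-message errors surface
// as Invalid entries rather than as a returned error.
type TransformationApplyFunction func(messages []*models.Message) *TransformationResult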
135 changes: 131 additions & 4 deletions cmd/config.go
@@ -8,12 +8,14 @@ package cmd

import (
"fmt"
"github.com/caarlos0/env/v6"
"github.com/pkg/errors"
"os"
"strconv"
"strings"
"time"

"github.com/caarlos0/env/v6"
"github.com/pkg/errors"

"github.com/snowplow-devops/stream-replicator/pkg/failure"
"github.com/snowplow-devops/stream-replicator/pkg/failure/failureiface"
"github.com/snowplow-devops/stream-replicator/pkg/observer"
@@ -23,6 +25,7 @@ import (
"github.com/snowplow-devops/stream-replicator/pkg/statsreceiver/statsreceiveriface"
"github.com/snowplow-devops/stream-replicator/pkg/target"
"github.com/snowplow-devops/stream-replicator/pkg/target/targetiface"
"github.com/snowplow-devops/stream-replicator/pkg/transform"
)

// ---------- [ TARGETS ] ----------
Expand All @@ -47,11 +50,36 @@ type SQSTargetConfig struct {
RoleARN string `env:"TARGET_SQS_ROLE_ARN"`
}

// KafkaTargetConfig configures the destination for records consumed
type KafkaTargetConfig struct {
Brokers string `env:"TARGET_KAFKA_BROKERS"` // REQUIRED
TopicName string `env:"TARGET_KAFKA_TOPIC_NAME"` // REQUIRED
TargetVersion string `env:"TARGET_KAFKA_TARGET_VERSION"` // The Kafka version we should target e.g. 2.7.0 or 0.11.0.2
MaxRetries int `env:"TARGET_KAFKA_MAX_RETRIES" envDefault:"10"` // Max retries
ByteLimit int `env:"TARGET_KAFKA_BYTE_LIMIT" envDefault:"1048576"` // Kafka Default is 1MiB
Compress bool `env:"TARGET_KAFKA_COMPRESS"` // Reduces Network usage & Increases latency by compressing data
WaitForAll bool `env:"TARGET_KAFKA_WAIT_FOR_ALL"` // Sets RequiredAcks = WaitForAll which waits for min.insync.replicas to Ack
Idempotent bool `env:"TARGET_KAFKA_IDEMPOTENT"` // Exactly once writes - Also sets RequiredAcks = WaitForAll
EnableSASL bool `env:"TARGET_KAFKA_ENABLE_SASL"` // Enables SASL Support
SASLUsername string `env:"TARGET_KAFKA_SASL_USERNAME"` // SASL auth
SASLPassword string `env:"TARGET_KAFKA_SASL_PASSWORD"` // SASL auth
SASLAlgorithm string `env:"TARGET_KAFKA_SASL_ALGORITHM" envDefault:"sha512"` // sha256 or sha512
CertFile string `env:"TARGET_KAFKA_TLS_CERT_FILE"` // The optional certificate file for client authentication
KeyFile string `env:"TARGET_KAFKA_TLS_KEY_FILE"` // The optional key file for client authentication
CaFile string `env:"TARGET_KAFKA_TLS_CA_FILE"` // The optional certificate authority file for TLS client authentication
SkipVerifyTls bool `env:"TARGET_KAFKA_TLS_SKIP_VERIFY_TLS"` // Optionally skip verifying the SSL certificate chain
ForceSyncProducer bool `env:"TARGET_KAFKA_FORCE_SYNC_PRODUCER"` // Forces the use of the Sync Producer, emits as fast as possible, may limit performance
FlushFrequency int `env:"TARGET_KAFKA_FLUSH_FREQUENCY" envDefault:"0"` // Milliseconds between flushes of events - 0 = as fast as possible
FlushMessages int `env:"TARGET_KAFKA_FLUSH_MESSAGES" envDefault:"0"` // Best effort for how many messages are sent in each batch - 0 = as fast as possible
FlushBytes int `env:"TARGET_KAFKA_FLUSH_BYTES" envDefault:"0"` // Best effort for how many bytes will trigger a flush - 0 = as fast as possible
}
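
As an illustration only (values are placeholders, not taken from this PR), the smallest useful Kafka target configuration sets the two required variables and leaves the rest at the defaults above; the selector variable is assumed to be TARGET, by analogy with FAILURE_TARGET further down:

export TARGET="kafka"                           # assumed selector name
export TARGET_KAFKA_BROKERS="localhost:9092"    # REQUIRED
export TARGET_KAFKA_TOPIC_NAME="enriched-good"  # REQUIRED; placeholder topic name
export TARGET_KAFKA_TARGET_VERSION="2.7.0"      # optional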

// TargetsConfig holds configuration for the available targets
type TargetsConfig struct {
Kinesis KinesisTargetConfig
PubSub PubSubTargetConfig
SQS SQSTargetConfig
Kafka KafkaTargetConfig
}

// ---------- [ FAILURE MESSAGE TARGETS ] ----------
Expand All @@ -76,11 +104,36 @@ type FailureSQSTargetConfig struct {
RoleARN string `env:"FAILURE_TARGET_SQS_ROLE_ARN"`
}

// FailureKafkaTargetConfig configures the failure destination for records consumed
type FailureKafkaTargetConfig struct {
Brokers string `env:"FAILURE_TARGET_KAFKA_BROKERS"` // REQUIRED
TopicName string `env:"FAILURE_TARGET_KAFKA_TOPIC_NAME"` // REQUIRED
TargetVersion string `env:"FAILURE_TARGET_KAFKA_TARGET_VERSION"` // The Kafka version we should target e.g. 2.7.0 or 0.11.0.2
MaxRetries int `env:"FAILURE_TARGET_KAFKA_MAX_RETRIES" envDefault:"10"` // Max retries
ByteLimit int `env:"FAILURE_TARGET_KAFKA_BYTE_LIMIT" envDefault:"1048576"` // Kafka Default is 1MiB
Compress bool `env:"FAILURE_TARGET_KAFKA_COMPRESS"` // Reduces Network usage & Increases latency by compressing data
WaitForAll bool `env:"FAILURE_TARGET_KAFKA_WAIT_FOR_ALL"` // Sets RequiredAcks = WaitForAll which waits for min.insync.replicas to Ack
Idempotent bool `env:"FAILURE_TARGET_KAFKA_IDEMPOTENT"` // Exactly once writes
EnableSASL bool `env:"FAILURE_TARGET_KAFKA_ENABLE_SASL"` // Enables SASL Support
SASLUsername string `env:"FAILURE_TARGET_KAFKA_SASL_USERNAME"` // SASL auth
SASLPassword string `env:"FAILURE_TARGET_KAFKA_SASL_PASSWORD"` // SASL auth
SASLAlgorithm string `env:"FAILURE_TARGET_KAFKA_SASL_ALGORITHM" envDefault:"sha512"` // sha256 or sha512
CertFile string `env:"FAILURE_TARGET_KAFKA_TLS_CERT_FILE"` // The optional certificate file for client authentication
KeyFile string `env:"FAILURE_TARGET_KAFKA_TLS_KEY_FILE"` // The optional key file for client authentication
CaFile string `env:"FAILURE_TARGET_KAFKA_TLS_CA_FILE"` // The optional certificate authority file for TLS client authentication
SkipVerifyTls bool `env:"FAILURE_TARGET_KAFKA_TLS_SKIP_VERIFY_TLS"` // Optionally skip verifying the SSL certificate chain
ForceSyncProducer bool `env:"FAILURE_TARGET_KAFKA_FORCE_SYNC_PRODUCER"` // Forces the use of the Sync Producer, emits as fast as possible, may limit performance
FlushFrequency int `env:"FAILURE_TARGET_KAFKA_FLUSH_FREQUENCY" envDefault:"0"` // Milliseconds between flushes of events - 0 = as fast as possible
FlushMessages int `env:"FAILURE_TARGET_KAFKA_FLUSH_MESSAGES" envDefault:"0"` // Best effort for how many messages are sent in each batch - 0 = as fast as possible
FlushBytes int `env:"FAILURE_TARGET_KAFKA_FLUSH_BYTES" envDefault:"0"` // Best effort for how many bytes will trigger a flush - 0 = as fast as possible
}

// FailureTargetsConfig holds configuration for the available targets
type FailureTargetsConfig struct {
Kinesis FailureKinesisTargetConfig
PubSub FailurePubSubTargetConfig
SQS FailureSQSTargetConfig
Kafka FailureKafkaTargetConfig

// Format defines how the message will be transformed before
// being sent to the target
@@ -155,6 +208,7 @@ type Config struct {
Targets TargetsConfig
FailureTarget string `env:"FAILURE_TARGET" envDefault:"stdout"`
FailureTargets FailureTargetsConfig
Transformation string `env:"MESSAGE_TRANSFORMATION" envDefault:"none"`
LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
Sentry SentryConfig
StatsReceiver string `env:"STATS_RECEIVER"`
@@ -229,8 +283,31 @@ func (c *Config) GetTarget() (targetiface.Target, error) {
c.Targets.SQS.QueueName,
c.Targets.SQS.RoleARN,
)
case "kafka":
return target.NewKafkaTarget(&target.KafkaConfig{
Brokers: c.Targets.Kafka.Brokers,
TopicName: c.Targets.Kafka.TopicName,
TargetVersion: c.Targets.Kafka.TargetVersion,
MaxRetries: c.Targets.Kafka.MaxRetries,
ByteLimit: c.Targets.Kafka.ByteLimit,
Compress: c.Targets.Kafka.Compress,
WaitForAll: c.Targets.Kafka.WaitForAll,
Idempotent: c.Targets.Kafka.Idempotent,
EnableSASL: c.Targets.Kafka.EnableSASL,
SASLUsername: c.Targets.Kafka.SASLUsername,
SASLPassword: c.Targets.Kafka.SASLPassword,
SASLAlgorithm: c.Targets.Kafka.SASLAlgorithm,
CertFile: c.Targets.Kafka.CertFile,
KeyFile: c.Targets.Kafka.KeyFile,
CaFile: c.Targets.Kafka.CaFile,
SkipVerifyTls: c.Targets.Kafka.SkipVerifyTls,
ForceSync: c.Targets.Kafka.ForceSyncProducer,
FlushFrequency: c.Targets.Kafka.FlushFrequency,
FlushMessages: c.Targets.Kafka.FlushMessages,
FlushBytes: c.Targets.Kafka.FlushBytes,
})
default:
return nil, errors.New(fmt.Sprintf("Invalid target found; expected one of 'stdout, kinesis, pubsub, sqs' and got '%s'", c.Target))
return nil, errors.New(fmt.Sprintf("Invalid target found; expected one of 'stdout, kinesis, pubsub, sqs, kafka' and got '%s'", c.Target))
}
}

@@ -259,8 +336,31 @@ func (c *Config) GetFailureTarget() (failureiface.Failure, error) {
c.FailureTargets.SQS.QueueName,
c.FailureTargets.SQS.RoleARN,
)
case "kafka":
t, err = target.NewKafkaTarget(&target.KafkaConfig{
Brokers: c.FailureTargets.Kafka.Brokers,
TopicName: c.FailureTargets.Kafka.TopicName,
TargetVersion: c.FailureTargets.Kafka.TargetVersion,
MaxRetries: c.FailureTargets.Kafka.MaxRetries,
ByteLimit: c.FailureTargets.Kafka.ByteLimit,
Compress: c.FailureTargets.Kafka.Compress,
WaitForAll: c.FailureTargets.Kafka.WaitForAll,
Idempotent: c.FailureTargets.Kafka.Idempotent,
EnableSASL: c.FailureTargets.Kafka.EnableSASL,
SASLUsername: c.FailureTargets.Kafka.SASLUsername,
SASLPassword: c.FailureTargets.Kafka.SASLPassword,
SASLAlgorithm: c.FailureTargets.Kafka.SASLAlgorithm,
CertFile: c.FailureTargets.Kafka.CertFile,
KeyFile: c.FailureTargets.Kafka.KeyFile,
CaFile: c.FailureTargets.Kafka.CaFile,
SkipVerifyTls: c.FailureTargets.Kafka.SkipVerifyTls,
ForceSync: c.FailureTargets.Kafka.ForceSyncProducer,
FlushFrequency: c.FailureTargets.Kafka.FlushFrequency,
FlushMessages: c.FailureTargets.Kafka.FlushMessages,
FlushBytes: c.FailureTargets.Kafka.FlushBytes,
})
default:
err = errors.New(fmt.Sprintf("Invalid failure target found; expected one of 'stdout, kinesis, pubsub, sqs' and got '%s'", c.FailureTarget))
err = errors.New(fmt.Sprintf("Invalid failure target found; expected one of 'stdout, kinesis, pubsub, sqs, kafka' and got '%s'", c.FailureTarget))
}
if err != nil {
return nil, err
@@ -274,6 +374,33 @@ func (c *Config) GetFailureTarget() (failureiface.Failure, error) {
}
}

func (c *Config) GetTransformations() (transform.TransformationApplyFunction, error) {
funcs := make([]transform.TransformationFunction, 0, 0)

// Parse list of transformations
transformations := strings.Split(c.Transformation, ",")

for _, transformation := range transformations {
// Parse function name-option sets
funcOpts := strings.Split(transformation, ":")

switch funcOpts[0] {
case "spEnrichedToJson":
funcs = append(funcs, transform.SpEnrichedToJson)
case "spEnrichedSetPk":
funcs = append(funcs, transform.NewSpEnrichedSetPkFunction(funcOpts[1]))
case "plugin":
funcs = append(funcs, transform.NewPluginFunction(funcOpts[1]))
case "pluginLong":
funcs = append(funcs, transform.NewPluginLongFunction(funcOpts[1]))
case "none":
default:
return nil, errors.New(fmt.Sprintf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', 'plugin:{path}', 'pluginLong:{path}' and got '%s'", c.Transformation))
}
}
return transform.NewTransformation(funcs...), nil
}
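
MESSAGE_TRANSFORMATION is read as a comma-separated list, with a function's option attached after a colon. A couple of hypothetical settings (the app_id field and the script path are illustrative, not values from this PR):

export MESSAGE_TRANSFORMATION="spEnrichedSetPk:app_id,spEnrichedToJson"   # chain two built-in transformations
export MESSAGE_TRANSFORMATION="plugin:/opt/transformations/transform.sh"  # run an external plugin script
export MESSAGE_TRANSFORMATION="none"                                      # default: pass messages through untouched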

// GetTags returns a list of tags to use in identifying this instance of stream-replicator
func (c *Config) GetTags(sourceID string, targetID string, failureTargetID string) (map[string]string, error) {
hostname, err := os.Hostname()