diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go
index bcad034a..f7248ee0 100644
--- a/cmd/cli/cli.go
+++ b/cmd/cli/cli.go
@@ -93,7 +93,7 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
 		return err
 	}
 
-	tr, err := transformconfig.GetTransformations(cfg, supportedTransformations)
+	tr, err := transformconfig.GetTransformationsRefactored(cfg, supportedTransformations)
 	if err != nil {
 		return err
 	}
@@ -159,9 +159,47 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
 		}
 	}()
 
+	// The channel needs a buffer limit. This should probably be configurable, and we should
+	// think about what to set it to and what effect it may have on latency.
+	batchingChannel := make(chan *models.Message, 10000)
+
+	// Spawn a goroutine to consume the channel:
+	go func() {
+		// This interval would be configurable
+		ticker := time.NewTicker(1 * time.Second)
+		currentBatch := []*models.Message{}
+		for {
+			select {
+			case msg := <-batchingChannel:
+				// Append the message to the current batch
+				currentBatch = append(currentBatch, msg)
+				// With some extra effort, logic could go here to send immediately once the
+				// batch reaches a certain size in bytes.
+				// For now, let's mimic that by using the number of events.
+				// (We could have both be configurable!)
+				if len(currentBatch) >= 100 {
+					// Process the batch.
+					// NB: errors returned by batchTransformAndWriteData are dropped when it is
+					// invoked via `go`; a real implementation would need to surface them.
+					go batchTransformAndWriteData([]*models.TargetBatch{{
+						OriginalMessages: currentBatch,
+					}}, t, ft, o)
+					// Clear the placeholder for the batch.
+					currentBatch = []*models.Message{}
+					// Of course, tests should be written to ensure thread safety here.
+					// However, I don't believe the next loop iteration can execute before this
+					// finishes, since both happen in this same goroutine.
+				}
+			case <-ticker.C:
+				// Every tick, process whatever has accumulated, skipping empty ticks.
+				if len(currentBatch) == 0 {
+					continue
+				}
+				// If we like, we could also reset the ticker whenever the other case fires.
+				go batchTransformAndWriteData([]*models.TargetBatch{{
+					OriginalMessages: currentBatch,
+				}}, t, ft, o)
+				currentBatch = []*models.Message{}
+			}
+		}
+	}()
+
 	// Callback functions for the source to leverage when writing data
 	sf := sourceiface.SourceFunctions{
-		WriteToTarget: sourceWriteFunc(t, ft, tr, o),
+		WriteToTarget: sourceReadAndTransformFunc(tr, ft, o, batchingChannel),
 	}
 
 	// Read is a long running process and will only return when the source
@@ -184,6 +222,150 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
 	}
 
+func batchTransformAndWriteData(targetBatches []*models.TargetBatch, t targetiface.Target, ft failureiface.Failure, o *observer.Observer) error {
+	messageBatches := BatchTransformationFunction(targetBatches)
+
+	// While we're refactoring, retry may be best suited to living in the write function too.
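+	// (Sketch only: the 5 attempts and 1s base interval below are hardcoded, but would
+	// presumably become configuration alongside the batch size and tick interval above.)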
+	res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
+		res, err := t.Write(messageBatches)
+
+		o.TargetWrite(res)
+		// messagesToSend = res.Failed
+		// ^^ This bit needs to be looked at
+		return res, err
+	})
+	if err != nil {
+		return err
+	}
+	resCast := res.(*models.TargetWriteResult)
+
+	// Send oversized message buffer
+	messagesToSend := resCast.Oversized
+	if len(messagesToSend) > 0 {
+		err2 := retry.Exponential(5, time.Second, "failureTarget.WriteOversized", func() error {
+			res, err := ft.WriteOversized(t.MaximumAllowedMessageSizeBytes(), messagesToSend)
+			if err != nil {
+				return err
+			}
+			if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
+				log.Fatal("Oversized message transformation resulted in new oversized / invalid messages")
+			}
+
+			o.TargetWriteOversized(res)
+			messagesToSend = res.Failed
+			return err
+		})
+		if err2 != nil {
+			return err2
+		}
+	}
+
+	// Send invalid message buffer
+	messagesToSend = resCast.Invalid
+	if len(messagesToSend) > 0 {
+		err3 := retry.Exponential(5, time.Second, "failureTarget.WriteInvalid", func() error {
+			res, err := ft.WriteInvalid(messagesToSend)
+			if err != nil {
+				return err
+			}
+			if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
+				log.Fatal("Invalid message transformation resulted in new invalid / oversized messages")
+			}
+
+			o.TargetWriteInvalid(res)
+			messagesToSend = res.Failed
+			return err
+		})
+		if err3 != nil {
+			return err3
+		}
+	}
+
+	return nil
+}
+
+// sourceReadAndTransformFunc would replace sourceWriteFunc; the batch transformation and the
+// target now read from the supplied channel instead.
+func sourceReadAndTransformFunc(tr transform.TransformationApplyFunctionRefactored, ft failureiface.Failure, o *observer.Observer, c chan *models.Message) func(messages []*models.Message) error {
+	return func(messages []*models.Message) error {
+
+		// Successful transforms are fed straight to the channel; we no longer process them in
+		// this function. Same with acking filtered messages - we can do that immediately.
+		// For now, we continue to deal with failures here, but in a real implementation
+		// perhaps those should be handled in a similar flow.
+		transformResult := tr(messages, c)
+
+		// Observer calls can still go here
+		filterRes := models.NewFilterResult(transformResult.Filtered)
+		o.Filtered(filterRes)
+
+		// Deal with transformed invalids.
+		// TODO: This pattern should probably change in a full refactor, and perhaps use a
+		// separate channel too. It definitely should: then every source of invalids can push
+		// its data into that channel as we encounter them, and a single consumer can read
+		// from it and deal with them.
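+		// A rough sketch of that idea (names hypothetical): an `invalidChannel chan *models.Message`
+		// created next to batchingChannel in RunCli, with one goroutine ranging over it and
+		// retrying ft.WriteInvalid, mirroring the batching consumer above.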
+		messagesToSend := transformResult.Invalid
+		if len(messagesToSend) > 0 {
+			err3 := retry.Exponential(5, time.Second, "failureTarget.WriteInvalid", func() error {
+				res, err := ft.WriteInvalid(messagesToSend)
+				if err != nil {
+					return err
+				}
+				if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
+					log.Fatal("Invalid message transformation resulted in new invalid / oversized messages")
+				}
+
+				o.TargetWriteInvalid(res)
+				messagesToSend = res.Failed
+				return err
+			})
+			if err3 != nil {
+				return err3
+			}
+		}
+
+		return nil
+	}
+}
+
+// BatchTransformationFunction would live elsewhere and be composable like the other transformation functions
+func BatchTransformationFunction(batch []*models.TargetBatch) []*models.TargetBatch {
+
+	// Imagine this is composable like the transformation functions, and actually does something.
+
+	// The templater would fit here, along the following lines:
+	const templ = `{
+		attributes: [ {{$first_1 := true}}
+			{{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}}
+			{{printf "%s" .attribute_data}}{{end}}
+		],
+		events: [ {{$first_2 := true}}
+			{{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}}
+			{{printf "%s" .event_data}}{{end}}
+		]
+	}`
+
+	for _, b := range batch {
+		formatted := []map[string]json.RawMessage{}
+		for _, msg := range b.OriginalMessages {
+			// Use json.RawMessage to ensure the templating format works (the real implementation has a problem to figure out here)
+			var asMap map[string]json.RawMessage
+
+			if err := json.Unmarshal(msg.Data, &asMap); err != nil {
+				panic(err)
+			}
+
+			formatted = append(formatted, asMap)
+		}
+		var buf bytes.Buffer
+
+		t := template.Must(template.New("example").Parse(templ))
+		// Surface templating errors rather than silently dropping them
+		if err := t.Execute(&buf, formatted); err != nil {
+			panic(err)
+		}
+
+		// Assign the templated request to the HTTPRequestBody field
+		b.HTTPRequestBody = buf.Bytes()
+	}
+
+	return batch
+}
 
 // sourceWriteFunc builds the function which wraps the different objects together to handle:
 //
 // 1. Sending messages to the target
@@ -217,46 +399,6 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
 				OriginalMessages: messagesToSend,
 				HTTPRequestBody:  nil}}
 
-			BatchTransformationFunction := func(batch []*models.TargetBatch) []*models.TargetBatch {
-
-				// imaine this is composable like transformaion functions, and does something :D
-
-				// The templater would fit here along the following lines:
-				const templ = `{
-					attributes: [ {{$first_1 := true}}
-						{{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}}
-						{{printf "%s" .attribute_data}}{{end}}
-					],
-					events: [ {{$first_2 := true}}
-						{{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}}
-						{{printf "%s" .event_data}}{{end}}
-					]
-				}`
-
-				for _, b := range batch {
-					formatted := []map[string]json.RawMessage{}
-					for _, msg := range b.OriginalMessages {
-						// Use json.RawMessage to ensure templating format works (real implementation has a problem to figure out here)
-						var asMap map[string]json.RawMessage
-
-						if err := json.Unmarshal(msg.Data, &asMap); err != nil {
-							panic(err)
-						}
-
-						formatted = append(formatted, asMap)
-					}
-					var buf bytes.Buffer
-
-					t := template.Must(template.New("example").Parse(templ))
-					t.Execute(&buf, formatted)
-
-					// Assign the templated request to the HTTPRequestBody field
-					b.HTTPRequestBody = buf.Bytes()
-
-				}
-
-				return batch
-			}
 			messageBatches = BatchTransformationFunction(messageBatches)
 
 			res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
diff --git a/pkg/transform/transform.go b/pkg/transform/transform.go
index fbb43dff..ee37b627 100644
--- a/pkg/transform/transform.go
+++ b/pkg/transform/transform.go
@@ -23,6 +23,9 @@ type TransformationFunction func(*models.Message, interface{}) (*models.Message,
 // TransformationApplyFunction dereferences messages before running transformations, and returns a TransformationResult
 type TransformationApplyFunction func([]*models.Message) *models.TransformationResult
 
+// TransformationApplyFunctionRefactored is like TransformationApplyFunction, but also takes a channel onto which successfully transformed messages are pushed as they are produced
+type TransformationApplyFunctionRefactored func([]*models.Message, chan *models.Message) *models.TransformationResult
+
 // TransformationGenerator returns a TransformationApplyFunction from a provided set of TransformationFunctions
 type TransformationGenerator func(...TransformationFunction) TransformationApplyFunction
 
@@ -66,3 +69,55 @@ func NewTransformation(tranformFunctions ...TransformationFunction) Transformati
 		return models.NewTransformationResult(successList, filteredList, failureList)
 	}
 }
+
+// NewTransformationRefactored constructs a function which applies all transformations to all messages, returning a TransformationResult.
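+// Unlike NewTransformation, each successfully transformed message is also pushed onto the
+// supplied channel as soon as it is produced, and filtered messages are acked immediately.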
+func NewTransformationRefactored(transformFunctions ...TransformationFunction) TransformationApplyFunctionRefactored {
+	return func(messages []*models.Message, c chan *models.Message) *models.TransformationResult {
+		successList := make([]*models.Message, 0, len(messages))
+		filteredList := make([]*models.Message, 0, len(messages))
+		failureList := make([]*models.Message, 0, len(messages))
+		// If there are no transformations, just return the result rather than shuffling data between slices
+
+		// if len(transformFunctions) == 0 {
+		// 	return models.NewTransformationResult(messages, filteredList, failureList)
+		// }
+		// ^^ Maybe this should get removed regardless
+
+		for _, message := range messages {
+			msg := *message // dereference to avoid amending the input
+			// ^^ That shouldn't be necessary
+			success := &msg // success must be both input and output to a TransformationFunction, so we make this a pointer
+			var failure *models.Message
+			var filtered *models.Message
+			var intermediate interface{}
+			for _, transformFunction := range transformFunctions {
+				// Overwrite the input for each transformation in the sequence, since the desired result
+				// is a single transformed message with a nil failure, or a nil message with a single failure
+				success, filtered, failure, intermediate = transformFunction(success, intermediate)
+				if failure != nil || filtered != nil {
+					break
+				}
+			}
+			if success != nil {
+				success.TimeTransformed = time.Now().UTC()
+				// Pass successes to the channel as we get them
+				c <- success
+				successList = append(successList, success)
+			}
+			// We don't set TimeTransformed in the failure or filtered cases, as it is less useful and likely to skew metrics
+			if filtered != nil {
+				// Ack straight away; there's an argument this is how it should work regardless.
+				if filtered.AckFunc != nil {
+					filtered.AckFunc()
+				}
+				filteredList = append(filteredList, filtered)
+			}
+			if failure != nil {
+				// Probably we should pass failures to a channel to be dealt with too.
+				failureList = append(failureList, failure)
+			}
+		}
+		// Assuming we move handling of failures in here too, this return would only serve to
+		// report the filtered and failed lists back to the caller for observer metrics.
+		return models.NewTransformationResult(successList, filteredList, failureList)
+	}
+}
diff --git a/pkg/transform/transformconfig/transform_config.go b/pkg/transform/transformconfig/transform_config.go
index bff5cec9..1b1fba9a 100644
--- a/pkg/transform/transformconfig/transform_config.go
+++ b/pkg/transform/transformconfig/transform_config.go
@@ -67,3 +67,37 @@ func GetTransformations(c *config.Config, supportedTransformations []config.Conf
 	return transform.NewTransformation(funcs...), nil
 }
+
+// GetTransformationsRefactored builds and returns a transform.TransformationApplyFunctionRefactored
+// from the transformations configured.
+func GetTransformationsRefactored(c *config.Config, supportedTransformations []config.ConfigurationPair) (transform.TransformationApplyFunctionRefactored, error) {
+	funcs := make([]transform.TransformationFunction, 0)
+
+	for _, transformation := range c.Data.Transformations {
+
+		useTransf := transformation.Use
+		decoderOpts := &config.DecoderOptions{
+			Input: useTransf.Body,
+		}
+
+		var component interface{}
+		var err error
+		for _, pair := range supportedTransformations {
+			if pair.Name == useTransf.Name {
+				plug := pair.Handle
+				component, err = c.CreateComponent(plug, decoderOpts)
+				if err != nil {
+					return nil, err
+				}
+			}
+		}
+
+		f, ok := component.(transform.TransformationFunction)
+		if !ok {
+			return nil, fmt.Errorf("could not interpret transformation configuration for %q", useTransf.Name)
+		}
+		funcs = append(funcs, f)
+	}
+
+	return transform.NewTransformationRefactored(funcs...), nil
+}