Add an illustrative sketch of solving the batching model
colmsnowplow committed May 28, 2024
1 parent fa18eb8 commit 46437bf
Showing 3 changed files with 273 additions and 42 deletions.
226 changes: 184 additions & 42 deletions cmd/cli/cli.go
@@ -93,7 +93,7 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
return err
}

tr, err := transformconfig.GetTransformations(cfg, supportedTransformations)
tr, err := transformconfig.GetTransformationsRefactored(cfg, supportedTransformations)
if err != nil {
return err
}
@@ -159,9 +159,47 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
}
}()

// The channel will need a buffer limit. This should probably be configurable, and we should think about what to set it to and what effect it may have on latency
batchingChannel := make(chan *models.Message, 10000)

// Spawn a function to consume the channel, via a goroutine:
go func() {
// This interval would be configurable
ticker := time.NewTicker(1 * time.Second)
currentBatch := []*models.Message{}
for {
select {
case msg := <-batchingChannel:
// Append message to current batch
currentBatch = append(currentBatch, msg)
// With some extra effort, logic can go here to send immediately once the batch reaches a certain data size
// For now let's mimic that by using number of events
// (We could have both be configurable!)
if len(currentBatch) >= 100 {
// Process the batch
go batchTransformAndWriteData([]*models.TargetBatch{{
OriginalMessages: currentBatch,
}}, t, ft, o)
// Clear the placeholder for the batch.
currentBatch = []*models.Message{}
// Of course, tests should be written to ensure thread safety here.
// I don't believe we can reach a point where the next loop executes before this is finished, however, since both happen in this same goroutine
}
case <-ticker.C:
// Every tick, process a batch.
// If we like, we could get fancy and restart the ticker when the other case fires.
go batchTransformAndWriteData([]*models.TargetBatch{{
OriginalMessages: currentBatch,
}}, t, ft, o)
currentBatch = []*models.Message{}

}
}
}()

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
WriteToTarget: sourceReadAndTransformFunc(tr, ft, o, batchingChannel),
}

// Read is a long running process and will only return when the source
@@ -184,6 +222,150 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation

}

func batchTransformAndWriteData(targetBatches []*models.TargetBatch, t targetiface.Target, ft failureiface.Failure, o *observer.Observer) error {
messageBatches := BatchTransformationFunction(targetBatches)

// While we're refactoring, retry may be best suited to living in the write function too.
res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
res, err := t.Write(messageBatches)

o.TargetWrite(res)
// messagesToSend = res.Failed
// ^^ This bit needs to be looked at
return res, err
})
if err != nil {
return err
}
resCast := res.(*models.TargetWriteResult)

// Send oversized message buffer
messagesToSend := resCast.Oversized
if len(messagesToSend) > 0 {
err2 := retry.Exponential(5, time.Second, "failureTarget.WriteOversized", func() error {
res, err := ft.WriteOversized(t.MaximumAllowedMessageSizeBytes(), messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
log.Fatal("Oversized message transformation resulted in new oversized / invalid messages")
}

o.TargetWriteOversized(res)
messagesToSend = res.Failed
return err
})
if err2 != nil {
return err2
}
}

// Send invalid message buffer
messagesToSend = resCast.Invalid
if len(messagesToSend) > 0 {
err3 := retry.Exponential(5, time.Second, "failureTarget.WriteInvalid", func() error {
res, err := ft.WriteInvalid(messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
log.Fatal("Invalid message transformation resulted in new invalid / oversized messages")
}

o.TargetWriteInvalid(res)
messagesToSend = res.Failed
return err
})
if err3 != nil {
return err3
}
}

return nil
}

// This would replace sourceWriteFunc; the batch transformation and the target write would read from the supplied channel instead
func sourceReadAndTransformFunc(tr transform.TransformationApplyFunctionRefactored, ft failureiface.Failure, o *observer.Observer, c chan *models.Message) func(messages []*models.Message) error {
return func(messages []*models.Message) error {

// Successful transforms are fed to the channel immediately; we no longer process them in this function.
// The same goes for acking filtered messages - we can do that straight away.
// For now, we can continue to deal with failures here - but in a real implementation perhaps those should be handled in a similar flow.
transformResult := tr(messages, c)

// observer stuff can still go here
filterRes := models.NewFilterResult(transformResult.Filtered)
o.Filtered(filterRes)

// Deal with transformed invalids -
// TODO: This pattern should probably change in a full refactor, and perhaps use a separate channel too. :thinking_face:
// It definitely should - then all sources of invalids can push their data into a channel (as we encounter them), and one consumer can read from it and deal with them.
messagesToSend := transformResult.Invalid
if len(messagesToSend) > 0 {
err3 := retry.Exponential(5, time.Second, "failureTarget.WriteInvalid", func() error {
res, err := ft.WriteInvalid(messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
log.Fatal("Invalid message transformation resulted in new invalid / oversized messages")
}

o.TargetWriteInvalid(res)
messagesToSend = res.Failed
return err
})
if err3 != nil {
return err3
}
}

return nil
}
}

// BatchTransformationFunction would live elsewhere and be composable like the other transformation functions
func BatchTransformationFunction(batch []*models.TargetBatch) []*models.TargetBatch {

// imagine this is composable like the transformation functions, and does something :D

// The templater would fit here along the following lines:
const templ = `{
attributes: [ {{$first_1 := true}}
{{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}}
{{printf "%s" .attribute_data}}{{end}}
],
events: [ {{$first_2 := true}}
{{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}}
{{printf "%s" .event_data}}{{end}}
]
}`

for _, b := range batch {
formatted := []map[string]json.RawMessage{}
for _, msg := range b.OriginalMessages {
// Use json.RawMessage to ensure templating format works (real implementation has a problem to figure out here)
var asMap map[string]json.RawMessage

if err := json.Unmarshal(msg.Data, &asMap); err != nil {
panic(err)
}

formatted = append(formatted, asMap)
}
var buf bytes.Buffer

t := template.Must(template.New("example").Parse(templ))
if err := t.Execute(&buf, formatted); err != nil {
panic(err)
}

// Assign the templated request to the HTTPRequestBody field
b.HTTPRequestBody = buf.Bytes()

}

return batch
}

// sourceWriteFunc builds the function which wraps the different objects together to handle:
//
// 1. Sending messages to the target
@@ -217,46 +399,6 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
OriginalMessages: messagesToSend,
HTTPRequestBody: nil}}

BatchTransformationFunction := func(batch []*models.TargetBatch) []*models.TargetBatch {

// imagine this is composable like transformation functions, and does something :D

// The templater would fit here along the following lines:
const templ = `{
attributes: [ {{$first_1 := true}}
{{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}}
{{printf "%s" .attribute_data}}{{end}}
],
events: [ {{$first_2 := true}}
{{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}}
{{printf "%s" .event_data}}{{end}}
]
}`

for _, b := range batch {
formatted := []map[string]json.RawMessage{}
for _, msg := range b.OriginalMessages {
// Use json.RawMessage to ensure templating format works (real implementation has a problem to figure out here)
var asMap map[string]json.RawMessage

if err := json.Unmarshal(msg.Data, &asMap); err != nil {
panic(err)
}

formatted = append(formatted, asMap)
}
var buf bytes.Buffer

t := template.Must(template.New("example").Parse(templ))
t.Execute(&buf, formatted)

// Assign the templated request to the HTTPRequestBody field
b.HTTPRequestBody = buf.Bytes()

}

return batch
}
messageBatches = BatchTransformationFunction(messageBatches)

res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
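To make the batching model above easier to follow in isolation, here is a minimal, self-contained sketch of the same pattern: a buffered channel feeding a single consumer goroutine, which flushes either when a size threshold is hit or when a ticker fires. The message type and the flush function are simplified stand-ins, not the real models.Message or batchTransformAndWriteData, and the sketch additionally skips empty flushes and drains the channel on shutdown, which the diff above does not yet do.

package main

import (
	"fmt"
	"time"
)

type message struct {
	Data []byte
}

// flush stands in for the batch transformation and target write.
func flush(batch []*message) {
	if len(batch) == 0 {
		return
	}
	fmt.Printf("flushing %d messages\n", len(batch))
}

func main() {
	// The buffer size would be configurable in a real implementation.
	batchingChannel := make(chan *message, 10000)
	done := make(chan struct{})

	go func() {
		defer close(done)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		currentBatch := []*message{}
		for {
			select {
			case msg, ok := <-batchingChannel:
				if !ok {
					flush(currentBatch) // drain whatever is left on shutdown
					return
				}
				currentBatch = append(currentBatch, msg)
				if len(currentBatch) >= 100 {
					flush(currentBatch)
					currentBatch = []*message{}
				}
			case <-ticker.C:
				flush(currentBatch)
				currentBatch = []*message{}
			}
		}
	}()

	for i := 0; i < 250; i++ {
		batchingChannel <- &message{Data: []byte("{}")}
	}
	close(batchingChannel)
	<-done
}

In the diff the flush is spawned as its own goroutine; that remains safe because currentBatch is replaced with a fresh slice rather than mutated after it is handed over, which is the property the thread-safety comment above relies on.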
55 changes: 55 additions & 0 deletions pkg/transform/transform.go
@@ -23,6 +23,9 @@ type TransformationFunction func(*models.Message, interface{}) (*models.Message,
// TransformationApplyFunction dereferences messages before running transformations, and returns a TransformationResult
type TransformationApplyFunction func([]*models.Message) *models.TransformationResult

// TransformationApplyFunctionRefactored dereferences messages before running transformations, sends successes to the supplied channel as they are produced, and returns a TransformationResult
type TransformationApplyFunctionRefactored func([]*models.Message, chan *models.Message) *models.TransformationResult

// TransformationGenerator returns a TransformationApplyFunction from a provided set of TransformationFunctions
type TransformationGenerator func(...TransformationFunction) TransformationApplyFunction

@@ -66,3 +69,55 @@ func NewTransformation(tranformFunctions ...TransformationFunction) Transformati
return models.NewTransformationResult(successList, filteredList, failureList)
}
}

// NewTransformationRefactored constructs a function which applies all transformations to all messages, sending successes to the supplied channel as they are produced and returning a TransformationResult.
func NewTransformationRefactored(tranformFunctions ...TransformationFunction) TransformationApplyFunctionRefactored {
return func(messages []*models.Message, c chan *models.Message) *models.TransformationResult {
successList := make([]*models.Message, 0, len(messages))
filteredList := make([]*models.Message, 0, len(messages))
failureList := make([]*models.Message, 0, len(messages))
// If no transformations, just return the result rather than shuffling data between slices

// if len(tranformFunctions) == 0 {
// return models.NewTransformationResult(messages, filteredList, failureList)
// }
// ^^ Maybe this should get removed regardless

for _, message := range messages {
msg := *message // dereference to avoid amending input
// ^^ That shouldn't be necessary
success := &msg // success must be both input and output to a TransformationFunction, so we make this a pointer.
var failure *models.Message
var filtered *models.Message
var intermediate interface{}
for _, transformFunction := range tranformFunctions {
// Overwrite the input for each iteration in sequence of transformations,
// since the desired result is a single transformed message with a nil failure, or a nil message with a single failure
success, filtered, failure, intermediate = transformFunction(success, intermediate)
if failure != nil || filtered != nil {
break
}
}
if success != nil {
success.TimeTransformed = time.Now().UTC()
// Pass successes to the channel as we get them
c <- success
successList = append(successList, success)
}
// We don't append TimeTransformed in the failure or filtered cases, as it is less useful, and likely to skew metrics
if filtered != nil {
// Ack straight away - there's an argument that this is how it should work regardless.
if filtered.AckFunc != nil {
filtered.AckFunc()
}
filteredList = append(filteredList, filtered)
}
if failure != nil {
// We should probably pass failures to a channel to deal with them too.
failureList = append(failureList, failure)
}
}
// Assuming we move handling of failures into here too, this now only serves to report results to the observer
return models.NewTransformationResult(successList, filteredList, failureList)
}
}
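The key change in NewTransformationRefactored is that successes are handed to a channel as soon as they are produced, rather than only being returned at the end. Below is a minimal sketch of that shape with stand-in types; the message struct and transformation signature are simplified illustrations, not the real models.Message or TransformationFunction.

package main

import (
	"fmt"
	"strings"
)

type message struct {
	Data string
}

// transformation is a stand-in: a nil error means success, a non-nil error means invalid.
type transformation func(*message) (*message, error)

// apply runs each message through the transformations, sending successes to the
// channel as they are produced and returning only the invalid messages.
func apply(messages []*message, transformations []transformation, successes chan<- *message) []*message {
	invalid := []*message{}
	for _, msg := range messages {
		out := msg
		failed := false
		for _, tf := range transformations {
			var err error
			out, err = tf(out)
			if err != nil {
				invalid = append(invalid, msg)
				failed = true
				break
			}
		}
		if !failed {
			successes <- out
		}
	}
	return invalid
}

func main() {
	successes := make(chan *message, 10)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for msg := range successes {
			fmt.Println("ready to batch:", msg.Data)
		}
	}()

	upper := func(m *message) (*message, error) {
		return &message{Data: strings.ToUpper(m.Data)}, nil
	}
	invalid := apply([]*message{{Data: "a"}, {Data: "b"}}, []transformation{upper}, successes)
	close(successes)
	<-done
	fmt.Println("invalid messages:", len(invalid))
}

The consumer here just prints; in the diff it is the batching goroutine in RunCli that reads from the channel.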
34 changes: 34 additions & 0 deletions pkg/transform/transformconfig/transform_config.go
@@ -67,3 +67,37 @@ func GetTransformations(c *config.Config, supportedTransformations []config.Conf

return transform.NewTransformation(funcs...), nil
}

// GetTransformationsRefactored builds and returns a TransformationApplyFunctionRefactored
// from the transformations configured.
func GetTransformationsRefactored(c *config.Config, supportedTransformations []config.ConfigurationPair) (transform.TransformationApplyFunctionRefactored, error) {
funcs := make([]transform.TransformationFunction, 0)

for _, transformation := range c.Data.Transformations {

useTransf := transformation.Use
decoderOpts := &config.DecoderOptions{
Input: useTransf.Body,
}

var component interface{}
var err error
for _, pair := range supportedTransformations {
if pair.Name == useTransf.Name {
plug := pair.Handle
component, err = c.CreateComponent(plug, decoderOpts)
if err != nil {
return nil, err
}
}
}

f, ok := component.(transform.TransformationFunction)
if !ok {
return nil, fmt.Errorf("could not interpret transformation configuration for %q", useTransf.Name)
}
funcs = append(funcs, f)
}

return transform.NewTransformationRefactored(funcs...), nil
}
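Finally, a self-contained sketch of the templating approach used by BatchTransformationFunction in cli.go above: each message is unmarshalled into map[string]json.RawMessage and the whole batch is rendered into one request body with text/template. The input payloads and the attribute_data / event_data keys are illustrative, mirroring the template in the diff rather than any real schema.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"text/template"
)

const templ = `{
  attributes: [ {{$first_1 := true}}
  {{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}}
  {{printf "%s" .attribute_data}}{{end}}
  ],
  events: [ {{$first_2 := true}}
  {{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}}
  {{printf "%s" .event_data}}{{end}}
  ]
}`

func main() {
	// Two fake messages standing in for models.Message.Data payloads.
	raw := [][]byte{
		[]byte(`{"attribute_data": {"id": 1}, "event_data": {"type": "page_view"}}`),
		[]byte(`{"attribute_data": {"id": 2}, "event_data": {"type": "page_ping"}}`),
	}

	// json.RawMessage keeps the nested objects verbatim so the template can print them as-is.
	formatted := []map[string]json.RawMessage{}
	for _, msg := range raw {
		var asMap map[string]json.RawMessage
		if err := json.Unmarshal(msg, &asMap); err != nil {
			panic(err)
		}
		formatted = append(formatted, asMap)
	}

	var buf bytes.Buffer
	t := template.Must(template.New("example").Parse(templ))
	if err := t.Execute(&buf, formatted); err != nil {
		panic(err)
	}

	// In the diff, buf.Bytes() is what gets assigned to the batch's HTTPRequestBody.
	fmt.Println(buf.String())
}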
