-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cross event transformation #331
Conversation
e2f119a
to
3c8e391
Compare
@@ -127,6 +127,7 @@ func AdaptKinesisTargetFunc(f func(c *KinesisTargetConfig) (*KinesisTarget, erro | |||
func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { | |||
kt.log.Debugf("Writing %d messages to stream ...", len(messages)) | |||
|
|||
// TODO: Replace with new batch transformation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By "replace" you mean change from passing in messages []*models.Message
to passing in batches []models.MessageBatch
? I imagine the chunking logic here would need to stay still but you are just working across batches instead of messages right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean replace this bit where we call the Chunk function directly with the new function we have in this PR and pass it as a batch transformation.
The chunking logic necessarily remains yes - the change here is just to run it as part of the new structure.
type MessageBatch struct { | ||
OriginalMessages []*Message // Most targets will use the data from here, but where we have a http templating transformation, we would use this to ack batches of messages | ||
BatchData []byte // Where we template http requests, we use this to define the body of the request | ||
HTTPHeaders map[string]string // For dynamic headers feature |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not a huge fan of having a target
specific implementation mixed in with a generic model here. Is there another way to carry this data through without it being specifically mapped to http
?
Would something like Parameters map[string]interface{}
where in http
you can then access a HTTPHeaders
key within the Parameters
work to abstract this away?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love it either. This actually comes from having implemented the dynamic header transformation - which has the same problem for the message model.
I considered that way of doing things but it's a trade-off - the downside of that approach is that you have a more obscure API and the target's logic depends on that specific key, but the api defines it as being anything.
I experimented with other things we could do but didn't find an elegant solution (yet), and it didn't feel like it serves the project well to labour on it for too long.
Right now we only have one thing that needs to do this, so my thinking was that this will do for the moment but when we need to design for further similar things we should revisit the api design.
I'm not massively opposed to doing what you suggest either - I just haven't given up on finding something better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair point! As long as the seed of "maybe we should change this" is planted I am fine with it staying where it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not even that much of a maybe to be honest! Just needs to percolate a bit. Perhaps we will even see the answer when the rest of this refactor falls into place.
cmd/aws/cli/main.go
Outdated
@@ -14,6 +14,7 @@ package main | |||
import ( | |||
"github.com/snowplow/snowbridge/cmd/cli" | |||
"github.com/snowplow/snowbridge/config" | |||
"github.com/snowplow/snowbridge/pkg/batchtransform/batchtransformconfig" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have already github.com/snowplow/snowbridge/pkg/transform
so what do you think about making batch part of it, like: "github.com/snowplow/snowbridge/pkg/transform/batch/....
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm good point - batch transformations are a different class of thing to my mind - hence why I did it this way to begine with.
But from a project organisation point of view it might make more sense to organise things so that we have transform/batch
and transform/{the existing ones}
. Major downside is that we now need to come up with a name for that :D lmk if you have ideas
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
single
....? batchbutwithsize1
...? I give up, let's leave it :p
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notbatch
it is
@jbeemster should be pointers now and @pondzix redid the dir structure :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to approve at this point. No strong objections. My comments are only points i'd like to understand better or discuss for alternatives.
@@ -211,7 +228,7 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform | |||
messagesToSend := transformed.Result | |||
|
|||
res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) { | |||
res, err := t.Write(messagesToSend) | |||
res, err := t.Write(messagesToSend, btr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i wonder whether there is an alternative so that the target does not need to know about batch transformations. Are there other reasons for this besides the chunking and possible group-by?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started with a design that segmented them, but it left things very messy because the target needs to be aware of the transformation in order to decide how to send the data.
Similarly, the dynamic headers feature leaves us with a challenge here. Necessarily it must group data by headers, before a request template is created.
From a configuration perspective, if this logic is upstream of the target, it seems very easy to break the target by configuring a separate feature.
I don't know if it's the best design, but it's what I came up with as an attempt to reconcile this with the concept of solving for batch transformations more generically.
chunkedBatches := make([]*models.MessageBatch, 0) | ||
|
||
for _, batch := range batchesIn { | ||
chunks, oversized := models.GetChunkedMessages(batch.OriginalMessages, chunkSize, maxMessageByteSize, maxChunkByteSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetChunkedMessages
now feels more like it belongs to target interface. Do you think this (it actually becoming a target method) could be of help to move batch transformation logic upstream of target.Write?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so - the problem is that where we have templating, this logic must occur before the templater.
func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { | ||
// When we have dynamic headers, batching by header must necessarily run first. This is a http specific function, | ||
// so defining it here and fixing it into the Write function avoids complexity in configuration | ||
func (ht *HTTPTarget) groupByDynamicHeaders(batches []*models.MessageBatch) ([]*models.MessageBatch, []*models.Message, []*models.Message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think i have an idea, but not sure exactly about what you mean by configuration complexity. Is it about the feature switch? So far i'd still prefer batch transformations decoupled, so commenting to understand whether this could be a counter argument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We must group the data by dynamic headers before we create a request template. If we create the template before that then we cannot batch only events with the same headers together afterwards.
If we decouple this from the target, then the user has to understand this nuance of how the code works in order to configure the app correctly. Or to put it another way - they can have a valid configuration that fits the API that we have provided, which breaks a feature.
Does that make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for explaining @colmsnowplow ! It does make more sense.
To confirm, the order before HTTP target writes is:
Group-by > Chunks > Templater
Is this correct?
A side question: For the chuck size limits we won't take into account the template size. Is this ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a reason we can't take the size of the template into account. But that's more of an implementation detail than a matter of the design
This isn't ready for develop, but the refactoring part of things is basically done, so it feels like a good time to get a review and split up the rest of the work to separate it from the complex bits.
There are still a lot of TODO notes etc in the code, and the work to migrate the targets to the new structure, fix and add tests etc. is yet to happen.
But at this point this PR is in service of getting a review of the overall structure. we can treat the feature branch as a WIP, and get it ready for a PR to develop bit by bit.