Add spEnrichedFilter #60
Conversation
Heads up, I realised today that I forgot to address the issue that when we get the value it's an interface of any type - so the latest commits coerce to string when matching.
I figured this is good enough for a simple first instrumentation.
I agree here. We could go to town with this feature but I believe we should iterate as needs arise. This solves our initial issues of only sending a subset of certain events to Kafka.
Should some requirement need to be satisfied that requires more than one field match, or both an equality and negation condition, we can chain more than one filter in the transformations list.
If I'm understanding this right, you're suggesting adding AND support, not just OR support as it currently stands. I think this would be very valuable. In a recent e-mail to a customer we stated this would work:
Apply multiple filters with an AND condition
app_id = web AND event_name = link_click
app_id = web AND event_name != [page_ping OR my_custom_event]
So adding AND might be a worthwhile endeavour. I think that last example can be app_id = web AND event_name != page_ping AND event_name != my_custom_event
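To make that concrete, the last example could be sketched as two chained filters, reusing the comma-separated transformation list syntax discussed in this thread (the exact field names and syntax here are an assumption, for illustration only):

```
# Hypothetical sketch: app_id = web AND event_name != page_ping AND event_name != my_custom_event
MESSAGE_TRANSFORMATION=spEnrichedFilter:app_id==web,spEnrichedFilter:event_name!=page_ping|my_custom_event
```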
return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, interface{}) {
	// Check for a negation condition first
	keyValues := strings.SplitN(filterConfig, "!=", 2)
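As a side note, the two-step SplitN detection quoted above can be sketched in isolation like so (the helper name and return shape are illustrative, not the PR's actual code):

```go
package main

import (
	"fmt"
	"strings"
)

// detectOperator sketches the detection step: try splitting on "!=" first;
// a successful split into two parts means a negation condition, otherwise
// fall back to splitting on "==".
func detectOperator(filterConfig string) (operator string, parts []string) {
	parts = strings.SplitN(filterConfig, "!=", 2)
	if len(parts) == 2 {
		return "!=", parts
	}
	return "==", strings.SplitN(filterConfig, "==", 2)
}

func main() {
	op, parts := detectOperator("app_id!=test")
	fmt.Println(op, parts) // != [app_id test]

	op, parts = detectOperator("app_id==web")
	fmt.Println(op, parts) // == [app_id web]
}
```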
When I first read the PR description, I thought it was multiple statements split with |, but now I realise it's a single field with multiple options. I think it might be worth adding multiple field filters rather than multiple values per field, as that offers more flexibility without too much additional complexity (a little more, though).
e.g.
Rather than MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}=={value1}|{value2}|...
we have MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}=={value1}|{field}=={value2}|{field2}=={value25}
So following our chat - some clarification. An individual filter can only deal with one field, but can match n values. So an individual filter can either:
- Include the event if the value is one of a set of values,
OR - Exclude the event if the value is one of a set of values
However, we can apply as many filters as we like in a row - essentially chaining the above one after another. So your above example would be configured like so:
MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}=={value1}|{value2},spEnrichedFilter:{field2}=={value25}
So if we have conditions which depend on multiple fields, we can do it as long as we require both conditions to be met. If we require that one or the other condition is met, we currently cannot configure that. So, if the example requirement was:
[field == value1|value2] OR [field2 == value25]
Then we'd be out of luck.
	keyValues = strings.SplitN(filterConfig, "==", 2)
	keepMessage = false
}

// TODO: Design - Should there be validation of the input here, or perhaps in the config? Or at all?
Yes, we should definitely make sure the input is valid. It'd be pretty easy to make a mistake when configuring this. Even if it's just ensuring all the characters are valid and the config has been parsed reasonably - and if not, responding with an example.
Agreed - needs testing, but as we discussed, moving this section of code out of the returned function and into the body of NewSpEnrichedFilterFunction should achieve what we need.
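A minimal sketch of that refactor - parsing once in the constructor body and closing over the result, rather than re-parsing on every message. The constructor name comes from the discussion above; the signature, error handling, and match logic are simplified assumptions:

```go
package main

import (
	"fmt"
	"strings"
)

// NewSpEnrichedFilterFunction (sketch) parses the filter config once, then
// returns a function that closes over the parsed field values. The real
// function in the PR operates on *models.Message; here it takes a plain
// string value for illustration.
func NewSpEnrichedFilterFunction(filterConfig string) (func(value string) bool, error) {
	isNegation := true
	keyValues := strings.SplitN(filterConfig, "!=", 2)
	if len(keyValues) != 2 {
		keyValues = strings.SplitN(filterConfig, "==", 2)
		isNegation = false
	}
	if len(keyValues) != 2 {
		return nil, fmt.Errorf("invalid filter config: %s", filterConfig)
	}
	valuesToMatch := strings.Split(keyValues[1], "|")

	// The returned function only evaluates the match; parsing happened once above.
	return func(value string) bool {
		for _, v := range valuesToMatch {
			if v == value {
				return !isNegation
			}
		}
		return isNegation
	}, nil
}

func main() {
	keep, _ := NewSpEnrichedFilterFunction("event_name!=page_ping")
	fmt.Println(keep("link_click"), keep("page_ping")) // true false
}
```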
if message.AckFunc != nil {
	message.AckFunc()
}
return nil, nil, nil
This is quite clear, marking it as neither a success nor a failure. It seems a little odd that a "transformation" can ack a message though; it feels like a bit of an unexpected side effect of a transformation.
I wonder if we can take the same logic of the transformation section and pull it into a filter section in the code base to keep the two concerns separated? This might also afford us some future flexibility in filtering without having to try and squeeze it into the same shape as a transformation.
Yeah I think I agree, but would consider it out of scope of what we're doing here - which is to implement whatever sensible filtering we can in the short term.
I think it makes sense to separate the concept of a filter from the concept of a transformation. The only time I can really see this becoming a problem is if a filter were to depend on a transformation... But current implementation doesn't allow for this anyway... So I think certainly food for thought and a more nuanced design is well worth thinking over.
As a stopgap, how do you feel about extending the interface to return a filtered message list, so that we can handle what to do with the filtered set outside of the transformations?
Worth considering... I did knock the idea about in my head, but ultimately figured that if we're gonna just ack them and ignore them, it's more efficient to do it straight away rather than pass more data around.
Having said that I'm not opposed to doing it that way for this instrumentation!
On further reflection on doing this - I think I am actually opposed to doing it this way, for the simple reason that it involves a lot of changes in a lot of places that we'll presumably want to revert back should we redesign it out:
- Change the model for transformations
- Change each transformation to suit the new model
- Change the transformationApplyFunction to handle slices of filtered data
- Change the main function code in cmd/serverless and cmd/cli to ack the filtered data
Ultimately, it's not that these aren't worth doing, but that's as much work or more than just implementing Paul's suggestion that filters are done separately from transformations - and if we're to decide that's the best design, we'd need to undo all of the above in order to re-implement against that design.
Thoughts @jbeemster ?
Just popping an update here for clarity since I'm working on another branch atm, and we've progressed the discussion elsewhere.
We have discussed this and leaving things as they are isn't really an option - so we'll need to refactor something. At the moment I am leaning towards Josh's suggestion of modifying the transformationResult model to allow a 'filtered' slice of messages, which are subsequently acked outside of the function.
Separating filters out completely means that we can't share information across a filter and a transformation, but filters can be much more powerful if they're allowed to depend on transformations IMO.
I didn't mean to approve this immediately, it's looking good and we could release it like this to solve the needs we have now but I think there's some discussion and thought to be had around some of the comments first.
Clicked approve by accident. Looking for feedback on comments before approving.
Addressing this in a top-level comment, since we had a discussion on a zoom and it seems pertinent to make sure it's clear - especially since, on reflection, my PR description doesn't cover the bases.

What we can do

A single filter can apply to only one field. It can match against any number of values on that field. It can also be an inclusive condition (include the message if any match is found) or an exclusive condition (exclude the message if any match is found). We can combine these individual filters by chaining them one after another. So if we have a requirement to filter on more than one field, we can instrument a filter on the first field, then a filter on the second field straight after - and both will be applied. So, currently we can instrument combinations of conditions as long as our logic requires that each of the conditions is met. For example:
Can be achieved by configuring a filter to match
What we cannot do

We cannot combine two filters with a disjunction (an OR across filters). IMO we probably do want to instrument this, because I can think of potentially typical use cases where it's required. For example, let's say you want all your web data replicated to a target, but you track web events via the JS tracker, and transactions via server-side tracking. In that scenario, you might require a filter condition something similar to:
IMO we should loop that kind of requirement back into the product planning process and spend some time designing the solution - because I think this requirement should involve both a refactor of how conditions are reasoned about, and also of how configuration is provided.
Did you guys consider just embedding Lua and letting the filter expression be a Lua function?
@alexanderdean I opened an issue on the topic to keep discussion on this PR relevant to the PR itself. In brief, the answer is no - but we haven't considered much wrt design here - the requirement was to implement whatever we could do quickly, even if simple and limited - since there are some customers for whom it seems to be important.
Makes sense, thanks @colmsnowplow
OK @jbeemster and @paulboocock I think we are good to take another pass now. Changes since last review:
I much prefer the new approach, it more cleanly separates the differing concepts of filtering and transforming whilst not creating loads of duplication. I added a couple of nits around variable naming, take or leave them, no issues from me either way.
As I've mentioned, I think this is a good first pass at filtering and gives us the primary capabilities we need at reducing the volume being sent to Kafka for some customers.
Some future things to think about:
- Routing - Can we filter messages to different targets (ideally without having millions of Stream Replicator instances)? The use case I see here is sending some messages to one topic, and other messages to a different topic.
- More complex filtering - Combining fields to decide whether to include or drop them and/or running logic against fields for inclusion... is this number > 4, does this event contain X context, does this field + this other field = something or other. I think this more complex filtering becomes more interesting if we consider routing too.
return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
	// Start by resetting keepMessage to initialKeepValue
	keepMessage := initialKeepValue
nit: keepMessage sounds like a message object that should be kept, not a boolean. I think shouldKeepMessage is clearer. I had to read the whole function to figure out what this boolean did.
// Check for a negation condition first
keyValues := strings.SplitN(filterConfig, "!=", 2)

// Initial Keep Value is the t/f value of keepMessage, to be reset on every invocation of the returned function
nit: Does t/f stand for true/false? I initially didn't quite understand this comment until I read the rest of the function. Perhaps this variable should be isNegationFilter; then the statement below would be:

shouldKeepMessage := isNegationFilter

I think this helps in reading the function below, as it becomes clearer why we're keeping messages or not, i.e. we keep them when we're running a negation filter and don't find a match.
Yeah, t/f is meant to mean true/false.

I agree with this and the other comments. I did have my own concerns about readability here, just didn't have a clear picture of how to improve it - thanks!
evaluation:
	for _, valueToMatch := range strings.Split(keyValues[1], "|") {
		if valueToMatch == fmt.Sprintf("%v", valueFound) { // coerce to string as valueFound may be any type found in a Snowplow event
			keepMessage = !keepMessage
nit:

if isNegationFilter {
	shouldKeepMessage = false
} else {
	shouldKeepMessage = true
}

For me this makes it clearer what this does, rather than flipping a boolean. I think this is worth the extra check for the sake of readability, especially since it will only be called once per filter match. If you don't want the if, then shouldKeepMessage = !isNegationFilter is clearer than flipping the same boolean.
I prefer the brevity of shouldKeepMessage = !isNegationFilter, but I think you're right that the if statement is much better for readability, so going with that. :)
cmd/config_test.go
transformation, err := c.GetTransformations()
assert.Nil(transformation)
assert.NotNil(err)
assert.Equal(`Filter Function Config does not match regex \S+(!=|==)[^\s\|]+((?:\|[^\s|]+)*)$`, err.Error())
Might be nice to have something more meaningful in the error message than an enormous regex, to help in actually solving the problem - if it's too much information to put in an error message, maybe a link back to some documentation would help instead?
I think I wrote this same comment during my review when I was trying to understand what the regex did, but that comment seems to have vanished 🤷. This regex is hard enough to understand, and I agree it doesn't make it clear what valid examples would look like. I think one or two short, valid examples would be better in the error.
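A sketch of what that might look like - the regex check stays, but the error reports short valid examples instead of the raw pattern. The helper name and error wording here are assumptions, not the PR's code:

```go
package main

import (
	"fmt"
	"regexp"
)

// filterConfigRegex is the pattern from the test above.
var filterConfigRegex = regexp.MustCompile(`\S+(!=|==)[^\s\|]+((?:\|[^\s|]+)*)$`)

// validateFilterConfig keeps the regex validation, but returns an error that
// shows valid examples rather than the regex itself.
func validateFilterConfig(cfg string) error {
	if filterConfigRegex.MatchString(cfg) {
		return nil
	}
	return fmt.Errorf(
		"invalid filter config %q - valid examples: app_id==web or event_name!=page_ping|my_custom_event",
		cfg)
}

func main() {
	// A single '=' doesn't match the pattern, so this prints the error.
	fmt.Println(validateFilterConfig("app_id=web"))
}
```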
@@ -107,6 +112,12 @@ func (o *Observer) Stop() {

// --- Functions called to push information to observer

// FilteredWrite pushes a filter result onto a channel for processing
// by the observer
func (o *Observer) Filtered(r *models.FilterResult) {
nit: Function is not called "FilteredWrite" as the doc comment would indicate - try running make lint, which automatically checks for these things (https://github.com/snowplow-devops/stream-replicator/blob/master/Makefile#L109)
"github.com/snowplow/snowplow-golang-analytics-sdk/analytics"
)

func intermediateAsParsed(intermediateState interface{}, message *models.Message) (analytics.ParsedEvent, error) {
This function is part of the transform package, not part of a specific snowplow_enriched package. As such it should probably have a name which more cleanly ties it to the fact that it is looking for an enriched parsed event - intermediateAsParsed does not feel specific enough here for a function in a generically named package.
Closing in favour of the release PR I have created - apologies for messy workflow.
Apologies Paul, I should've replied to this before closing the PR.

On routing - yeah, I see a lot of potential here, but at the same time it's complicated - I think we need to consider how we ack messages (what if it goes to both endpoints, for example) and how we design around failure. But I can see the path forwards on this requirement; with some effort it's possible if we want to do it.

On more complex filtering - I think that most requirements like this can be achieved by instrumenting a transformation which does this more complex computation and passes the result to a filter.

Feels like the requirement here is for a user to be able to implement their own algorithm for transformation and filtering, and so providing a means of plugging in an arbitrary binary which conforms to the transformation model we've now established sounds like a pretty solid way of enabling that, if it's a feasible approach.
PR to add a filter on Enriched events. Details:
The filter function requires a field, and a value or set of values to match against as a condition. When the condition is true, the event is included; otherwise it's acked without passing through to the target. One may pass a single value or a pipe-separated list of values.
Configure as follows:
Equality condition:
MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}=={value}
OR
MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}=={value1}|{value2}|...
Negation condition:
MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}!={value}
OR
MESSAGE_TRANSFORMATION=spEnrichedFilter:{field}!={value1}|{value2}|...
We may want to avoid awkwardness by establishing a more flexible configuration, but I figured this is good enough for a simple first instrumentation.
Should some requirement need to be satisfied that requires more than one field match, or both an equality and negation condition, we can chain more than one filter in the transformations list.
With this instrumentation, we cannot meet a requirement where a single condition depends on two fields (e.g. if appID = test THEN filter on event name). Seems unlikely we need to go that far, at least for the time being.
The filter uses the analytics SDK's get method, and so doesn't require transformation. Currently, only top-level fields are appropriate for a filter - nothing that's nested in a self-describing field (so as yet no using unstruct, contexts, or derived_contexts).
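As an illustration of the string coercion mentioned earlier in the thread - the value returned for a field may be any type found in a Snowplow event, so it's coerced to a string before comparison. matchesAny is a hypothetical helper, not the PR's code:

```go
package main

import "fmt"

// matchesAny coerces the found value to a string with fmt.Sprintf("%v", ...)
// and reports whether it equals any of the configured filter values. This
// mirrors the coercion in the quoted diff, where valueFound is an interface{}.
func matchesAny(valueFound interface{}, valuesToMatch []string) bool {
	coerced := fmt.Sprintf("%v", valueFound)
	for _, v := range valuesToMatch {
		if v == coerced {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(matchesAny(42, []string{"41", "42"})) // true: int coerced to "42"
	fmt.Println(matchesAny("web", []string{"mob"}))   // false
}
```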