From 45bb33688a7c5e2a474767ceb151c786aebc83b3 Mon Sep 17 00:00:00 2001 From: adatzer Date: Tue, 11 Jun 2024 23:15:59 +0300 Subject: [PATCH 1/6] Add jq mapper as transformation --- pkg/transform/mapper.go | 416 ++++++++++++++++++++++------------ pkg/transform/mapper_test.go | 397 ++++++++++++++++++++++++++++++++ pkg/transform/mappper_test.go | 106 --------- 3 files changed, 673 insertions(+), 246 deletions(-) create mode 100644 pkg/transform/mapper_test.go delete mode 100644 pkg/transform/mappper_test.go diff --git a/pkg/transform/mapper.go b/pkg/transform/mapper.go index 2443bed5..9b3e5cfc 100644 --- a/pkg/transform/mapper.go +++ b/pkg/transform/mapper.go @@ -12,176 +12,312 @@ package transform import ( + "context" "encoding/json" - "errors" "fmt" - "log" + "time" "github.com/itchyny/gojq" + + "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/models" ) -// This works perfectly, is a million times simpler to implement, and prevents us from being blocked in future if we haven't predicted a requirement -var examplePureJQConfig = `{ - field1: .app_id, - field2: { field2: .contexts_com_acme_just_ints_1[0] }, - fieldWithOtherCoalesceExample: ( .app_id // .contexts_com_acme_just_ints_1[0] ), - manualUnnest: { just_ints_integerField: .contexts_com_acme_just_ints_1[0].integerField }, - arraySpecified: [ .app_id, .event_id ] - }` - -func grabFromGenericJQConfig(inputData map[string]any, config string) []byte { - query, err := gojq.Parse(config) - if err != nil { - panic(err) - } +// JQMapperConfig represents the configuration for the JQ transformation +type JQMapperConfig struct { + JQCommand string `hcl:"jq_command"` + RunTimeout int `hcl:"timeout_sec,optional"` + SpMode bool `hcl:"snowplow_mode,optional"` +} - res, err := grabValue(inputData, query) - if err != nil { - panic(err) - } +// JQMapper handles jq generic mapping as a transformation +type jqMapper struct { + JQCode *gojq.Code + RunTimeout time.Duration + SpMode bool +} - out, err := json.Marshal(res) - if err != nil { - panic(err) +// RunFunction runs a jq mapper transformation +func (jqm *jqMapper) RunFunction() TransformationFunction { + return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + input, err := mkJQInput(jqm, message, interState) + if err != nil { + message.SetError(err) + return nil, nil, message, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), jqm.RunTimeout) + defer cancel() + + iter := jqm.JQCode.RunWithContext(ctx, input) + // no looping since we only keep first value + v, ok := iter.Next() + if !ok { + message.SetError(fmt.Errorf("jq query got no output")) + return nil, nil, message, nil + } + + if err, ok := v.(error); ok { + message.SetError(err) + return nil, nil, message, nil + } + + // here v is any, so we Marshal. alternative: gojq.Marshal + data, err := json.Marshal(v) + if err != nil { + message.SetError(fmt.Errorf("error encoding jq query output data")) + return nil, nil, message, nil + } + + message.Data = data + return message, nil, nil, nil } +} - return out +// jqMapperAdapter implements the Pluggable interface +type jqMapperAdapter func(i interface{}) (interface{}, error) + +// ProvideDefault implements the ComponentConfigurable interface +func (f jqMapperAdapter) ProvideDefault() (interface{}, error) { + return &JQMapperConfig{ + RunTimeout: 15, + }, nil } -// Some magic may be required in the config parsing bit to enable this! -// If it's impractical we can structure the config in an easier to handle way. -var exampleParsedConfig = map[string]any{ - "field1": ".app_id", - "field2": map[string]any{"nestedField1": ".contexts_com_acme_just_ints_1[0]"}, - "fieldWithCoalesceExample": map[string]any{"coalesce": []string{"app_id", ".contexts_com_acme_just_ints_1[0]"}}, - // Seeing the implementation, the below is way cleaner! - "fieldWithOtherCoalesceExample": ".app_id // .contexts_com_acme_just_ints_1[0]", - - "manualUnnest": map[string]any{"just_ints_integerField": ".contexts_com_acme_just_ints_1[0].integerField"}, - // not sure if this should be allowable in config - "arraySpecified": []string{".app_id", ".event_id"}, +// Create implements the ComponentCreator interface +func (f jqMapperAdapter) Create(i interface{}) (interface{}, error) { + return f(i) } -// // // TODO: function to get values -// // // TODO: function to create objects/iterate config and create objects -// TODO: function to delete keys after? - -// For the delete key function, perhaps we factor the below such that traversing can be re-used? - -// In the actual implementation, we would prbably want to iterate the config to compile or parse queries, then later produce the data -// For this implementation sketch, I'll just do all the work here. -func grabLotsOfValues(inputData map[string]any, config map[string]any) map[string]any { - out := map[string]any{} - - for key, val := range config { - switch val.(type) { - // TODO: figure out what kinds of types our config parsing will actually produce, and if this approach or another is needed to handle them - case map[string]any: - - mapRes := grabLotsOfValues(inputData, val.(map[string]any)) - // TODO: either have this function return nil or check for empty map here. - out[key] = mapRes - case []map[string]any: - // Seems doable but not implemented yet. - case []string: - // The way I've structured this function, it's a bit more complex to support a coalesce option. - // We could refactor things so that this could be handled slightly more elegantly, - // but I think it has become fairly clear that the best option is Nick's suggestion - just let jq syntax support this. - if key == "coalesce" { - for _, item := range val.([]string) { // only slice of string allowed - query, err := gojq.Parse(item) - if err != nil { - panic(err) - } - outVal, err := grabValue(inputData, query) - if outVal != nil { - out[key] = outVal - break - } - } - break - } else { - outSlice := []any{} - // Probably could be done with less repeated code - for _, item := range val.([]string) { - query, err := gojq.Parse(item) - if err != nil { - panic(err) - } - outVal, err := grabValue(inputData, query) - if outVal != nil { - // Don't add nil keys - outSlice = append(outSlice, outVal) - } - } - // TODO: Do something to not add empty arrays - out[key] = outSlice - } - case string: - query, err := gojq.Parse(val.(string)) - if err != nil { - panic(err) - } - outVal, err := grabValue(inputData, query) - if outVal != nil { - // Don't add nil keys - out[key] = outVal - } - default: - fmt.Println("something went wrong here") - fmt.Println(key) - fmt.Println(val) +// jqMapperAdapterGenerator returns a jqAdapter +func jqMapperAdapterGenerator(f func(c *JQMapperConfig) (TransformationFunction, error)) jqMapperAdapter { + return func(i interface{}) (interface{}, error) { + cfg, ok := i.(*JQMapperConfig) + if !ok { + return nil, fmt.Errorf("invalid input, expected JQMapperConfig") } + + return f(cfg) } - return out } -// We may want to run gojq.Compile() on startup for each option, pass a *gojq.Code here -func grabValue(inputData map[string]any, query *gojq.Query) (any, error) { - - var grabbedValue any - - iter := query.Run(inputData) // or query.RunWithContext +// JQMapperConfigFunction returns a jq mapper transformation function from a JQMapperConfig +func JQMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) { + query, err := gojq.Parse(c.JQCommand) + if err != nil { + return nil, fmt.Errorf("error parsing jq command: %q", err.Error()) + } - v, ok := iter.Next() - if !ok { - return nil, errors.New("TODO: ADD ERROR HERE") + code, err := gojq.Compile(query) + if err != nil { + return nil, fmt.Errorf("error compiling jq query: %q", err.Error()) } - if err, ok := v.(error); ok { - return nil, err + jq := &jqMapper{ + JQCode: code, + RunTimeout: time.Duration(c.RunTimeout) * time.Second, + SpMode: c.SpMode, } - grabbedValue = v - return grabbedValue, nil + return jq.RunFunction(), nil } -// Mapper is // TODO: Add description -func Mapper(message *models.Message, intermediateState interface{}) { +// JQMapperConfigPair is a configuration pair for the jq mapper transformation +var JQMapperConfigPair = config.ConfigurationPair{ + Name: "jq", + Handle: jqMapperAdapterGenerator(JQMapperConfigFunction), +} - var input map[string]any +// mkJQInput ensures the input to JQ query is of expected type +func mkJQInput(jqm *jqMapper, message *models.Message, interState interface{}) (map[string]interface{}, error) { + if !jqm.SpMode { + // gojq input can only be map[string]any or []any + // here we only consider the first, but we could also expand + var input map[string]interface{} + err := json.Unmarshal(message.Data, &input) + if err != nil { + return nil, err + } - json.Unmarshal(message.Data, &input) + return input, nil + } - // query, err := gojq.Parse(".bar.emptyKey // .bar.baz") - query, err := gojq.Parse(".contexts_com_acme_just_ints_1[0]") + parsedEvent, err := IntermediateAsSpEnrichedParsed(interState, message) if err != nil { - log.Fatalln(err) + return nil, err } - // input := map[string]any{"foo": []any{1, 2, 3}, "bar": map[string]any{"baz": "someValue", "emptyKey": nil}} - iter := query.Run(input) // or query.RunWithContext - for { - v, ok := iter.Next() - if !ok { - break - } - if err, ok := v.(error); ok { - if err, ok := err.(*gojq.HaltError); ok && err.Value() == nil { - break - } - log.Fatalln(err) - } - fmt.Printf("%#v\n", v) + + spInput, err := parsedEvent.ToMap() + if err != nil { + return nil, err } + + return spInput, nil } + +// Commenting out for reference + +// // This works perfectly, is a million times simpler to implement, and prevents us from being blocked in future if we haven't predicted a requirement +// var examplePureJQConfig = `{ +// field1: .app_id, +// field2: { field2: .contexts_com_acme_just_ints_1[0] }, +// fieldWithOtherCoalesceExample: ( .app_id // .contexts_com_acme_just_ints_1[0] ), +// manualUnnest: { just_ints_integerField: .contexts_com_acme_just_ints_1[0].integerField }, +// arraySpecified: [ .app_id, .event_id ] +// }` + +// func grabFromGenericJQConfig(inputData map[string]any, config string) []byte { +// query, err := gojq.Parse(config) +// if err != nil { +// panic(err) +// } + +// res, err := grabValue(inputData, query) +// if err != nil { +// panic(err) +// } + +// out, err := json.Marshal(res) +// if err != nil { +// panic(err) +// } + +// return out +// } + +// // Some magic may be required in the config parsing bit to enable this! +// // If it's impractical we can structure the config in an easier to handle way. +// var exampleParsedConfig = map[string]any{ +// "field1": ".app_id", +// "field2": map[string]any{"nestedField1": ".contexts_com_acme_just_ints_1[0]"}, +// "fieldWithCoalesceExample": map[string]any{"coalesce": []string{"app_id", ".contexts_com_acme_just_ints_1[0]"}}, +// // Seeing the implementation, the below is way cleaner! +// "fieldWithOtherCoalesceExample": ".app_id // .contexts_com_acme_just_ints_1[0]", + +// "manualUnnest": map[string]any{"just_ints_integerField": ".contexts_com_acme_just_ints_1[0].integerField"}, +// // not sure if this should be allowable in config +// "arraySpecified": []string{".app_id", ".event_id"}, +// } + +// // // // TODO: function to get values +// // // // TODO: function to create objects/iterate config and create objects +// // TODO: function to delete keys after? + +// // For the delete key function, perhaps we factor the below such that traversing can be re-used? + +// // In the actual implementation, we would prbably want to iterate the config to compile or parse queries, then later produce the data +// // For this implementation sketch, I'll just do all the work here. +// func grabLotsOfValues(inputData map[string]any, config map[string]any) map[string]any { +// out := map[string]any{} + +// for key, val := range config { +// switch val.(type) { +// // TODO: figure out what kinds of types our config parsing will actually produce, and if this approach or another is needed to handle them +// case map[string]any: + +// mapRes := grabLotsOfValues(inputData, val.(map[string]any)) +// // TODO: either have this function return nil or check for empty map here. +// out[key] = mapRes +// case []map[string]any: +// // Seems doable but not implemented yet. +// case []string: +// // The way I've structured this function, it's a bit more complex to support a coalesce option. +// // We could refactor things so that this could be handled slightly more elegantly, +// // but I think it has become fairly clear that the best option is Nick's suggestion - just let jq syntax support this. +// if key == "coalesce" { +// for _, item := range val.([]string) { // only slice of string allowed +// query, err := gojq.Parse(item) +// if err != nil { +// panic(err) +// } +// outVal, err := grabValue(inputData, query) +// if outVal != nil { +// out[key] = outVal +// break +// } +// } +// break +// } else { +// outSlice := []any{} +// // Probably could be done with less repeated code +// for _, item := range val.([]string) { +// query, err := gojq.Parse(item) +// if err != nil { +// panic(err) +// } +// outVal, err := grabValue(inputData, query) +// if outVal != nil { +// // Don't add nil keys +// outSlice = append(outSlice, outVal) +// } +// } +// // TODO: Do something to not add empty arrays +// out[key] = outSlice +// } +// case string: +// query, err := gojq.Parse(val.(string)) +// if err != nil { +// panic(err) +// } +// outVal, err := grabValue(inputData, query) +// if outVal != nil { +// // Don't add nil keys +// out[key] = outVal +// } +// default: +// fmt.Println("something went wrong here") +// fmt.Println(key) +// fmt.Println(val) +// } +// } +// return out +// } + +// // We may want to run gojq.Compile() on startup for each option, pass a *gojq.Code here +// func grabValue(inputData map[string]any, query *gojq.Query) (any, error) { + +// var grabbedValue any + +// iter := query.Run(inputData) // or query.RunWithContext + +// v, ok := iter.Next() +// if !ok { +// return nil, errors.New("TODO: ADD ERROR HERE") +// } +// if err, ok := v.(error); ok { + +// return nil, err +// } +// grabbedValue = v + +// return grabbedValue, nil +// } + +// // Mapper is // TODO: Add description +// func Mapper(message *models.Message, intermediateState interface{}) { + +// var input map[string]any + +// json.Unmarshal(message.Data, &input) + +// // query, err := gojq.Parse(".bar.emptyKey // .bar.baz") +// query, err := gojq.Parse(".contexts_com_acme_just_ints_1[0]") +// if err != nil { +// log.Fatalln(err) +// } +// // input := map[string]any{"foo": []any{1, 2, 3}, "bar": map[string]any{"baz": "someValue", "emptyKey": nil}} +// iter := query.Run(input) // or query.RunWithContext +// for { +// v, ok := iter.Next() +// if !ok { +// break +// } +// if err, ok := v.(error); ok { +// if err, ok := err.(*gojq.HaltError); ok && err.Value() == nil { +// break +// } +// log.Fatalln(err) +// } +// fmt.Printf("%#v\n", v) +// } +// } diff --git a/pkg/transform/mapper_test.go b/pkg/transform/mapper_test.go new file mode 100644 index 00000000..b86a7790 --- /dev/null +++ b/pkg/transform/mapper_test.go @@ -0,0 +1,397 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package transform + +import ( + "errors" + "reflect" + "strings" + "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/stretchr/testify/assert" + + "github.com/snowplow/snowbridge/pkg/models" +) + +func TestJQRunFunction_SpMode_true(t *testing.T) { + testCases := []struct { + Scenario string + JQCommand string + InputMsg *models.Message + InputInterState interface{} + Expected map[string]*models.Message + ExpInterState interface{} + Error error + }{ + { + Scenario: "happy_path", + JQCommand: `{foo: .app_id}`, + InputMsg: &models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"foo":"test-data1"}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, + } + + for _, tt := range testCases { + t.Run(tt.Scenario, func(t *testing.T) { + assert := assert.New(t) + + jqConfig := &JQMapperConfig{ + JQCommand: tt.JQCommand, + RunTimeout: 15, + SpMode: true, + } + + transFun, err := JQMapperConfigFunction(jqConfig) + assert.Nil(err) + + s, f, e, i := transFun(tt.InputMsg, tt.InputInterState) + + if !reflect.DeepEqual(i, tt.ExpInterState) { + t.Errorf("\nINTERMEDIATE_STATE:\nGOT:\n%s\nEXPECTED:\n%s", + spew.Sdump(i), + spew.Sdump(tt.ExpInterState)) + } + + if e == nil && tt.Error != nil { + t.Fatalf("missed expected error") + } + + if e != nil { + gotErr := e.GetError() + expErr := tt.Error + if expErr == nil { + t.Fatalf("got unexpected error: %s", gotErr.Error()) + } + + if !strings.Contains(gotErr.Error(), expErr.Error()) { + t.Errorf("GOT_ERROR:\n%s\n does not contain\nEXPECTED_ERROR:\n%s", + gotErr.Error(), + expErr.Error()) + } + } + + assertMessagesCompareJQ(t, s, tt.Expected["success"], "success") + assertMessagesCompareJQ(t, f, tt.Expected["filtered"], "filtered") + assertMessagesCompareJQ(t, e, tt.Expected["failed"], "failed") + }) + } +} + +func TestJQRunFunction_SpMode_false(t *testing.T) { + testCases := []struct { + Scenario string + JQCommand string + InputMsg *models.Message + InputInterState interface{} + Expected map[string]*models.Message + ExpInterState interface{} + Error error + }{ + { + Scenario: "happy_path", + JQCommand: `{foo: .app_id}`, + InputMsg: &models.Message{ + Data: snowplowJSON1, + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"foo":"test-data1"}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, + { + Scenario: "with_multiple_returns", + JQCommand: ` +{ + bar: .foo | .. +}`, + InputMsg: &models.Message{ + Data: []byte(`{"foo":[1,2,3]}`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"bar":[1,2,3]}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, + } + + for _, tt := range testCases { + t.Run(tt.Scenario, func(t *testing.T) { + assert := assert.New(t) + + jqConfig := &JQMapperConfig{ + JQCommand: tt.JQCommand, + RunTimeout: 15, + SpMode: false, + } + + transFun, err := JQMapperConfigFunction(jqConfig) + assert.Nil(err) + + s, f, e, i := transFun(tt.InputMsg, tt.InputInterState) + + if !reflect.DeepEqual(i, tt.ExpInterState) { + t.Errorf("\nINTERMEDIATE_STATE:\nGOT:\n%s\nEXPECTED:\n%s", + spew.Sdump(i), + spew.Sdump(tt.ExpInterState)) + } + + if e == nil && tt.Error != nil { + t.Fatalf("missed expected error") + } + + if e != nil { + gotErr := e.GetError() + expErr := tt.Error + if expErr == nil { + t.Fatalf("got unexpected error: %s", gotErr.Error()) + } + + if !strings.Contains(gotErr.Error(), expErr.Error()) { + t.Errorf("GOT_ERROR:\n%s\n does not contain\nEXPECTED_ERROR:\n%s", + gotErr.Error(), + expErr.Error()) + } + } + + assertMessagesCompareJQ(t, s, tt.Expected["success"], "success") + assertMessagesCompareJQ(t, f, tt.Expected["filtered"], "filtered") + assertMessagesCompareJQ(t, e, tt.Expected["failed"], "failed") + }) + } +} + +func TestJQRunFunction_errors(t *testing.T) { + testCases := []struct { + Scenario string + JQConfig *JQMapperConfig + InputMsg *models.Message + InputInterState interface{} + Expected map[string]*models.Message + ExpInterState interface{} + Error error + }{ + { + Scenario: "happy_path", + JQConfig: &JQMapperConfig{ + JQCommand: `{foo: .app_id}`, + RunTimeout: 0, + SpMode: true, + }, + InputMsg: &models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": nil, + "filtered": nil, + "failed": { + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + }, + ExpInterState: nil, + Error: errors.New("context deadline"), + }, + } + + for _, tt := range testCases { + t.Run(tt.Scenario, func(t *testing.T) { + assert := assert.New(t) + + transFun, err := JQMapperConfigFunction(tt.JQConfig) + assert.Nil(err) + + s, f, e, i := transFun(tt.InputMsg, tt.InputInterState) + + if !reflect.DeepEqual(i, tt.ExpInterState) { + t.Errorf("\nINTERMEDIATE_STATE:\nGOT:\n%s\nEXPECTED:\n%s", + spew.Sdump(i), + spew.Sdump(tt.ExpInterState)) + } + + if e == nil && tt.Error != nil { + t.Fatalf("missed expected error") + } + + if e != nil { + gotErr := e.GetError() + expErr := tt.Error + if expErr == nil { + t.Fatalf("got unexpected error: %s", gotErr.Error()) + } + + if !strings.Contains(gotErr.Error(), expErr.Error()) { + t.Errorf("GOT_ERROR:\n%s\n does not contain\nEXPECTED_ERROR:\n%s", + gotErr.Error(), + expErr.Error()) + } + } + + assertMessagesCompareJQ(t, s, tt.Expected["success"], "success") + assertMessagesCompareJQ(t, f, tt.Expected["filtered"], "filtered") + assertMessagesCompareJQ(t, e, tt.Expected["failed"], "failed") + }) + } +} + +func assertMessagesCompareJQ(t *testing.T, act, exp *models.Message, hint string) { + t.Helper() + ok := false + headersOk := false + switch { + case act == nil: + ok = exp == nil + case exp == nil: + default: + pkOk := act.PartitionKey == exp.PartitionKey + dataOk := reflect.DeepEqual(act.Data, exp.Data) + cTimeOk := reflect.DeepEqual(act.TimeCreated, exp.TimeCreated) + pTimeOk := reflect.DeepEqual(act.TimePulled, exp.TimePulled) + tTimeOk := reflect.DeepEqual(act.TimeTransformed, exp.TimeTransformed) + ackOk := reflect.DeepEqual(act.AckFunc, exp.AckFunc) + headersOk = reflect.DeepEqual(act.HTTPHeaders, exp.HTTPHeaders) + + if pkOk && dataOk && cTimeOk && pTimeOk && tTimeOk && ackOk && headersOk { + ok = true + } + } + + if !ok { + // message.HTTPHeaders are not printed + if headersOk == false && act != nil && exp != nil { + t.Errorf("\nUnexpected HTTPHeaders:\nGOT:\n%s\nEXPECTED:\n%s\n", + spew.Sdump(act.HTTPHeaders), + spew.Sdump(exp.HTTPHeaders)) + } else { + t.Errorf("\nGOT:\n%s\nEXPECTED:\n%s\n", + spew.Sdump(act), + spew.Sdump(exp)) + } + } +} + +// tmp - Commenting out for reference + +// func TestJustJQ(t *testing.T) { +// // assert := assert.New(t) + +// inputData := &models.Message{ +// Data: snowplowJSON1, +// PartitionKey: "some-key", +// } + +// var input map[string]any + +// json.Unmarshal(inputData.Data, &input) + +// res := grabFromGenericJQConfig(input, examplePureJQConfig) + +// fmt.Println(string(res)) +// } + +// func TestGrabValue(t *testing.T) { +// assert := assert.New(t) + +// inputData := &models.Message{ +// Data: snowplowJSON1, +// PartitionKey: "some-key", +// } + +// query, err := gojq.Parse(".contexts_com_acme_just_ints_1[0].integerField") +// if err != nil { +// panic(err) +// } + +// var input map[string]any + +// json.Unmarshal(inputData.Data, &input) + +// valueFound, err := grabValue(input, query) +// if err != nil { +// panic(err) +// } + +// assert.Equal(float64(0), valueFound) + +// } + +// func TestMapper(t *testing.T) { +// assert := assert.New(t) + +// // Mapper(&models.Message{ +// // Data: snowplowJSON1, +// // PartitionKey: "some-key", +// // }, nil) + +// inputData := &models.Message{ +// Data: snowplowJSON1, +// PartitionKey: "some-key", +// } + +// assert.Nil(nil) + +// var input map[string]any + +// json.Unmarshal(inputData.Data, &input) + +// mapped := grabLotsOfValues(input, exampleParsedConfig) + +// fmt.Println(mapped) + +// // expectedMap := map[string]any{ +// // "arraySpecified": []string{"test-data1", "e9234345-f042-46ad-b1aa-424464066a33"}, +// // "field1": "test-data1", +// // "field2": map[string]any{ +// // "nestedField1": map[string]any{ +// // "integerField": float64(0), +// // }, +// // }, +// // // "fieldWithCoalesceExample": +// // } + +// // assert.Equal(expectedMap, mapped) +// } + +// /* +// fieldWithCoalesceExample:map[coalesce:[map[integerField:0]]] fieldWithOtherCoalesceExample:test-data1 manualUnnest:map[just_ints_integerField:0]] +// */ diff --git a/pkg/transform/mappper_test.go b/pkg/transform/mappper_test.go deleted file mode 100644 index 693c7053..00000000 --- a/pkg/transform/mappper_test.go +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Copyright (c) 2020-present Snowplow Analytics Ltd. - * All rights reserved. - * - * This software is made available by Snowplow Analytics, Ltd., - * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 - * located at https://docs.snowplow.io/limited-use-license-1.0 - * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION - * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. - */ - -package transform - -import ( - "encoding/json" - "fmt" - "testing" - - "github.com/itchyny/gojq" - "github.com/snowplow/snowbridge/pkg/models" - "github.com/stretchr/testify/assert" -) - -func TestJustJQ(t *testing.T) { - // assert := assert.New(t) - - inputData := &models.Message{ - Data: snowplowJSON1, - PartitionKey: "some-key", - } - - var input map[string]any - - json.Unmarshal(inputData.Data, &input) - - res := grabFromGenericJQConfig(input, examplePureJQConfig) - - fmt.Println(string(res)) -} - -func TestGrabValue(t *testing.T) { - assert := assert.New(t) - - inputData := &models.Message{ - Data: snowplowJSON1, - PartitionKey: "some-key", - } - - query, err := gojq.Parse(".contexts_com_acme_just_ints_1[0].integerField") - if err != nil { - panic(err) - } - - var input map[string]any - - json.Unmarshal(inputData.Data, &input) - - valueFound, err := grabValue(input, query) - if err != nil { - panic(err) - } - - assert.Equal(float64(0), valueFound) - -} - -func TestMapper(t *testing.T) { - assert := assert.New(t) - - // Mapper(&models.Message{ - // Data: snowplowJSON1, - // PartitionKey: "some-key", - // }, nil) - - inputData := &models.Message{ - Data: snowplowJSON1, - PartitionKey: "some-key", - } - - assert.Nil(nil) - - var input map[string]any - - json.Unmarshal(inputData.Data, &input) - - mapped := grabLotsOfValues(input, exampleParsedConfig) - - fmt.Println(mapped) - - // expectedMap := map[string]any{ - // "arraySpecified": []string{"test-data1", "e9234345-f042-46ad-b1aa-424464066a33"}, - // "field1": "test-data1", - // "field2": map[string]any{ - // "nestedField1": map[string]any{ - // "integerField": float64(0), - // }, - // }, - // // "fieldWithCoalesceExample": - // } - - // assert.Equal(expectedMap, mapped) -} - -/* - fieldWithCoalesceExample:map[coalesce:[map[integerField:0]]] fieldWithOtherCoalesceExample:test-data1 manualUnnest:map[just_ints_integerField:0]] -*/ From 63b53f9f59997cb60fb87da81f5db26d4671cb62 Mon Sep 17 00:00:00 2001 From: adatzer Date: Mon, 17 Jun 2024 12:27:44 +0300 Subject: [PATCH 2/6] Remove commented out part --- pkg/transform/mapper.go | 167 ----------------------------------- pkg/transform/mapper_test.go | 86 ------------------ 2 files changed, 253 deletions(-) diff --git a/pkg/transform/mapper.go b/pkg/transform/mapper.go index 9b3e5cfc..9c069d79 100644 --- a/pkg/transform/mapper.go +++ b/pkg/transform/mapper.go @@ -154,170 +154,3 @@ func mkJQInput(jqm *jqMapper, message *models.Message, interState interface{}) ( return spInput, nil } - -// Commenting out for reference - -// // This works perfectly, is a million times simpler to implement, and prevents us from being blocked in future if we haven't predicted a requirement -// var examplePureJQConfig = `{ -// field1: .app_id, -// field2: { field2: .contexts_com_acme_just_ints_1[0] }, -// fieldWithOtherCoalesceExample: ( .app_id // .contexts_com_acme_just_ints_1[0] ), -// manualUnnest: { just_ints_integerField: .contexts_com_acme_just_ints_1[0].integerField }, -// arraySpecified: [ .app_id, .event_id ] -// }` - -// func grabFromGenericJQConfig(inputData map[string]any, config string) []byte { -// query, err := gojq.Parse(config) -// if err != nil { -// panic(err) -// } - -// res, err := grabValue(inputData, query) -// if err != nil { -// panic(err) -// } - -// out, err := json.Marshal(res) -// if err != nil { -// panic(err) -// } - -// return out -// } - -// // Some magic may be required in the config parsing bit to enable this! -// // If it's impractical we can structure the config in an easier to handle way. -// var exampleParsedConfig = map[string]any{ -// "field1": ".app_id", -// "field2": map[string]any{"nestedField1": ".contexts_com_acme_just_ints_1[0]"}, -// "fieldWithCoalesceExample": map[string]any{"coalesce": []string{"app_id", ".contexts_com_acme_just_ints_1[0]"}}, -// // Seeing the implementation, the below is way cleaner! -// "fieldWithOtherCoalesceExample": ".app_id // .contexts_com_acme_just_ints_1[0]", - -// "manualUnnest": map[string]any{"just_ints_integerField": ".contexts_com_acme_just_ints_1[0].integerField"}, -// // not sure if this should be allowable in config -// "arraySpecified": []string{".app_id", ".event_id"}, -// } - -// // // // TODO: function to get values -// // // // TODO: function to create objects/iterate config and create objects -// // TODO: function to delete keys after? - -// // For the delete key function, perhaps we factor the below such that traversing can be re-used? - -// // In the actual implementation, we would prbably want to iterate the config to compile or parse queries, then later produce the data -// // For this implementation sketch, I'll just do all the work here. -// func grabLotsOfValues(inputData map[string]any, config map[string]any) map[string]any { -// out := map[string]any{} - -// for key, val := range config { -// switch val.(type) { -// // TODO: figure out what kinds of types our config parsing will actually produce, and if this approach or another is needed to handle them -// case map[string]any: - -// mapRes := grabLotsOfValues(inputData, val.(map[string]any)) -// // TODO: either have this function return nil or check for empty map here. -// out[key] = mapRes -// case []map[string]any: -// // Seems doable but not implemented yet. -// case []string: -// // The way I've structured this function, it's a bit more complex to support a coalesce option. -// // We could refactor things so that this could be handled slightly more elegantly, -// // but I think it has become fairly clear that the best option is Nick's suggestion - just let jq syntax support this. -// if key == "coalesce" { -// for _, item := range val.([]string) { // only slice of string allowed -// query, err := gojq.Parse(item) -// if err != nil { -// panic(err) -// } -// outVal, err := grabValue(inputData, query) -// if outVal != nil { -// out[key] = outVal -// break -// } -// } -// break -// } else { -// outSlice := []any{} -// // Probably could be done with less repeated code -// for _, item := range val.([]string) { -// query, err := gojq.Parse(item) -// if err != nil { -// panic(err) -// } -// outVal, err := grabValue(inputData, query) -// if outVal != nil { -// // Don't add nil keys -// outSlice = append(outSlice, outVal) -// } -// } -// // TODO: Do something to not add empty arrays -// out[key] = outSlice -// } -// case string: -// query, err := gojq.Parse(val.(string)) -// if err != nil { -// panic(err) -// } -// outVal, err := grabValue(inputData, query) -// if outVal != nil { -// // Don't add nil keys -// out[key] = outVal -// } -// default: -// fmt.Println("something went wrong here") -// fmt.Println(key) -// fmt.Println(val) -// } -// } -// return out -// } - -// // We may want to run gojq.Compile() on startup for each option, pass a *gojq.Code here -// func grabValue(inputData map[string]any, query *gojq.Query) (any, error) { - -// var grabbedValue any - -// iter := query.Run(inputData) // or query.RunWithContext - -// v, ok := iter.Next() -// if !ok { -// return nil, errors.New("TODO: ADD ERROR HERE") -// } -// if err, ok := v.(error); ok { - -// return nil, err -// } -// grabbedValue = v - -// return grabbedValue, nil -// } - -// // Mapper is // TODO: Add description -// func Mapper(message *models.Message, intermediateState interface{}) { - -// var input map[string]any - -// json.Unmarshal(message.Data, &input) - -// // query, err := gojq.Parse(".bar.emptyKey // .bar.baz") -// query, err := gojq.Parse(".contexts_com_acme_just_ints_1[0]") -// if err != nil { -// log.Fatalln(err) -// } -// // input := map[string]any{"foo": []any{1, 2, 3}, "bar": map[string]any{"baz": "someValue", "emptyKey": nil}} -// iter := query.Run(input) // or query.RunWithContext -// for { -// v, ok := iter.Next() -// if !ok { -// break -// } -// if err, ok := v.(error); ok { -// if err, ok := err.(*gojq.HaltError); ok && err.Value() == nil { -// break -// } -// log.Fatalln(err) -// } -// fmt.Printf("%#v\n", v) -// } -// } diff --git a/pkg/transform/mapper_test.go b/pkg/transform/mapper_test.go index b86a7790..c60565c7 100644 --- a/pkg/transform/mapper_test.go +++ b/pkg/transform/mapper_test.go @@ -309,89 +309,3 @@ func assertMessagesCompareJQ(t *testing.T, act, exp *models.Message, hint string } } } - -// tmp - Commenting out for reference - -// func TestJustJQ(t *testing.T) { -// // assert := assert.New(t) - -// inputData := &models.Message{ -// Data: snowplowJSON1, -// PartitionKey: "some-key", -// } - -// var input map[string]any - -// json.Unmarshal(inputData.Data, &input) - -// res := grabFromGenericJQConfig(input, examplePureJQConfig) - -// fmt.Println(string(res)) -// } - -// func TestGrabValue(t *testing.T) { -// assert := assert.New(t) - -// inputData := &models.Message{ -// Data: snowplowJSON1, -// PartitionKey: "some-key", -// } - -// query, err := gojq.Parse(".contexts_com_acme_just_ints_1[0].integerField") -// if err != nil { -// panic(err) -// } - -// var input map[string]any - -// json.Unmarshal(inputData.Data, &input) - -// valueFound, err := grabValue(input, query) -// if err != nil { -// panic(err) -// } - -// assert.Equal(float64(0), valueFound) - -// } - -// func TestMapper(t *testing.T) { -// assert := assert.New(t) - -// // Mapper(&models.Message{ -// // Data: snowplowJSON1, -// // PartitionKey: "some-key", -// // }, nil) - -// inputData := &models.Message{ -// Data: snowplowJSON1, -// PartitionKey: "some-key", -// } - -// assert.Nil(nil) - -// var input map[string]any - -// json.Unmarshal(inputData.Data, &input) - -// mapped := grabLotsOfValues(input, exampleParsedConfig) - -// fmt.Println(mapped) - -// // expectedMap := map[string]any{ -// // "arraySpecified": []string{"test-data1", "e9234345-f042-46ad-b1aa-424464066a33"}, -// // "field1": "test-data1", -// // "field2": map[string]any{ -// // "nestedField1": map[string]any{ -// // "integerField": float64(0), -// // }, -// // }, -// // // "fieldWithCoalesceExample": -// // } - -// // assert.Equal(expectedMap, mapped) -// } - -// /* -// fieldWithCoalesceExample:map[coalesce:[map[integerField:0]]] fieldWithOtherCoalesceExample:test-data1 manualUnnest:map[just_ints_integerField:0]] -// */ From 57fdeab7b7c643d9d3194cd8ec99b31833b1ba6e Mon Sep 17 00:00:00 2001 From: adatzer Date: Mon, 17 Jun 2024 14:17:03 +0300 Subject: [PATCH 3/6] Unexport config function --- pkg/transform/mapper.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/transform/mapper.go b/pkg/transform/mapper.go index 9c069d79..ebef1a4b 100644 --- a/pkg/transform/mapper.go +++ b/pkg/transform/mapper.go @@ -101,8 +101,8 @@ func jqMapperAdapterGenerator(f func(c *JQMapperConfig) (TransformationFunction, } } -// JQMapperConfigFunction returns a jq mapper transformation function from a JQMapperConfig -func JQMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) { +// jqMapperConfigFunction returns a jq mapper transformation function from a JQMapperConfig +func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) { query, err := gojq.Parse(c.JQCommand) if err != nil { return nil, fmt.Errorf("error parsing jq command: %q", err.Error()) @@ -125,7 +125,7 @@ func JQMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) { // JQMapperConfigPair is a configuration pair for the jq mapper transformation var JQMapperConfigPair = config.ConfigurationPair{ Name: "jq", - Handle: jqMapperAdapterGenerator(JQMapperConfigFunction), + Handle: jqMapperAdapterGenerator(jqMapperConfigFunction), } // mkJQInput ensures the input to JQ query is of expected type From 5c104275e828cd873e917b152bb179d27c478fa3 Mon Sep 17 00:00:00 2001 From: adatzer Date: Mon, 17 Jun 2024 20:19:01 +0300 Subject: [PATCH 4/6] Expand test cases --- pkg/transform/mapper_test.go | 210 +++++++++++++++++++++++++++++++++-- 1 file changed, 203 insertions(+), 7 deletions(-) diff --git a/pkg/transform/mapper_test.go b/pkg/transform/mapper_test.go index c60565c7..770779a1 100644 --- a/pkg/transform/mapper_test.go +++ b/pkg/transform/mapper_test.go @@ -52,6 +52,44 @@ func TestJQRunFunction_SpMode_true(t *testing.T) { ExpInterState: nil, Error: nil, }, + { + Scenario: "happy_path_with_Intermediate_state", + JQCommand: `{foo: .app_id}`, + InputMsg: &models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + InputInterState: SpTsv1Parsed, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"foo":"test-data1"}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, + { + Scenario: "selecting_from_context", + JQCommand: `{foo: .contexts_nl_basjes_yauaa_context_1[0].operatingSystemName}`, + InputMsg: &models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": { + Data: []byte(`{"foo":"Unknown"}`), + PartitionKey: "some-key", + }, + "filtered": nil, + "failed": nil, + }, + ExpInterState: nil, + Error: nil, + }, } for _, tt := range testCases { @@ -64,8 +102,11 @@ func TestJQRunFunction_SpMode_true(t *testing.T) { SpMode: true, } - transFun, err := JQMapperConfigFunction(jqConfig) - assert.Nil(err) + transFun, err := jqMapperConfigFunction(jqConfig) + assert.NotNil(transFun) + if err != nil { + t.Fatalf("failed to create transformation function with error: %q", err.Error()) + } s, f, e, i := transFun(tt.InputMsg, tt.InputInterState) @@ -163,8 +204,11 @@ func TestJQRunFunction_SpMode_false(t *testing.T) { SpMode: false, } - transFun, err := JQMapperConfigFunction(jqConfig) - assert.Nil(err) + transFun, err := jqMapperConfigFunction(jqConfig) + assert.NotNil(transFun) + if err != nil { + t.Fatalf("failed to create transformation function with error: %q", err.Error()) + } s, f, e, i := transFun(tt.InputMsg, tt.InputInterState) @@ -210,7 +254,76 @@ func TestJQRunFunction_errors(t *testing.T) { Error error }{ { - Scenario: "happy_path", + Scenario: "not_a_map_a", + JQConfig: &JQMapperConfig{ + JQCommand: `.`, + RunTimeout: 5, + SpMode: false, + }, + InputMsg: &models.Message{ + Data: []byte(`[]`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": nil, + "filtered": nil, + "failed": { + Data: []byte(`[]`), + PartitionKey: "some-key", + }, + }, + ExpInterState: nil, + Error: errors.New("cannot unmarshal array into Go value of type map[string]interface {}"), + }, + { + Scenario: "not_a_map_b", + JQConfig: &JQMapperConfig{ + JQCommand: `.`, + RunTimeout: 5, + SpMode: false, + }, + InputMsg: &models.Message{ + Data: []byte(`a`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": nil, + "filtered": nil, + "failed": { + Data: []byte(`a`), + PartitionKey: "some-key", + }, + }, + ExpInterState: nil, + Error: errors.New("invalid character 'a' looking for beginning of value"), + }, + { + Scenario: "not_snowplow_event_with_spMode_true", + JQConfig: &JQMapperConfig{ + JQCommand: `.`, + RunTimeout: 5, + SpMode: true, + }, + InputMsg: &models.Message{ + Data: []byte(`a`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": nil, + "filtered": nil, + "failed": { + Data: []byte(`a`), + PartitionKey: "some-key", + }, + }, + ExpInterState: nil, + Error: errors.New("Cannot parse tsv event"), + }, + { + Scenario: "deadline_exceeded", JQConfig: &JQMapperConfig{ JQCommand: `{foo: .app_id}`, RunTimeout: 0, @@ -232,14 +345,40 @@ func TestJQRunFunction_errors(t *testing.T) { ExpInterState: nil, Error: errors.New("context deadline"), }, + { + Scenario: "no_output", + JQConfig: &JQMapperConfig{ + JQCommand: `.foo[].value`, + RunTimeout: 5, + SpMode: false, + }, + InputMsg: &models.Message{ + Data: []byte(`{"foo": []}`), + PartitionKey: "some-key", + }, + InputInterState: nil, + Expected: map[string]*models.Message{ + "success": nil, + "filtered": nil, + "failed": { + Data: []byte(`{"foo": []}`), + PartitionKey: "some-key", + }, + }, + ExpInterState: nil, + Error: errors.New("jq query got no output"), + }, } for _, tt := range testCases { t.Run(tt.Scenario, func(t *testing.T) { assert := assert.New(t) - transFun, err := JQMapperConfigFunction(tt.JQConfig) - assert.Nil(err) + transFun, err := jqMapperConfigFunction(tt.JQConfig) + assert.NotNil(transFun) + if err != nil { + t.Fatalf("failed to create transformation function with error: %q", err.Error()) + } s, f, e, i := transFun(tt.InputMsg, tt.InputInterState) @@ -274,6 +413,63 @@ func TestJQRunFunction_errors(t *testing.T) { } } +func TestJQMapperConfigFunction(t *testing.T) { + testCases := []struct { + Scenario string + JQCommand string + Error error + }{ + { + Scenario: "compile_error", + JQCommand: ` +{ + foo: something_undefined +} +`, + Error: errors.New("error compiling jq query"), + }, + { + Scenario: "parsing_error", + JQCommand: `^`, + Error: errors.New("error parsing jq command"), + }, + } + + for _, tt := range testCases { + t.Run(tt.Scenario, func(t *testing.T) { + assert := assert.New(t) + + jqCfg := &JQMapperConfig{ + JQCommand: tt.JQCommand, + RunTimeout: 5, + SpMode: false, + } + + transFun, err := jqMapperConfigFunction(jqCfg) + + if err == nil && tt.Error != nil { + t.Fatalf("missed expected error") + } + + if err != nil { + assert.Nil(transFun) + + expErr := tt.Error + if expErr == nil { + t.Fatalf("got unexpected error: %s", err.Error()) + } + + if !strings.Contains(err.Error(), expErr.Error()) { + t.Errorf("GOT_ERROR:\n%s\n does not contain\nEXPECTED_ERROR:\n%s", + err.Error(), + expErr.Error()) + } + } + }) + } +} + +// Helper func assertMessagesCompareJQ(t *testing.T, act, exp *models.Message, hint string) { t.Helper() ok := false From 5106cbb8d516aec264431652f9b7997699c66705 Mon Sep 17 00:00:00 2001 From: adatzer Date: Mon, 17 Jun 2024 20:35:33 +0300 Subject: [PATCH 5/6] Register the new transformation --- .../transformations/builtin/jq-full-example.hcl | 16 ++++++++++++++++ .../builtin/jq-minimal-example.hcl | 5 +++++ .../transformconfig/transform_config.go | 1 + 3 files changed, 22 insertions(+) create mode 100644 assets/docs/configuration/transformations/builtin/jq-full-example.hcl create mode 100644 assets/docs/configuration/transformations/builtin/jq-minimal-example.hcl diff --git a/assets/docs/configuration/transformations/builtin/jq-full-example.hcl b/assets/docs/configuration/transformations/builtin/jq-full-example.hcl new file mode 100644 index 00000000..204f20ed --- /dev/null +++ b/assets/docs/configuration/transformations/builtin/jq-full-example.hcl @@ -0,0 +1,16 @@ +transform { + use "jq" { + jq_command = < Date: Mon, 17 Jun 2024 20:50:13 +0300 Subject: [PATCH 6/6] make format --- pkg/transform/mapper_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/transform/mapper_test.go b/pkg/transform/mapper_test.go index 770779a1..09147759 100644 --- a/pkg/transform/mapper_test.go +++ b/pkg/transform/mapper_test.go @@ -415,9 +415,9 @@ func TestJQRunFunction_errors(t *testing.T) { func TestJQMapperConfigFunction(t *testing.T) { testCases := []struct { - Scenario string - JQCommand string - Error error + Scenario string + JQCommand string + Error error }{ { Scenario: "compile_error", @@ -426,12 +426,12 @@ func TestJQMapperConfigFunction(t *testing.T) { foo: something_undefined } `, - Error: errors.New("error compiling jq query"), + Error: errors.New("error compiling jq query"), }, { - Scenario: "parsing_error", + Scenario: "parsing_error", JQCommand: `^`, - Error: errors.New("error parsing jq command"), + Error: errors.New("error parsing jq command"), }, } @@ -440,9 +440,9 @@ func TestJQMapperConfigFunction(t *testing.T) { assert := assert.New(t) jqCfg := &JQMapperConfig{ - JQCommand: tt.JQCommand, + JQCommand: tt.JQCommand, RunTimeout: 5, - SpMode: false, + SpMode: false, } transFun, err := jqMapperConfigFunction(jqCfg)