Skip to content

Commit

Permalink
Add jq generic mapper transformation
Browse files Browse the repository at this point in the history
  • Loading branch information
adatzer committed Jun 20, 2024
1 parent 31a4b09 commit e594770
Show file tree
Hide file tree
Showing 8 changed files with 704 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
transform {
use "jq" {
jq_command = <<JQEOT
{
my_app_id: .app_id,
my_nested_prop: {
playback_rate: .contexts_com_snowplowanalytics_snowplow_media_player_1[0].playbackRate
}
}
JQEOT

timeout_sec = 5
snowplow_mode = true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
transform {
use "jq" {
jq_command = "[.]"
}
}
11 changes: 11 additions & 0 deletions assets/test/transformconfig/TestGetTransformations/configs/jq.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
transform {
use "jq" {
jq_command = <<JQEOT
{
my_app_id: .app_id,
}
JQEOT

snowplow_mode = true
}
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0
golang.org/x/sys v0.19.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/api v0.172.0 // indirect
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda
Expand All @@ -50,6 +50,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/dop251/goja v0.0.0-20240220182346-e401ed450204
github.com/hashicorp/hcl/v2 v2.20.1
github.com/itchyny/gojq v0.12.16
github.com/json-iterator/go v1.1.12
github.com/snowplow/snowplow-golang-tracker/v2 v2.4.1
github.com/yuin/gluamapper v0.0.0-20150323120927-d836955830e7
Expand Down Expand Up @@ -91,6 +92,7 @@ require (
github.com/hashicorp/go-memdb v1.3.4 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/itchyny/timefmt-go v0.1.6 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/
github.com/iris-contrib/jade v1.1.3/go.mod h1:H/geBymxJhShH5kecoiOCSssPX7QWYH7UaeZTSWddIk=
github.com/iris-contrib/pongo2 v0.0.1/go.mod h1:Ssh+00+3GAZqSQb30AvBRNxBx7rf0GqwkjqxNd0u65g=
github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw=
github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g=
github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM=
github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q=
github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg=
github.com/jarcoal/httpmock v1.0.4 h1:jp+dy/+nonJE4g4xbVtl9QdrUNbn6/3hDT5R4nDIZnA=
github.com/jarcoal/httpmock v1.0.4/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
Expand Down Expand Up @@ -483,8 +487,8 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
156 changes: 156 additions & 0 deletions pkg/transform/mapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package transform

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/itchyny/gojq"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"
)

// JQMapperConfig represents the configuration for the JQ transformation
type JQMapperConfig struct {
JQCommand string `hcl:"jq_command"`
RunTimeoutMs int `hcl:"timeout_ms,optional"`
SpMode bool `hcl:"snowplow_mode,optional"`
}

// JQMapper handles jq generic mapping as a transformation
type jqMapper struct {
JQCode *gojq.Code
RunTimeoutMs time.Duration
SpMode bool
}

// RunFunction runs a jq mapper transformation
func (jqm *jqMapper) RunFunction() TransformationFunction {
return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
input, err := mkJQInput(jqm, message, interState)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}

ctx, cancel := context.WithTimeout(context.Background(), jqm.RunTimeoutMs)
defer cancel()

iter := jqm.JQCode.RunWithContext(ctx, input)
// no looping since we only keep first value
v, ok := iter.Next()
if !ok {
message.SetError(fmt.Errorf("jq query got no output"))
return nil, nil, message, nil
}

if err, ok := v.(error); ok {
message.SetError(err)
return nil, nil, message, nil
}

// here v is any, so we Marshal. alternative: gojq.Marshal
data, err := json.Marshal(v)
if err != nil {
message.SetError(fmt.Errorf("error encoding jq query output data"))
return nil, nil, message, nil
}

message.Data = data
return message, nil, nil, nil
}
}

// jqMapperAdapter implements the Pluggable interface
type jqMapperAdapter func(i interface{}) (interface{}, error)

// ProvideDefault implements the ComponentConfigurable interface
func (f jqMapperAdapter) ProvideDefault() (interface{}, error) {
return &JQMapperConfig{
RunTimeoutMs: 100,
}, nil
}

// Create implements the ComponentCreator interface
func (f jqMapperAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

// jqMapperAdapterGenerator returns a jqAdapter
func jqMapperAdapterGenerator(f func(c *JQMapperConfig) (TransformationFunction, error)) jqMapperAdapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*JQMapperConfig)
if !ok {
return nil, fmt.Errorf("invalid input, expected JQMapperConfig")
}

return f(cfg)
}
}

// jqMapperConfigFunction returns a jq mapper transformation function from a JQMapperConfig
func jqMapperConfigFunction(c *JQMapperConfig) (TransformationFunction, error) {
query, err := gojq.Parse(c.JQCommand)
if err != nil {
return nil, fmt.Errorf("error parsing jq command: %q", err.Error())
}

code, err := gojq.Compile(query)
if err != nil {
return nil, fmt.Errorf("error compiling jq query: %q", err.Error())
}

jq := &jqMapper{
JQCode: code,
RunTimeoutMs: time.Duration(c.RunTimeoutMs) * time.Millisecond,
SpMode: c.SpMode,
}

return jq.RunFunction(), nil
}

// JQMapperConfigPair is a configuration pair for the jq mapper transformation
var JQMapperConfigPair = config.ConfigurationPair{
Name: "jq",
Handle: jqMapperAdapterGenerator(jqMapperConfigFunction),
}

// mkJQInput ensures the input to JQ query is of expected type
func mkJQInput(jqm *jqMapper, message *models.Message, interState interface{}) (map[string]interface{}, error) {
if !jqm.SpMode {
// gojq input can only be map[string]any or []any
// here we only consider the first, but we could also expand
var input map[string]interface{}
err := json.Unmarshal(message.Data, &input)
if err != nil {
return nil, err
}

return input, nil
}

parsedEvent, err := IntermediateAsSpEnrichedParsed(interState, message)
if err != nil {
return nil, err
}

spInput, err := parsedEvent.ToMap()
if err != nil {
return nil, err
}

return spInput, nil
}
Loading

0 comments on commit e594770

Please sign in to comment.