Skip to content

Commit

Permalink
Add full HTTP templater implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Jun 27, 2024
1 parent 0d40454 commit 4f3eab0
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 55 deletions.
11 changes: 7 additions & 4 deletions assets/docs/configuration/targets/http-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ target {
dynamic_headers = true

# Optional. One of client credentials required when authorizing using OAuth2.
oauth2_client_id = env.CLIENT_ID
oauth2_client_id = env.CLIENT_ID

# Optional. One of client credentials required when authorizing using OAuth2.
oauth2_client_secret = env.CLIENT_SECRET
oauth2_client_secret = env.CLIENT_SECRET

# Optional. Required when using OAuth2. Long-lived token used to generate new short-lived access token when previous one experies.
oauth2_refresh_token = env.REFRESH_TOKEN
oauth2_refresh_token = env.REFRESH_TOKEN

# Optional. Required when using OAuth2. URL to authorization server providing access token. E.g. for Goggle API "https://oauth2.googleapis.com/token"
oauth2_token_url = "https://my.auth.server/token"
oauth2_token_url = "https://my.auth.server/token"

# Optional path to the file containing template which is used to build HTTP request based on a batch of input data
template_file = "myTemplate.file"
}
}
1 change: 1 addition & 0 deletions config/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
CaFile: "myRootCA.crt",
SkipVerifyTLS: true,
DynamicHeaders: true,
TemplateFile: "myTemplate.file",
},
},
{
Expand Down
4 changes: 4 additions & 0 deletions integration/http/template
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"attributes": [{{range $i, $data := .}}{{if $i}},{{end}}{{.attribute_data}}{{end}}],
"events": [{{range $i, $data := .}}{{if $i}},{{end}}{{prettyPrint .event_data}}{{end}}]
}
104 changes: 62 additions & 42 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net/http"
"net/url"
"os"
"text/template"
"time"

Expand Down Expand Up @@ -54,6 +55,8 @@ type HTTPTargetConfig struct {
RequestMaxMessages int `hcl:"request_max_messages,optional"`
RequestByteLimit int `hcl:"request_byte_limit,optional"` // note: breaking change here
MessageByteLimit int `hcl:"message_byte_limit,optional"`

TemplateFile string `hcl:"template_file,optional"`
}

// HTTPTarget holds a new client for writing messages to HTTP endpoints
Expand All @@ -70,6 +73,8 @@ type HTTPTarget struct {
requestMaxMessages int
requestByteLimit int
messageByteLimit int

requestTemplate *template.Template
}

func checkURL(str string) error {
Expand Down Expand Up @@ -127,7 +132,8 @@ func newHTTPTarget(
oAuth2ClientID string,
oAuth2ClientSecret string,
oAuth2RefreshToken string,
oAuth2TokenURL string) (*HTTPTarget, error) {
oAuth2TokenURL string,
templateFile string) (*HTTPTarget, error) {
err := checkURL(httpURL)
if err != nil {
return nil, err
Expand All @@ -149,6 +155,12 @@ func newHTTPTarget(
client := createHTTPClient(oAuth2ClientID, oAuth2ClientSecret, oAuth2TokenURL, oAuth2RefreshToken, transport)
client.Timeout = time.Duration(requestTimeout) * time.Second

requestTemplate, err := loadRequestTemplate(templateFile)

if err != nil {
return nil, err
}

return &HTTPTarget{
client: client,
httpURL: httpURL,
Expand All @@ -162,9 +174,41 @@ func newHTTPTarget(
requestMaxMessages: requestMaxMessages,
requestByteLimit: requestByteLimit,
messageByteLimit: messageByteLimit,

requestTemplate: requestTemplate,
}, nil
}

func loadRequestTemplate(templateFile string) (*template.Template, error) {
if templateFile != "" {
content, err := os.ReadFile(templateFile)

if err != nil {
return nil, err
}
return parseRequestTemplate(string(content))
}
return nil, nil
}

func parseRequestTemplate(templateContent string) (*template.Template, error) {
customTemplateFunctions := template.FuncMap{
// If you use this in your template on struct-like fields, you get rendered nice JSON `{"field":"value"}` instead of stringified map `map[field:value]`
// TODO: This works for now but we should check if there is more efficient solution.
"prettyPrint": func(v interface{}) string {
a, _ := json.Marshal(v)
return string(a)
},
}

parsedTemplate, err := template.New("HTTP").Funcs(customTemplateFunctions).Parse(templateContent)
if err != nil {
return nil, err
}

return parsedTemplate, nil
}

func createHTTPClient(oAuth2ClientID string, oAuth2ClientSecret string, oAuth2TokenURL string, oAuth2RefreshToken string, transport *http.Transport) *http.Client {
if oAuth2ClientID != "" {
oauth2Config := oauth2.Config{
Expand Down Expand Up @@ -205,6 +249,7 @@ func HTTPTargetConfigFunction(c *HTTPTargetConfig) (*HTTPTarget, error) {
c.OAuth2ClientSecret,
c.OAuth2RefreshToken,
c.OAuth2TokenURL,
c.TemplateFile,
)
}

Expand Down Expand Up @@ -268,13 +313,12 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu
var badMsgs []*models.Message
var err error

// just for now to spike
templaterConfigured := false
if templaterConfigured {
reqBody, goodMsgs, badMsgs, err = ht.requestTemplater(templ, group)
if ht.requestTemplate != nil {
reqBody, goodMsgs, badMsgs, err = ht.renderBatchUsingTemplate(group)
} else {
reqBody, goodMsgs, badMsgs, err = ht.provideRequestBody(group)
}

failed = append(failed, badMsgs...)
if err != nil {
errResult = multierror.Append(errResult, errors.New("Error constructing request"))
Expand Down Expand Up @@ -361,45 +405,32 @@ func (ht *HTTPTarget) retrieveHeaders(msg *models.Message) map[string]string {
return msg.HTTPHeaders
}

// requestTemplater creates a request from a batch of messages
func (ht *HTTPTarget) requestTemplater(tmpl string, messages []*models.Message) (templated []byte, success []*models.Message, failed []*models.Message, err error) {
// renderBatchUsingTemplate creates a request from a batch of messages based on configured template
func (ht *HTTPTarget) renderBatchUsingTemplate(messages []*models.Message) (templated []byte, success []*models.Message, failed []*models.Message, err error) {
invalid := make([]*models.Message, 0)
safe := make([]*models.Message, 0)
validMsgs := make([]*models.Message, 0)
validJsons := []map[string]interface{}{}

formatted := []map[string]interface{}{}
for _, msg := range messages {
// Use json.RawMessage to ensure templating format works (real implementation has a problem to figure out here)
var asMap map[string]interface{}
var asJSON map[string]interface{}

if err := json.Unmarshal(msg.Data, &asMap); err != nil {
msg.SetError(errors.Wrap(err, "templater error")) // TODO: Cleanup!
if err := json.Unmarshal(msg.Data, &asJSON); err != nil {
msg.SetError(errors.Wrap(err, "Message can't be parsed as valid JSON"))
invalid = append(invalid, msg)
continue
}

formatted = append(formatted, asMap)
validMsgs = append(validMsgs, msg)
validJsons = append(validJsons, asJSON)
}
var buf bytes.Buffer

customFunctions := template.FuncMap{
// If you use this in your template on struct-like fields, you get rendered nice JSON `{"field":"value"}` instead of stringified map `map[field:value]`
// TODO: This works for now but we should check if there is more efficient solution.
"asJson": func(v interface{}) string {
a, _ := json.Marshal(v)
return string(a)
},
}

//TODO parse when creating target
t := template.Must(template.New("example").Funcs(customFunctions).Parse(tmpl))
if err := t.Execute(&buf, formatted); err != nil {

invalid = append(invalid, safe...)

var buf bytes.Buffer
if err := ht.requestTemplate.Execute(&buf, validJsons); err != nil {
invalid = append(invalid, validMsgs...)
return nil, nil, invalid, err
}

return buf.Bytes(), safe, nil, nil
return buf.Bytes(), validMsgs, invalid, nil
}

// Where no transformation function provides a request body, we must provide one - this necessarily must happen last.
Expand All @@ -426,17 +457,6 @@ func (ht *HTTPTarget) provideRequestBody(messages []*models.Message) (templated
return requestBody, messages, nil, nil
}

const templ = `{
attributes: [ {{$first_1 := true}}
{{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}}
{{printf "%s" .attribute_data}}{{end}}
],
events: [ {{$first_2 := true}}
{{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}}
{{printf "%s" .event_data}}{{end}}
]
}`

// groupByDynamicHeaders batches data by header if the dynamic header feature is turned on.
func (ht *HTTPTarget) groupByDynamicHeaders(messages []*models.Message) [][]*models.Message {
if !ht.dynamicHeaders {
Expand Down
52 changes: 44 additions & 8 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,20 +305,20 @@ func TestAddHeadersToRequest_WithDynamicHeaders(t *testing.T) {
func TestNewHTTPTarget(t *testing.T) {
assert := assert.New(t)

httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "")

assert.Nil(err)
assert.NotNil(httpTarget)

failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "")

assert.NotNil(err1)
if err1 != nil {
assert.Equal("Invalid url for HTTP target: 'something'", err1.Error())
}
assert.Nil(failedHTTPTarget)

failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "")
assert.NotNil(err2)
if err2 != nil {
assert.Equal("Invalid url for HTTP target: ''", err2.Error())
Expand All @@ -345,7 +345,7 @@ func TestHttpWrite_Simple(t *testing.T) {
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestHttpWrite_Concurrent(t *testing.T) {
server := createTestServer(&results, &wg)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestHttpWrite_Failure(t *testing.T) {
server := createTestServer(&results, &wg)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestHttpWrite_InvalidResponseCode(t *testing.T) {
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -499,7 +499,7 @@ func TestHttpWrite_Oversized(t *testing.T) {
server := createTestServer(&results, &wg)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -528,6 +528,39 @@ func TestHttpWrite_Oversized(t *testing.T) {
assert.Equal(int64(10), ackOps)
}

func TestHTTPWrite_EnabledTemplating(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServer(&results, &wg)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`))
if err != nil {
t.Fatal(err)
}

var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)
}

input := `{ "event_data": { "nested": "value1"}, "attribute_data": 1}`
messages := testutil.GetTestMessages(3, input, ackFunc)
wg.Add(1) //only one HTTP request is expected containing all data
writeResult, err1 := target.Write(messages)
wg.Wait()

assert.Nil(err1)
assert.Equal(3, len(writeResult.Sent))
assert.Equal(1, len(results))

expectedOutput := "{\n \"attributes\": [1,1,1],\n \"events\": [{\"nested\":\"value1\"},{\"nested\":\"value1\"},{\"nested\":\"value1\"}]\n}\n"
assert.Equal(expectedOutput, string(results[0]))
assert.Equal(int64(3), ackOps)
}

// Steps to create certs manually:

// openssl genrsa -out rootCA.key 4096
Expand Down Expand Up @@ -561,6 +594,7 @@ func TestHttpWrite_TLS(t *testing.T) {
"",
"",
"",
"",
"")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -600,6 +634,7 @@ func TestHttpWrite_TLS(t *testing.T) {
"",
"",
"",
"",
"")
if err2 != nil {
t.Fatal(err2)
Expand Down Expand Up @@ -632,6 +667,7 @@ func TestHttpWrite_TLS(t *testing.T) {
"",
"",
"",
"",
"")
if err4 != nil {
t.Fatal(err4)
Expand Down
2 changes: 1 addition & 1 deletion pkg/target/oauth2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input
}

func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget {
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL)
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "")
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 4f3eab0

Please sign in to comment.