Skip to content

Commit

Permalink
Direct implementation of batching and templating in http target
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Jun 19, 2024
1 parent 31a4b09 commit 4391101
Showing 1 changed file with 164 additions and 48 deletions.
212 changes: 164 additions & 48 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"context"
"encoding/json"
"fmt"
"html/template"
"net/http"
"net/url"
"time"
Expand Down Expand Up @@ -212,66 +213,181 @@ func AdaptHTTPTargetFunc(f func(c *HTTPTargetConfig) (*HTTPTarget, error)) HTTPT
}
}

// requestTemplater creates a request from a batch of messages
func (ht *HTTPTarget) requestTemplater(tmpl string, messages []*models.Message) (templated []byte, success []*models.Message, failed []*models.Message, err error) {
invalid := make([]*models.Message, 0)
safe := make([]*models.Message, 0)

formatted := []map[string]json.RawMessage{}
for _, msg := range messages {
// Use json.RawMessage to ensure templating format works (real implementation has a problem to figure out here)
var asMap map[string]json.RawMessage

if err := json.Unmarshal(msg.Data, &asMap); err != nil {
msg.SetError(errors.Wrap(err, "templater error")) // TODO: Cleanup!
invalid = append(invalid, msg)
continue
}

formatted = append(formatted, asMap)
}
var buf bytes.Buffer

t := template.Must(template.New("example").Parse(tmpl))
if err := t.Execute(&buf, formatted); err != nil {

invalid = append(invalid, safe...)

return nil, nil, invalid, err
}

return buf.Bytes(), safe, nil, nil
}

// Where no transformation function provides a request body, we must provide one - this necessarily must happen last.
// This is a http specific function so we define it here to avoid scope for misconfiguration
func (ht *HTTPTarget) provideRequestBody(messages []*models.Message) (templated []byte, success []*models.Message, failed []*models.Message, err error) {

// TODO: Add test for when messagess are just strings & confirm that it all works

// TODO: Note: This would mean that the GTM client gets arrays of single events instead of single events.
// But we could configure an explicit templater to change that if we wanted
// We should test to be certain that it's still compatible.

requestData := []string{}
for _, msg := range messages {
requestData = append(requestData, string(msg.Data))
}
// TODO: Add tests to be sure this produces the desired request
requestBody, err := json.Marshal(requestData)
if err != nil {
// TODO: Handle errors here
return nil, nil, messages, err
}

return requestBody, messages, nil, nil
}

const templ = `{
attributes: [ {{$first_1 := true}}
{{range .}}{{if $first_1}}{{$first_1 = false}}{{else}},{{end}}
{{printf "%s" .attribute_data}}{{end}}
],
events: [ {{$first_2 := true}}
{{range .}}{{if $first_2}}{{$first_2 = false}}{{else}},{{end}}
{{printf "%s" .event_data}}{{end}}
]
}`

// groupByDynamicHeaders batches data by header if the dynamic header feature is turned on.
func (ht *HTTPTarget) groupByDynamicHeaders(messages []*models.Message) [][]*models.Message {
if !ht.dynamicHeaders {
// If the feature is disabled just return
return [][]*models.Message{messages}
}

// Make a map of stringified header values
headersFound := make(map[string][]*models.Message)

// Group data by that index
for _, msg := range messages {
headerKey := fmt.Sprint(msg.HTTPHeaders)
if headersFound[headerKey] != nil {
// If a key already exists, just add this message
headersFound[headerKey] = append(headersFound[headerKey], msg)
} else {
headersFound[headerKey] = []*models.Message{msg}
}
}

outBatches := [][]*models.Message{}
for _, batch := range headersFound {
outBatches = append(outBatches, batch)
}

return outBatches
}

func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) {
ht.log.Debugf("Writing %d messages to endpoint ...", len(messages))

safeMessages, oversized := models.FilterOversizedMessages(
chunks, oversized := models.GetChunkedMessages(
messages,
ht.MaximumAllowedMessageSizeBytes(),
)
1, // kt.requestMaxMessages,
1, // kt.MaximumAllowedMessageSizeBytes(),
1, // kinesisPutRecordsRequestByteLimit,
) // Just bodged in for now

var invalid []*models.Message
var failed []*models.Message
var sent []*models.Message
sent := []*models.Message{}
failed := []*models.Message{}
var errResult error

for _, msg := range safeMessages {
request, err := http.NewRequest("POST", ht.httpURL, bytes.NewBuffer(msg.Data))
if err != nil {
errResult = multierror.Append(errResult, errors.Wrap(err, "Error creating request"))
failed = append(failed, msg)
continue
}
request.Header.Add("Content-Type", ht.contentType) // Add content type
addHeadersToRequest(request, ht.headers, ht.retrieveHeaders(msg)) // Add headers if there are any
if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set
request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword)
}
requestStarted := time.Now()
resp, err := ht.client.Do(request) // Make request
requestFinished := time.Now()

msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
for _, chunk := range chunks {
grouped := ht.groupByDynamicHeaders(chunk)

for _, group := range grouped {
var reqBody []byte
var goodMsgs []*models.Message
var badMsgs []*models.Message
var err error

// just for now to spike
templaterConfigured := true
if templaterConfigured {
reqBody, goodMsgs, badMsgs, err = ht.requestTemplater(templ, group)
} else {
reqBody, goodMsgs, badMsgs, err = ht.provideRequestBody(group)
}
failed = append(failed, badMsgs...)
if err != nil {
errResult = multierror.Append(errResult, errors.New("Error constructing request"))
continue
}

if err != nil {
errResult = multierror.Append(errResult, err)
failed = append(failed, msg)
continue
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
sent = append(sent, msg)
if msg.AckFunc != nil { // Ack successful messages
msg.AckFunc()
request, err := http.NewRequest("POST", ht.httpURL, bytes.NewBuffer(reqBody))
if err != nil {
failed = append(failed, goodMsgs...)
errResult = errors.Wrap(errResult, "Error creating request: "+err.Error())
continue
}
request.Header.Add("Content-Type", ht.contentType) // Add content type
addHeadersToRequest(request, ht.headers, ht.retrieveHeaders(goodMsgs[0])) // Add headers if there are any
if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set
request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword)
}
// requestStarted := time.Now()
resp, err := ht.client.Do(request) // Make request
// requestFinished := time.Now()

// TODO: fit in the request times
if err != nil {
failed = append(failed, goodMsgs...)
errResult = errors.Wrap(errResult, "Error sending request: "+err.Error())
continue
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
for _, msg := range goodMsgs {
if msg.AckFunc != nil { // Ack successful messages
msg.AckFunc()
}
sent = append(sent, msg)
}
} else {
errResult = multierror.Append(errResult, errors.New("Got response status: "+resp.Status))
failed = append(failed, goodMsgs...)
continue
}
} else {
errResult = multierror.Append(errResult, errors.New("Got response status: "+resp.Status))
failed = append(failed, msg)
continue
}
}
if errResult != nil {
errResult = errors.Wrap(errResult, "Error sending http requests")
}

ht.log.Debugf("Successfully wrote %d/%d messages", len(sent), len(messages))
return models.NewTargetWriteResult(
sent,
failed,
oversized,
invalid,
), errResult
// ht.log.Debugf("Successfully wrote %d/%d messages", len(sent), len(messages))
return &models.TargetWriteResult{
Sent: sent,
Failed: failed,
Oversized: oversized,
Invalid: nil, // TODO: should some of the above be invaid not failed?
}, errResult
}

// Open does nothing for this target
Expand Down

0 comments on commit 4391101

Please sign in to comment.