Skip to content

Commit

Permalink
Add config for chunking
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Jun 20, 2024
1 parent 2775f8e commit 9971ec5
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 29 deletions.
8 changes: 6 additions & 2 deletions config/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func TestCreateTargetComponentHCL(t *testing.T) {
Plug: testHTTPTargetAdapter(testHTTPTargetFunc),
Expected: &target.HTTPTargetConfig{
HTTPURL: "testUrl",
ByteLimit: 1048576,
RequestMaxMessages: 1,
RequestByteLimit: 1048576,
MessageByteLimit: 1048576,
RequestTimeoutInSeconds: 5,
ContentType: "application/json",
Headers: "",
Expand All @@ -92,7 +94,9 @@ func TestCreateTargetComponentHCL(t *testing.T) {
Plug: testHTTPTargetAdapter(testHTTPTargetFunc),
Expected: &target.HTTPTargetConfig{
HTTPURL: "testUrl",
ByteLimit: 1000000,
RequestMaxMessages: 10,
RequestByteLimit: 1000000,
MessageByteLimit: 1000000,
RequestTimeoutInSeconds: 2,
ContentType: "test/test",
Headers: "{\"Accept-Language\":\"en-US\"}",
Expand Down
66 changes: 49 additions & 17 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"context"
"encoding/json"
"fmt"
"text/template"
"net/http"
"net/url"
"text/template"
"time"

"github.com/hashicorp/go-multierror"
Expand All @@ -33,8 +33,8 @@ import (

// HTTPTargetConfig configures the destination for records consumed
type HTTPTargetConfig struct {
HTTPURL string `hcl:"url" env:"TARGET_HTTP_URL"`
ByteLimit int `hcl:"byte_limit,optional" env:"TARGET_HTTP_BYTE_LIMIT"`
HTTPURL string `hcl:"url" env:"TARGET_HTTP_URL"`

RequestTimeoutInSeconds int `hcl:"request_timeout_in_seconds,optional" env:"TARGET_HTTP_TIMEOUT_IN_SECONDS"`
ContentType string `hcl:"content_type,optional" env:"TARGET_HTTP_CONTENT_TYPE"`
Headers string `hcl:"headers,optional" env:"TARGET_HTTP_HEADERS" `
Expand All @@ -50,19 +50,26 @@ type HTTPTargetConfig struct {
OAuth2ClientSecret string `hcl:"oauth2_client_secret,optional" env:"TARGET_HTTP_OAUTH2_CLIENT_SECRET"`
OAuth2RefreshToken string `hcl:"oauth2_refresh_token,optional" env:"TARGET_HTTP_OAUTH2_REFRESH_TOKEN"`
OAuth2TokenURL string `hcl:"oauth2_token_url,optional" env:"TARGET_HTTP_OAUTH2_TOKEN_URL"`

RequestMaxMessages int `hcl:"request_max_messages,optional"`
RequestByteLimit int `hcl:"request_byte_limit,optional"` // note: breaking change here
MessageByteLimit int `hcl:"message_byte_limit,optional"`
}

// HTTPTarget holds a new client for writing messages to HTTP endpoints
type HTTPTarget struct {
client *http.Client
httpURL string
byteLimit int
contentType string
headers map[string]string
basicAuthUsername string
basicAuthPassword string
log *log.Entry
dynamicHeaders bool

requestMaxMessages int
requestByteLimit int
messageByteLimit int
}

func checkURL(str string) error {
Expand Down Expand Up @@ -102,8 +109,25 @@ func addHeadersToRequest(request *http.Request, headers map[string]string, dynam
}

// newHTTPTarget creates a client for writing events to HTTP
func newHTTPTarget(httpURL string, requestTimeout int, byteLimit int, contentType string, headers string, basicAuthUsername string, basicAuthPassword string,
certFile string, keyFile string, caFile string, skipVerifyTLS bool, dynamicHeaders bool, oAuth2ClientID string, oAuth2ClientSecret string, oAuth2RefreshToken string, oAuth2TokenURL string) (*HTTPTarget, error) {
func newHTTPTarget(
httpURL string,
requestTimeout int,
requestMaxMessages int,
requestByteLimit int,
messageByteLimit int,
contentType string,
headers string,
basicAuthUsername string,
basicAuthPassword string,
certFile string,
keyFile string,
caFile string,
skipVerifyTLS bool,
dynamicHeaders bool,
oAuth2ClientID string,
oAuth2ClientSecret string,
oAuth2RefreshToken string,
oAuth2TokenURL string) (*HTTPTarget, error) {
err := checkURL(httpURL)
if err != nil {
return nil, err
Expand All @@ -128,13 +152,16 @@ func newHTTPTarget(httpURL string, requestTimeout int, byteLimit int, contentTyp
return &HTTPTarget{
client: client,
httpURL: httpURL,
byteLimit: byteLimit,
contentType: contentType,
headers: parsedHeaders,
basicAuthUsername: basicAuthUsername,
basicAuthPassword: basicAuthPassword,
log: log.WithFields(log.Fields{"target": "http", "url": httpURL}),
dynamicHeaders: dynamicHeaders,

requestMaxMessages: requestMaxMessages,
requestByteLimit: requestByteLimit,
messageByteLimit: messageByteLimit,
}, nil
}

Expand Down Expand Up @@ -162,7 +189,9 @@ func HTTPTargetConfigFunction(c *HTTPTargetConfig) (*HTTPTarget, error) {
return newHTTPTarget(
c.HTTPURL,
c.RequestTimeoutInSeconds,
c.ByteLimit,
c.RequestMaxMessages,
c.RequestByteLimit,
c.MessageByteLimit,
c.ContentType,
c.Headers,
c.BasicAuthUsername,
Expand Down Expand Up @@ -193,7 +222,10 @@ func (f HTTPTargetAdapter) ProvideDefault() (interface{}, error) {
// Provide defaults for the optional parameters
// whose default is not their zero value.
cfg := &HTTPTargetConfig{
ByteLimit: 1048576,
RequestMaxMessages: 20,
RequestByteLimit: 1048576,
MessageByteLimit: 1048576,

RequestTimeoutInSeconds: 5,
ContentType: "application/json",
}
Expand All @@ -218,10 +250,10 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu

chunks, oversized := models.GetChunkedMessages(
messages,
1, // kt.requestMaxMessages,
1, // kt.MaximumAllowedMessageSizeBytes(),
1, // kinesisPutRecordsRequestByteLimit,
) // TOOD: Just bodged in for now - implement this bit
ht.requestMaxMessages,
ht.messageByteLimit,
ht.requestByteLimit,
)

sent := []*models.Message{}
failed := []*models.Message{}
Expand Down Expand Up @@ -310,7 +342,7 @@ func (ht *HTTPTarget) Close() {}
// MaximumAllowedMessageSizeBytes returns the max number of bytes that can be sent
// per message for this target
func (ht *HTTPTarget) MaximumAllowedMessageSizeBytes() int {
return ht.byteLimit
return ht.messageByteLimit
}

// GetID returns an identifier for this target
Expand Down Expand Up @@ -347,15 +379,15 @@ func (ht *HTTPTarget) requestTemplater(tmpl string, messages []*models.Message)
var buf bytes.Buffer

customFunctions := template.FuncMap{
// If you use this in your template on struct-like fields, you get rendered nice JSON `{"field":"value"}` instead of stringified map `map[field:value]`
// TODO: This works for now but we should check if there is more efficient solution.
// If you use this in your template on struct-like fields, you get rendered nice JSON `{"field":"value"}` instead of stringified map `map[field:value]`
// TODO: This works for now but we should check if there is more efficient solution.
"asJson": func(v interface{}) string {
a, _ := json.Marshal(v)
return string(a)
},
}

//TODO parse when creating target
//TODO parse when creating target
t := template.Must(template.New("example").Funcs(customFunctions).Parse(tmpl))
if err := t.Execute(&buf, formatted); err != nil {

Expand Down
24 changes: 15 additions & 9 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestRetrieveHeaders(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
testTargetConfig := &HTTPTargetConfig{
HTTPURL: "http://test",
ByteLimit: 1048576,
RequestByteLimit: 1048576,
RequestTimeoutInSeconds: 5,
ContentType: "application/json",
DynamicHeaders: tt.Dynamic,
Expand Down Expand Up @@ -305,20 +305,20 @@ func TestAddHeadersToRequest_WithDynamicHeaders(t *testing.T) {
func TestNewHTTPTarget(t *testing.T) {
assert := assert.New(t)

httpTarget, err := newHTTPTarget("http://something", 5, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")

assert.Nil(err)
assert.NotNil(httpTarget)

failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")

assert.NotNil(err1)
if err1 != nil {
assert.Equal("Invalid url for HTTP target: 'something'", err1.Error())
}
assert.Nil(failedHTTPTarget)

failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
assert.NotNil(err2)
if err2 != nil {
assert.Equal("Invalid url for HTTP target: ''", err2.Error())
Expand All @@ -345,7 +345,7 @@ func TestHttpWrite_Simple(t *testing.T) {
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestHttpWrite_Concurrent(t *testing.T) {
server := createTestServer(&results, &wg)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestHttpWrite_Failure(t *testing.T) {
server := createTestServer(&results, &wg)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestHttpWrite_InvalidResponseCode(t *testing.T) {
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -499,7 +499,7 @@ func TestHttpWrite_Oversized(t *testing.T) {
server := createTestServer(&results, &wg)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -546,6 +546,8 @@ func TestHttpWrite_TLS(t *testing.T) {
// Test that https requests work with manually provided certs
target, err := newHTTPTarget("https://localhost:8999/hello",
5,
1,
1048576,
1048576,
"application/json",
"",
Expand Down Expand Up @@ -583,6 +585,8 @@ func TestHttpWrite_TLS(t *testing.T) {
// Test that https requests work for different endpoints when different certs are provided manually
target2, err2 := newHTTPTarget(ngrokAddress,
5,
1,
1048576,
1048576,
"application/json",
"",
Expand Down Expand Up @@ -613,6 +617,8 @@ func TestHttpWrite_TLS(t *testing.T) {
// Test that https requests work for different endpoints when different certs are provided manually
target3, err4 := newHTTPTarget(ngrokAddress,
5,
1,
1048576,
1048576,
"application/json",
"",
Expand Down
2 changes: 1 addition & 1 deletion pkg/target/oauth2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input
}

func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget {
target, err := newHTTPTarget(targetURL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL)
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 9971ec5

Please sign in to comment.