Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add http batch tests #342

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 129 additions & 30 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"bytes"
"encoding/json"
"io"
"math"
"net/http"
"net/http/httptest"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/assert"
Expand All @@ -29,7 +31,7 @@ import (
"github.com/snowplow/snowbridge/pkg/testutil"
)

func createTestServerWithResponseCode(results *[][]byte, waitgroup *sync.WaitGroup, responseCode int) *httptest.Server {
func createTestServerWithResponseCode(results *[][]byte, responseCode int) *httptest.Server {
mutex := &sync.Mutex{}
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
Expand All @@ -41,12 +43,11 @@ func createTestServerWithResponseCode(results *[][]byte, waitgroup *sync.WaitGro
*results = append(*results, data)
w.WriteHeader(responseCode)
mutex.Unlock()
defer waitgroup.Done()
}))
}

func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Server {
return createTestServerWithResponseCode(results, waitgroup, 200)
func createTestServer(results *[][]byte) *httptest.Server {
return createTestServerWithResponseCode(results, 200)
}

func TestGetHeaders(t *testing.T) {
Expand Down Expand Up @@ -330,6 +331,7 @@ func TestHttpWrite_Simple(t *testing.T) {
testCases := []struct {
Name string
ResponseCode int
BatchSize int
}{
{Name: "200 response Code", ResponseCode: 200},
{Name: "201 response Code", ResponseCode: 201},
Expand All @@ -342,7 +344,7 @@ func TestHttpWrite_Simple(t *testing.T) {

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
server := createTestServerWithResponseCode(&results, tt.ResponseCode)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
Expand All @@ -353,22 +355,94 @@ func TestHttpWrite_Simple(t *testing.T) {
var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)

wg.Done()
}

messages := testutil.GetTestMessages(501, "Hello Server!!", ackFunc)
wg.Add(501)
messages := testutil.GetTestMessages(200, "Hello Server!!", ackFunc)
wg.Add(200)
writeResult, err1 := target.Write(messages)

wg.Wait()
if ok := WaitForAcksWithTimeout(2*time.Second, &wg); !ok {
assert.Fail("Timed out waiting for acks")
}

assert.Nil(err1)
assert.Equal(501, len(writeResult.Sent))
assert.Equal(501, len(results))
assert.Equal(200, len(writeResult.Sent))
assert.Equal(200, len(results))
for _, result := range results {
assert.Equal("[\"Hello Server!!\"]", string(result))
}

assert.Equal(int64(501), ackOps)
assert.Equal(int64(200), ackOps)
})
}
}

func TestHttpWrite_Batched(t *testing.T) {
testCases := []struct {
Name string
BatchSize int
LastBatchExpected int
}{
{Name: "Batches of 20", BatchSize: 20, LastBatchExpected: 20},
{Name: "Batches of 15", BatchSize: 15, LastBatchExpected: 10},
}

for _, tt := range testCases {
t.Run(tt.Name, func(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, 200)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
if err != nil {
t.Fatal(err)
}

var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)

wg.Done()
}

messages := testutil.GetTestMessages(100, "Hello Server!!", ackFunc)
wg.Add(100)
writeResult, err1 := target.Write(messages)

if ok := WaitForAcksWithTimeout(2*time.Second, &wg); !ok {
assert.Fail("Timed out waiting for acks")
}

assert.Nil(err1)
assert.Equal(100, len(writeResult.Sent))
assert.Equal(math.Ceil(100/float64(tt.BatchSize)), float64(len(results)))
for i, result := range results {

var res []string
err := json.Unmarshal(result, &res)
if err != nil {
assert.Fail("Request not an array as expected - got error unmarshalling: " + err.Error())
}
// Check the amount f events in the batch is as expected
if i == len(results)-1 {
// Check the last batch size
assert.Equal(tt.LastBatchExpected, len(res))
} else {
// Check the others
assert.Equal(tt.BatchSize, len(res))
}
// Iterate and check the data is what we expect
for _, r := range res {
assert.Equal("Hello Server!!", string(r))
}
}

assert.Equal(int64(100), ackOps)
})
}
}
Expand All @@ -378,7 +452,7 @@ func TestHttpWrite_Concurrent(t *testing.T) {

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServer(&results, &wg)
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
Expand All @@ -398,15 +472,17 @@ func TestHttpWrite_Concurrent(t *testing.T) {
messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc)

for _, message := range messages {
wg.Add(2) // Both acking and returning results from server can have race conditions, so we add both to the waitgroup.
wg.Add(1)
go func(msg *models.Message) {
writeResult, err1 := target.Write([]*models.Message{msg})
assert.Nil(err1)
assert.Equal(1, len(writeResult.Sent))
}(message)
}

wg.Wait()
if ok := WaitForAcksWithTimeout(2*time.Second, &wg); !ok {
assert.Fail("Timed out waiting for acks")
}

assert.Equal(10, len(results))
for _, result := range results {
Expand All @@ -420,8 +496,7 @@ func TestHttpWrite_Failure(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServer(&results, &wg)
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
Expand Down Expand Up @@ -462,8 +537,7 @@ func TestHttpWrite_InvalidResponseCode(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
server := createTestServerWithResponseCode(&results, tt.ResponseCode)
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
if err != nil {
Expand All @@ -476,7 +550,6 @@ func TestHttpWrite_InvalidResponseCode(t *testing.T) {
}

messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc)
wg.Add(10)
writeResult, err1 := target.Write(messages)

assert.NotNil(err1)
Expand All @@ -496,7 +569,7 @@ func TestHttpWrite_Oversized(t *testing.T) {

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServer(&results, &wg)
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "")
Expand All @@ -507,6 +580,8 @@ func TestHttpWrite_Oversized(t *testing.T) {
var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)

wg.Done()
}

messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc)
Expand All @@ -515,7 +590,9 @@ func TestHttpWrite_Oversized(t *testing.T) {
wg.Add(10)
writeResult, err1 := target.Write(messages)

wg.Wait()
if ok := WaitForAcksWithTimeout(2*time.Second, &wg); !ok {
assert.Fail("Timed out waiting for acks")
}

assert.Nil(err1)
assert.Equal(10, len(writeResult.Sent))
Expand Down Expand Up @@ -764,9 +841,7 @@ func TestHTTPWrite_GroupedRequests(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
wg.Add(3)
server := createTestServer(&results, &wg)
server := createTestServer(&results)
defer server.Close()

//dynamicHeaders enabled
Expand All @@ -775,16 +850,26 @@ func TestHTTPWrite_GroupedRequests(t *testing.T) {
t.Fatal(err)
}

wg := sync.WaitGroup{}

ackFunc := func() {
wg.Done()
}

wg.Add(5)
inputMessages := []*models.Message{
{Data: []byte("value1")}, //group 1
{Data: []byte("value2")}, //group 1
{Data: []byte("value3"), HTTPHeaders: map[string]string{"h1": "v1"}}, //group 2
{Data: []byte("value4"), HTTPHeaders: map[string]string{"h1": "v1"}}, //group 2
{Data: []byte("value5"), HTTPHeaders: map[string]string{"h1": "v2"}}, //group 3
{Data: []byte("value1"), AckFunc: ackFunc}, //group 1
{Data: []byte("value2"), AckFunc: ackFunc}, //group 1
{Data: []byte("value3"), AckFunc: ackFunc, HTTPHeaders: map[string]string{"h1": "v1"}}, //group 2
{Data: []byte("value4"), AckFunc: ackFunc, HTTPHeaders: map[string]string{"h1": "v1"}}, //group 2
{Data: []byte("value5"), AckFunc: ackFunc, HTTPHeaders: map[string]string{"h1": "v2"}}, //group 3
}

writeResult, err1 := target.Write(inputMessages)
wg.Wait()

if ok := WaitForAcksWithTimeout(2*time.Second, &wg); !ok {
assert.Fail("Timed out waiting for acks")
}

assert.Nil(err1)
assert.Equal(5, len(writeResult.Sent))
Expand Down Expand Up @@ -834,3 +919,17 @@ func getNgrokAddress() string {
}
panic("no ngrok https endpoint found")
}

func WaitForAcksWithTimeout(timeout time.Duration, wg *sync.WaitGroup) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return true
case <-time.After(timeout * time.Millisecond):
return false
}
}
Loading