Skip to content

Commit

Permalink
Handle all 2XX HTTP response status codes as success
Browse files Browse the repository at this point in the history
For HTTP target:

Before - only 200 HTTP response code was treated as successful response.
After - all 2XX are treated as successful response.
  • Loading branch information
pondzix committed May 14, 2024
1 parent a09adbe commit 9f307e5
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu
continue
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
sent = append(sent, msg)
if msg.AckFunc != nil { // Ack successful messages
msg.AckFunc()
Expand Down
111 changes: 86 additions & 25 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/snowplow/snowbridge/pkg/testutil"
)

func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Server {
func createTestServerWithResponseCode(results *[][]byte, waitgroup *sync.WaitGroup, responseCode int) *httptest.Server {
mutex := &sync.Mutex{}
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
Expand All @@ -39,11 +39,16 @@ func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Se
}
mutex.Lock()
*results = append(*results, data)
w.WriteHeader(responseCode)
mutex.Unlock()
defer waitgroup.Done()
}))
}

func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Server {
return createTestServerWithResponseCode(results, waitgroup, 200)
}

func TestGetHeaders(t *testing.T) {
assert := assert.New(t)
valid1 := `{"Max Forwards": "10", "Accept-Language": "en-US", "Accept-Datetime": "Thu, 31 May 2007 20:35:00 GMT"}`
Expand Down Expand Up @@ -322,37 +327,50 @@ func TestNewHTTPTarget(t *testing.T) {
}

func TestHttpWrite_Simple(t *testing.T) {
assert := assert.New(t)
testCases := []struct {
Name string
ResponseCode int
}{
{Name: "200 response Code", ResponseCode: 200},
{Name: "201 response Code", ResponseCode: 201},
{Name: "226 response Code", ResponseCode: 226},
}

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServer(&results, &wg)
defer server.Close()
for _, tt := range testCases {
t.Run(tt.Name, func(t *testing.T) {
assert := assert.New(t)

target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false)
if err != nil {
t.Fatal(err)
}
var results [][]byte
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
defer server.Close()

var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)
}
target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false)
if err != nil {
t.Fatal(err)
}

messages := testutil.GetTestMessages(501, "Hello Server!!", ackFunc)
wg.Add(501)
writeResult, err1 := target.Write(messages)
var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)
}

wg.Wait()
messages := testutil.GetTestMessages(501, "Hello Server!!", ackFunc)
wg.Add(501)
writeResult, err1 := target.Write(messages)

assert.Nil(err1)
assert.Equal(501, len(writeResult.Sent))
assert.Equal(501, len(results))
for _, result := range results {
assert.Equal("Hello Server!!", string(result))
}
wg.Wait()

assert.Nil(err1)
assert.Equal(501, len(writeResult.Sent))
assert.Equal(501, len(results))
for _, result := range results {
assert.Equal("Hello Server!!", string(result))
}

assert.Equal(int64(501), ackOps)
assert.Equal(int64(501), ackOps)
})
}
}

func TestHttpWrite_Concurrent(t *testing.T) {
Expand Down Expand Up @@ -430,6 +448,49 @@ func TestHttpWrite_Failure(t *testing.T) {
assert.Nil(writeResult.Oversized)
}

func TestHttpWrite_InvalidResponseCode(t *testing.T) {
testCases := []struct {
Name string
ResponseCode int
}{
{Name: "300 response Code", ResponseCode: 300},
{Name: "400 response Code", ResponseCode: 400},
{Name: "503 response Code", ResponseCode: 503},
}
for _, tt := range testCases {
t.Run(tt.Name, func(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false)
if err != nil {
t.Fatal(err)
}

var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)
}

messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc)
wg.Add(10)
writeResult, err1 := target.Write(messages)

assert.NotNil(err1)
if err1 != nil {
assert.Regexp("Error sending http requests: 10 errors occurred:.*", err1.Error())
}

assert.Equal(10, len(writeResult.Failed))
assert.Nil(writeResult.Sent)
assert.Nil(writeResult.Oversized)
})
}
}

func TestHttpWrite_Oversized(t *testing.T) {
assert := assert.New(t)

Expand Down

0 comments on commit 9f307e5

Please sign in to comment.