Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle all 2XX HTTP response status codes as success #324

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu
continue
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should 3xx be treated as successful as well? I know in the Trackers a broad range of codes are seen as successful to ensure events do not get re-tried.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, 3xx should not be treated as a success.

The go http client by default follows redirects, up to 10 times per request. So if we get a 3xx here then it means the server has issued 10 redirects in a row. That would be highly unexpected, it probably means the server is misconfigured, and it means our event has not landed safely in the destination.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair! Feels like we might need a follow-on ticket to think through how to handle other status code groups though as currently all 3xx, 4xx and 5xx will end up being retried forever.

It might not be worth retrying certain codes forever and certain codes might not be worth retrying at all (like if we hit 3xx requests with 10+ redirects - is it worth retrying that event to have that very expensive flow followed again and again?)...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jbeemster yeah agree - it's beyond the scope of this ticket (which is just a quick fix for a customer issue), but the intention is to make this configurable, so that behaviour can be tailored to what's best suited to a given target.

I'm hoping to flesh this out in a design doc alongside other supportability work. :)

sent = append(sent, msg)
if msg.AckFunc != nil { // Ack successful messages
msg.AckFunc()
Expand Down
111 changes: 86 additions & 25 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/snowplow/snowbridge/pkg/testutil"
)

func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Server {
func createTestServerWithResponseCode(results *[][]byte, waitgroup *sync.WaitGroup, responseCode int) *httptest.Server {
mutex := &sync.Mutex{}
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
Expand All @@ -39,11 +39,16 @@ func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Se
}
mutex.Lock()
*results = append(*results, data)
w.WriteHeader(responseCode)
mutex.Unlock()
defer waitgroup.Done()
}))
}

func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Server {
return createTestServerWithResponseCode(results, waitgroup, 200)
}

func TestGetHeaders(t *testing.T) {
assert := assert.New(t)
valid1 := `{"Max Forwards": "10", "Accept-Language": "en-US", "Accept-Datetime": "Thu, 31 May 2007 20:35:00 GMT"}`
Expand Down Expand Up @@ -322,37 +327,50 @@ func TestNewHTTPTarget(t *testing.T) {
}

func TestHttpWrite_Simple(t *testing.T) {
assert := assert.New(t)
testCases := []struct {
Name string
ResponseCode int
}{
{Name: "200 response Code", ResponseCode: 200},
{Name: "201 response Code", ResponseCode: 201},
{Name: "226 response Code", ResponseCode: 226},
}

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServer(&results, &wg)
defer server.Close()
for _, tt := range testCases {
t.Run(tt.Name, func(t *testing.T) {
assert := assert.New(t)

target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false)
if err != nil {
t.Fatal(err)
}
var results [][]byte
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
defer server.Close()

var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)
}
target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false)
if err != nil {
t.Fatal(err)
}

messages := testutil.GetTestMessages(501, "Hello Server!!", ackFunc)
wg.Add(501)
writeResult, err1 := target.Write(messages)
var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)
}

wg.Wait()
messages := testutil.GetTestMessages(501, "Hello Server!!", ackFunc)
wg.Add(501)
writeResult, err1 := target.Write(messages)

assert.Nil(err1)
assert.Equal(501, len(writeResult.Sent))
assert.Equal(501, len(results))
for _, result := range results {
assert.Equal("Hello Server!!", string(result))
}
wg.Wait()

assert.Nil(err1)
assert.Equal(501, len(writeResult.Sent))
assert.Equal(501, len(results))
for _, result := range results {
assert.Equal("Hello Server!!", string(result))
}

assert.Equal(int64(501), ackOps)
assert.Equal(int64(501), ackOps)
})
}
}

func TestHttpWrite_Concurrent(t *testing.T) {
Expand Down Expand Up @@ -430,6 +448,49 @@ func TestHttpWrite_Failure(t *testing.T) {
assert.Nil(writeResult.Oversized)
}

func TestHttpWrite_InvalidResponseCode(t *testing.T) {
testCases := []struct {
Name string
ResponseCode int
}{
{Name: "300 response Code", ResponseCode: 300},
{Name: "400 response Code", ResponseCode: 400},
{Name: "503 response Code", ResponseCode: 503},
}
for _, tt := range testCases {
t.Run(tt.Name, func(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServerWithResponseCode(&results, &wg, tt.ResponseCode)
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1048576, "application/json", "", "", "", "", "", "", true, false)
if err != nil {
t.Fatal(err)
}

var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)
}

messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc)
wg.Add(10)
writeResult, err1 := target.Write(messages)

assert.NotNil(err1)
if err1 != nil {
assert.Regexp("Error sending http requests: 10 errors occurred:.*", err1.Error())
}

assert.Equal(10, len(writeResult.Failed))
assert.Nil(writeResult.Sent)
assert.Nil(writeResult.Oversized)
})
}
}

func TestHttpWrite_Oversized(t *testing.T) {
assert := assert.New(t)

Expand Down
Loading