From 1b50fe8d3e6dc218ed711d07e2ffda5434653e9c Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Fri, 15 Dec 2023 14:39:22 +0100 Subject: [PATCH] feat: obey http status and backoff on 429, 50x (#156) --- metrics.go | 36 +++++++++++++++++- metrics_test.go | 91 +++++++++++++++++++++++++++++++++++++++++++++- repository.go | 44 ++++++++++++++++++++-- repository_test.go | 60 ++++++++++++++++++++++++++++++ spec_test.go | 1 + utils.go | 2 +- utils_test.go | 1 - 7 files changed, 226 insertions(+), 9 deletions(-) diff --git a/metrics.go b/metrics.go index 04a426a..94eb178 100644 --- a/metrics.go +++ b/metrics.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net/http" "net/url" "sync" @@ -66,6 +67,9 @@ type metrics struct { closed chan struct{} ctx context.Context cancel func() + maxSkips float64 + errors float64 + skips float64 } func newMetrics(options metricsOptions, channels metricsChannels) *metrics { @@ -75,6 +79,9 @@ func newMetrics(options metricsOptions, channels metricsChannels) *metrics { started: time.Now(), close: make(chan struct{}), closed: make(chan struct{}), + maxSkips: 10, + errors: 0, + skips: 0, } ctx, cancel := context.WithCancel(context.Background()) m.ctx = ctx @@ -111,7 +118,11 @@ func (m *metrics) sync() { for { select { case <-m.ticker.C: - m.sendMetrics() + if m.skips == 0 { + m.sendMetrics() + } else { + m.decrementSkip() + } case <-m.close: close(m.closed) return @@ -136,7 +147,24 @@ func (m *metrics) registerInstance() { m.registered <- payload } +func (m *metrics) backoff() { + m.errors = math.Min(m.maxSkips, m.errors+1) + m.skips = m.errors +} + +func (m *metrics) configurationError() { + m.errors = m.maxSkips + m.skips = m.errors +} +func (m *metrics) successfulPost() { + m.errors = math.Max(0, m.errors-1) + m.skips = m.errors +} + +func (m *metrics) decrementSkip() { + m.skips = math.Max(0, m.skips-1) +} func (m *metrics) sendMetrics() { m.bucketMu.Lock() bucket := m.resetBucket() @@ -160,6 +188,11 @@ func (m *metrics) sendMetrics() { defer resp.Body.Close() if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusMultipleChoices { + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusNotFound { + m.configurationError() + } else if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= http.StatusInternalServerError { + m.backoff() + } m.warn(fmt.Errorf("%s return %d", u.String(), resp.StatusCode)) // The post failed, re-add the metrics we attempted to send so // they are included in the next post. @@ -174,6 +207,7 @@ func (m *metrics) sendMetrics() { m.bucket.Start = bucket.Start m.bucketMu.Unlock() } else { + m.successfulPost() m.sent <- payload } } diff --git a/metrics_test.go b/metrics_test.go index 689d6c5..82fca5d 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -328,14 +328,101 @@ func TestMetrics_ShouldNotCountMetricsForParentToggles(t *testing.T) { WithInstanceId(mockInstanceId), WithListener(mockListener), ) - + assert.Nil(err, "client should not return an error") client.WaitForReady() client.IsEnabled("child") assert.EqualValues(client.metrics.bucket.Toggles["child"].Yes, 1) assert.EqualValues(client.metrics.bucket.Toggles["parent"].Yes, 0) - client.Close() + err = client.Close() assert.Nil(err, "client should not return an error") assert.True(gock.IsDone(), "there should be no more mocks") } + +func TestMetrics_ShouldBackoffOn500(t *testing.T) { + assert := assert.New(t) + defer gock.OffAll() + + gock.New(mockerServer). + Post("/client/register"). + Reply(200) + gock.New(mockerServer). + Post("/client/metrics"). + Persist(). + Reply(500) + gock.New(mockerServer). + Get("/client/features"). + Reply(200). + JSON(api.FeatureResponse{}) + mockListener := &MockedListener{} + mockListener.On("OnReady").Return() + mockListener.On("OnRegistered", mock.AnythingOfType("ClientData")).Return() + mockListener.On("OnCount", "foo", false).Return() + mockListener.On("OnCount", "bar", false).Return() + mockListener.On("OnCount", "baz", false).Return() + mockListener.On("OnWarning", mock.MatchedBy(func(e error) bool { + return strings.HasSuffix(e.Error(), "http://foo.com/client/metrics return 500") + })).Return() + mockListener.On("OnError", mock.Anything).Return() + + client, err := NewClient( + WithUrl(mockerServer), + WithMetricsInterval(50*time.Millisecond), + WithAppName(mockAppName), + WithInstanceId(mockInstanceId), + WithListener(mockListener), + ) + assert.Nil(err, "client should not return an error") + + client.WaitForReady() + client.IsEnabled("foo") + client.IsEnabled("bar") + client.IsEnabled("baz") + + time.Sleep(320 * time.Millisecond) + err = client.Close() + assert.Equal(float64(3), client.metrics.errors) + assert.Nil(err, "Client should close without a problem") + +} + +func TestMetrics_ErrorCountShouldDecreaseIfSuccessful(t *testing.T) { + assert := assert.New(t) + defer gock.OffAll() + + gock.New(mockerServer). + Post("/client/register"). + Reply(200) + gock.New(mockerServer). + Post("/client/metrics"). + Times(2). + Reply(500) + gock.New(mockerServer). + Get("/client/features"). + Reply(200). + JSON(api.FeatureResponse{}) + gock.New(mockerServer). + Post("/client/metrics"). + Persist(). + Reply(200) + + client, err := NewClient( + WithUrl(mockerServer), + WithMetricsInterval(50*time.Millisecond), + WithAppName(mockAppName), + WithInstanceId(mockInstanceId), + ) + assert.Nil(err, "client should not return an error") + + client.WaitForReady() + client.IsEnabled("foo") + client.IsEnabled("bar") + client.IsEnabled("baz") + time.Sleep(360 * time.Millisecond) + client.IsEnabled("foo") + time.Sleep(100 * time.Millisecond) + err = client.Close() + assert.Equal(float64(0), client.metrics.errors) + assert.Nil(err, "Client should close without a problem") +} diff --git a/repository.go b/repository.go index 31e915f..7921f38 100644 --- a/repository.go +++ b/repository.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net/http" "net/url" "sync" @@ -26,6 +27,9 @@ type repository struct { isReady bool refreshTicker *time.Ticker segments map[int][]api.Constraint + errors float64 + maxSkips float64 + skips float64 } func newRepository(options repositoryOptions, channels repositoryChannels) *repository { @@ -36,6 +40,9 @@ func newRepository(options repositoryOptions, channels repositoryChannels) *repo closed: make(chan struct{}), refreshTicker: time.NewTicker(options.refreshInterval), segments: map[int][]api.Constraint{}, + errors: 0, + maxSkips: 10, + skips: 0, } ctx, cancel := context.WithCancel(context.Background()) repo.ctx = ctx @@ -80,11 +87,33 @@ func (r *repository) sync() { close(r.closed) return case <-r.refreshTicker.C: - r.fetchAndReportError() + if r.skips == 0 { + r.fetchAndReportError() + } else { + r.decrementSkips() + } } } } +func (r *repository) backoff() { + r.errors = math.Min(r.maxSkips, r.errors+1) + r.skips = r.errors +} + +func (r *repository) successfulFetch() { + r.errors = math.Max(0, r.errors-1) + r.skips = r.errors +} + +func (r *repository) decrementSkips() { + r.skips = math.Max(0, r.skips-1) +} +func (r *repository) configurationError() { + r.errors = r.maxSkips + r.skips = r.errors +} + func (r *repository) fetch() error { u, _ := r.options.url.Parse(getFetchURLPath(r.options.projectName)) @@ -119,7 +148,7 @@ func (r *repository) fetch() error { if resp.StatusCode == http.StatusNotModified { return nil } - if err := statusIsOK(resp); err != nil { + if err := r.statusIsOK(resp); err != nil { return err } @@ -133,14 +162,21 @@ func (r *repository) fetch() error { r.etag = resp.Header.Get("Etag") r.segments = featureResp.SegmentsMap() r.options.storage.Reset(featureResp.FeatureMap(), true) + r.successfulFetch() r.Unlock() return nil } -func statusIsOK(resp *http.Response) error { +func (r *repository) statusIsOK(resp *http.Response) error { s := resp.StatusCode - if 200 <= s && s < 300 { + if http.StatusOK <= s && s < http.StatusMultipleChoices { return nil + } else if s == http.StatusUnauthorized || s == http.StatusForbidden || s == http.StatusNotFound { + r.configurationError() + return fmt.Errorf("%s %s returned status code %d your SDK is most likely misconfigured, backing off to maximum (%f times our interval)", resp.Request.Method, resp.Request.URL, s, r.maxSkips) + } else if s == http.StatusTooManyRequests || s >= http.StatusInternalServerError { + r.backoff() + return fmt.Errorf("%s %s returned status code %d, backing off (%f times our interval)", resp.Request.Method, resp.Request.URL, s, r.errors) } return fmt.Errorf("%s %s returned status code %d", resp.Request.Method, resp.Request.URL, s) diff --git a/repository_test.go b/repository_test.go index dcdbaeb..2854730 100644 --- a/repository_test.go +++ b/repository_test.go @@ -3,6 +3,7 @@ package unleash import ( "bytes" "encoding/json" + "gopkg.in/h2non/gock.v1" "net/http" "net/http/httptest" "strings" @@ -127,3 +128,62 @@ func TestRepository_ParseAPIResponse(t *testing.T) { assert.Equal(2, len(response.Features)) assert.Equal(0, len(response.Segments)) } + + +func TestRepository_backs_off_on_http_statuses(t *testing.T) { + a := assert.New(t) + testCases := []struct { + statusCode int + errorCount float64 + }{ + { 401, 10}, + { 403, 10}, + { 404, 10}, + { 429, 1}, + { 500, 1}, + { 502, 1}, + { 503, 1}, + } + defer gock.Off() + for _, tc := range testCases { + gock.New(mockerServer). + Get("/client/features"). + Reply(tc.statusCode) + client, err := NewClient( + WithUrl(mockerServer), + WithAppName(mockAppName), + WithDisableMetrics(true), + WithInstanceId(mockInstanceId), + WithRefreshInterval(time.Millisecond * 15), + ) + a.Nil(err) + time.Sleep(20 * time.Millisecond) + err = client.Close() + a.Equal(tc.errorCount, client.repository.errors) + a.Nil(err) + } +} +func TestRepository_back_offs_are_gradually_reduced_on_success(t *testing.T) { + a := assert.New(t) + defer gock.Off() + gock.New(mockerServer). + Get("/client/features"). + Times(4). + Reply(429) + gock.New(mockerServer). + Get("/client/features"). + Reply(200). + BodyString(`{ "version": 2, "features": []}`) + client, err := NewClient( + WithUrl(mockerServer), + WithAppName(mockAppName), + WithDisableMetrics(true), + WithInstanceId(mockInstanceId), + WithRefreshInterval(time.Millisecond * 10), + ) + a.Nil(err) + client.WaitForReady() + err = client.Close() + a.Equal(float64(3), client.repository.errors) // 4 failures, and then one success, should reduce error count to 3 + a.Nil(err) +} \ No newline at end of file diff --git a/spec_test.go b/spec_test.go index ac4b2ef..7fb566a 100644 --- a/spec_test.go +++ b/spec_test.go @@ -1,3 +1,4 @@ +//go:build norace // +build norace package unleash diff --git a/utils.go b/utils.go index d79d3d5..7021925 100644 --- a/utils.go +++ b/utils.go @@ -66,7 +66,7 @@ func every(slice interface{}, condition func(interface{}) bool) bool { return false } - if (sliceValue.Len() == 0) { + if sliceValue.Len() == 0 { return false } diff --git a/utils_test.go b/utils_test.go index 204269b..58bca64 100644 --- a/utils_test.go +++ b/utils_test.go @@ -116,4 +116,3 @@ func TestContains(t *testing.T) { } }) } -