diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 23562be9e81d8..f97130bacb4c6 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -26,16 +26,17 @@ import ( ) type APIError struct { - StatusCode int - Title string - Description string + Err error + StatusCode int + Retryable bool } func (e APIError) Error() string { - if e.Description != "" { - return fmt.Sprintf("%s: %s", e.Title, e.Description) - } - return e.Title + return e.Err.Error() +} + +func (e APIError) Unwrap() error { + return e.Err } const ( @@ -185,7 +186,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } batches[bucket] = append(batches[bucket], metric) - batchIndices[c.bucket] = append(batchIndices[c.bucket], i) + batchIndices[bucket] = append(batchIndices[bucket], i) } } @@ -201,10 +202,14 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error var apiErr *APIError if errors.As(err, &apiErr) { if apiErr.StatusCode == http.StatusRequestEntityTooLarge { + // TODO: Need a testcase to verify rejected metrics are not retried... return c.splitAndWriteBatch(ctx, c.bucket, metrics) } wErr.Err = err - wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...) + if !apiErr.Retryable { + wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...) + } + // TODO: Clarify if we should continue here to try the remaining buckets? return &wErr } @@ -301,11 +306,10 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te } // We got an error and now try to decode further + var desc string writeResp := &genericRespError{} - err = json.NewDecoder(resp.Body).Decode(writeResp) - desc := writeResp.Error() - if err != nil { - desc = resp.Status + if json.NewDecoder(resp.Body).Decode(writeResp) == nil { + desc = ": " + writeResp.Error() } switch resp.StatusCode { @@ -313,9 +317,8 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te case http.StatusRequestEntityTooLarge: c.log.Errorf("Failed to write metric to %s, request was too large (413)", bucket) return &APIError{ - StatusCode: resp.StatusCode, - Title: resp.Status, - Description: desc, + Err: fmt.Errorf("%s: %s", resp.Status, desc), + StatusCode: resp.StatusCode, } case // request was malformed: @@ -325,17 +328,13 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te http.StatusUnprocessableEntity, http.StatusNotAcceptable: - // Clients should *not* repeat the request and the metrics should be dropped. - rejected := make([]int, 0, len(metrics)) - for i := range len(metrics) { - rejected = append(rejected, i) - } - return &internal.PartialWriteError{ - Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc), - MetricsReject: rejected, + // Clients should *not* repeat the request and the metrics should be rejected. + return &APIError{ + Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s)%s", bucket, resp.Status, desc), + StatusCode: resp.StatusCode, } case http.StatusUnauthorized, http.StatusForbidden: - return fmt.Errorf("failed to write metric to %s (%s): %s", bucket, resp.Status, desc) + return fmt.Errorf("failed to write metric to %s (%s)%s", bucket, resp.Status, desc) case http.StatusTooManyRequests, http.StatusServiceUnavailable, http.StatusBadGateway, @@ -351,26 +350,22 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te // if it's any other 4xx code, the client should not retry as it's the client's mistake. // retrying will not make the request magically work. if len(resp.Status) > 0 && resp.Status[0] == '4' { - rejected := make([]int, 0, len(metrics)) - for i := range len(metrics) { - rejected = append(rejected, i) - } - return &internal.PartialWriteError{ - Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc), - MetricsReject: rejected, + return &APIError{ + Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s)%s", bucket, resp.Status, desc), + StatusCode: resp.StatusCode, } } // This is only until platform spec is fully implemented. As of the // time of writing, there is no error body returned. if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" { - desc = fmt.Sprintf("%s; %s", desc, xErr) + desc = fmt.Sprintf(": %s; %s", desc, xErr) } return &APIError{ - StatusCode: resp.StatusCode, - Title: resp.Status, - Description: desc, + Err: fmt.Errorf("failed to write metric to bucket %q: %s%s", bucket, resp.Status, desc), + StatusCode: resp.StatusCode, + Retryable: true, } } diff --git a/plugins/outputs/influxdb_v2/influxdb_v2.go b/plugins/outputs/influxdb_v2/influxdb_v2.go index 89b0d8d2f875a..3b3df4db3eae3 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2.go @@ -199,11 +199,11 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { for _, n := range rand.Perm(len(i.clients)) { client := i.clients[n] if err := client.Write(ctx, metrics); err != nil { + i.Log.Errorf("When writing to [%s]: %v", client.url, err) var werr *internal.PartialWriteError if errors.As(err, &werr) || errors.Is(err, internal.ErrSizeLimitReached) { return err } - i.Log.Errorf("When writing to [%s]: %v", client.url, err) continue } return nil diff --git a/plugins/outputs/influxdb_v2/influxdb_v2_test.go b/plugins/outputs/influxdb_v2/influxdb_v2_test.go index f93617a38744e..39220c9d508f3 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2_test.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2_test.go @@ -1,11 +1,13 @@ package influxdb_v2_test import ( + "fmt" "io" "net" "net/http" "net/http/httptest" "reflect" + "strconv" "strings" "sync/atomic" "testing" @@ -486,3 +488,374 @@ func TestRateLimit(t *testing.T) { require.NoError(t, plugin.Write(metrics[3:])) require.Equal(t, uint64(121), received.Load()) } + +func TestStatusCodeNonRetryable4xx(t *testing.T) { + codes := []int{ + // Explicitly checked non-retryable status codes + http.StatusBadRequest, http.StatusUnprocessableEntity, http.StatusNotAcceptable, + // Other non-retryable 4xx status codes not explicitly checked + http.StatusNotFound, http.StatusExpectationFailed, + } + + for _, code := range codes { + t.Run(fmt.Sprintf("code %d", code), func(t *testing.T) { + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + if strings.Contains(string(body), "bucket=foo") { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(code) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + BucketTag: "bucket", + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Together the metric batch size is too big, split up, we get success + metrics := []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 0.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "my_bucket", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 1), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "my_bucket", + }, + map[string]interface{}{ + "value": 43.0, + }, + time.Unix(0, 2), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 0.0, + }, + time.Unix(0, 3), + ), + } + + // Write the metrics the first time and check for the expected errors + err := plugin.Write(metrics) + require.ErrorContains(t, err, "failed to write metric to my_bucket (will be dropped:") + + var apiErr *influxdb.APIError + require.ErrorAs(t, err, &apiErr) + require.Equal(t, code, apiErr.StatusCode) + + var writeErr *internal.PartialWriteError + require.ErrorAs(t, err, &writeErr) + require.Len(t, writeErr.MetricsReject, 2, "rejected metrics") + }) + } +} + +func TestStatusCodeInvalidAuthentication(t *testing.T) { + codes := []int{http.StatusUnauthorized, http.StatusForbidden} + + for _, code := range codes { + t.Run(fmt.Sprintf("code %d", code), func(t *testing.T) { + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + if strings.Contains(string(body), "bucket=foo") { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(code) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + BucketTag: "bucket", + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Together the metric batch size is too big, split up, we get success + metrics := []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 0.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "my_bucket", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 1), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "my_bucket", + }, + map[string]interface{}{ + "value": 43.0, + }, + time.Unix(0, 2), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 0.0, + }, + time.Unix(0, 3), + ), + } + + // Write the metrics the first time and check for the expected errors + err := plugin.Write(metrics) + require.ErrorContains(t, err, "failed to write metric to my_bucket") + require.ErrorContains(t, err, strconv.Itoa(code)) + + var writeErr *internal.PartialWriteError + require.ErrorAs(t, err, &writeErr) + require.Empty(t, writeErr.MetricsReject, "rejected metrics") + require.LessOrEqual(t, len(writeErr.MetricsAccept), 2, "accepted metrics") + }) + } +} + +func TestStatusCodeServiceUnavailable(t *testing.T) { + codes := []int{ + http.StatusTooManyRequests, + http.StatusServiceUnavailable, + http.StatusBadGateway, + http.StatusGatewayTimeout, + } + + for _, code := range codes { + t.Run(fmt.Sprintf("code %d", code), func(t *testing.T) { + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + if strings.Contains(string(body), "bucket=foo") { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(code) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + BucketTag: "bucket", + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Together the metric batch size is too big, split up, we get success + metrics := []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 0.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "my_bucket", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 1), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "my_bucket", + }, + map[string]interface{}{ + "value": 43.0, + }, + time.Unix(0, 2), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 0.0, + }, + time.Unix(0, 3), + ), + } + + // Write the metrics the first time and check for the expected errors + err := plugin.Write(metrics) + require.ErrorContains(t, err, "waiting 25ms for server (my_bucket) before sending metric again") + + var writeErr *internal.PartialWriteError + require.ErrorAs(t, err, &writeErr) + require.Empty(t, writeErr.MetricsReject, "rejected metrics") + require.LessOrEqual(t, len(writeErr.MetricsAccept), 2, "accepted metrics") + }) + } +} + +func TestStatusCodeUnexpected(t *testing.T) { + codes := []int{http.StatusInternalServerError} + + for _, code := range codes { + t.Run(fmt.Sprintf("code %d", code), func(t *testing.T) { + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + if strings.Contains(string(body), "bucket=foo") { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(code) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + BucketTag: "bucket", + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Together the metric batch size is too big, split up, we get success + metrics := []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 0.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "my_bucket", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 1), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "my_bucket", + }, + map[string]interface{}{ + "value": 43.0, + }, + time.Unix(0, 2), + ), + metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 0.0, + }, + time.Unix(0, 3), + ), + } + + // Write the metrics the first time and check for the expected errors + err := plugin.Write(metrics) + require.ErrorContains(t, err, "failed to write metric to bucket \"my_bucket\"") + require.ErrorContains(t, err, strconv.Itoa(code)) + + var writeErr *internal.PartialWriteError + require.ErrorAs(t, err, &writeErr) + require.Empty(t, writeErr.MetricsReject, "rejected metrics") + require.LessOrEqual(t, len(writeErr.MetricsAccept), 2, "accepted metrics") + }) + } +}