Skip to content

Commit

Permalink
fix(outputs.influxdb_v2): Fix panic and API error handling (#16388)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Jan 10, 2025
1 parent 1de8e7a commit 6ea22bd
Show file tree
Hide file tree
Showing 3 changed files with 405 additions and 37 deletions.
67 changes: 31 additions & 36 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ import (
)

type APIError struct {
StatusCode int
Title string
Description string
Err error
StatusCode int
Retryable bool
}

func (e APIError) Error() string {
if e.Description != "" {
return fmt.Sprintf("%s: %s", e.Title, e.Description)
}
return e.Title
return e.Err.Error()
}

// Unwrap returns the underlying error stored in e.Err, allowing callers to
// inspect the wrapped error through the standard errors.Is / errors.As helpers.
func (e APIError) Unwrap() error {
return e.Err
}

const (
Expand Down Expand Up @@ -185,7 +186,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

batches[bucket] = append(batches[bucket], metric)
batchIndices[c.bucket] = append(batchIndices[c.bucket], i)
batchIndices[bucket] = append(batchIndices[bucket], i)
}
}

Expand All @@ -201,10 +202,14 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
// TODO: Need a testcase to verify rejected metrics are not retried...
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
wErr.Err = err
wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
if !apiErr.Retryable {
wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
}
// TODO: Clarify if we should continue here to try the remaining buckets?
return &wErr
}

Expand Down Expand Up @@ -301,21 +306,19 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
}

// We got an error and now try to decode further
var desc string
writeResp := &genericRespError{}
err = json.NewDecoder(resp.Body).Decode(writeResp)
desc := writeResp.Error()
if err != nil {
desc = resp.Status
if json.NewDecoder(resp.Body).Decode(writeResp) == nil {
desc = ": " + writeResp.Error()
}

switch resp.StatusCode {
// request was too large, send back to try again
case http.StatusRequestEntityTooLarge:
c.log.Errorf("Failed to write metric to %s, request was too large (413)", bucket)
return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: desc,
Err: fmt.Errorf("%s: %s", resp.Status, desc),
StatusCode: resp.StatusCode,
}
case
// request was malformed:
Expand All @@ -325,17 +328,13 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
http.StatusUnprocessableEntity,
http.StatusNotAcceptable:

// Clients should *not* repeat the request and the metrics should be dropped.
rejected := make([]int, 0, len(metrics))
for i := range len(metrics) {
rejected = append(rejected, i)
}
return &internal.PartialWriteError{
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
MetricsReject: rejected,
// Clients should *not* repeat the request and the metrics should be rejected.
return &APIError{
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s)%s", bucket, resp.Status, desc),
StatusCode: resp.StatusCode,
}
case http.StatusUnauthorized, http.StatusForbidden:
return fmt.Errorf("failed to write metric to %s (%s): %s", bucket, resp.Status, desc)
return fmt.Errorf("failed to write metric to %s (%s)%s", bucket, resp.Status, desc)
case http.StatusTooManyRequests,
http.StatusServiceUnavailable,
http.StatusBadGateway,
Expand All @@ -351,26 +350,22 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
// if it's any other 4xx code, the client should not retry as it's the client's mistake.
// retrying will not make the request magically work.
if len(resp.Status) > 0 && resp.Status[0] == '4' {
rejected := make([]int, 0, len(metrics))
for i := range len(metrics) {
rejected = append(rejected, i)
}
return &internal.PartialWriteError{
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
MetricsReject: rejected,
return &APIError{
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s)%s", bucket, resp.Status, desc),
StatusCode: resp.StatusCode,
}
}

// This is only until platform spec is fully implemented. As of the
// time of writing, there is no error body returned.
if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" {
desc = fmt.Sprintf("%s; %s", desc, xErr)
desc = fmt.Sprintf(": %s; %s", desc, xErr)
}

return &APIError{
StatusCode: resp.StatusCode,
Title: resp.Status,
Description: desc,
Err: fmt.Errorf("failed to write metric to bucket %q: %s%s", bucket, resp.Status, desc),
StatusCode: resp.StatusCode,
Retryable: true,
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/influxdb_v2/influxdb_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
for _, n := range rand.Perm(len(i.clients)) {
client := i.clients[n]
if err := client.Write(ctx, metrics); err != nil {
i.Log.Errorf("When writing to [%s]: %v", client.url, err)
var werr *internal.PartialWriteError
if errors.As(err, &werr) || errors.Is(err, internal.ErrSizeLimitReached) {
return err
}
i.Log.Errorf("When writing to [%s]: %v", client.url, err)
continue
}
return nil
Expand Down
Loading

0 comments on commit 6ea22bd

Please sign in to comment.