x-pack/filebeat/input/cel: suppress and log max http request retry errors (#37160)

The retryablehttp.Client returns an error determined by its ErrorHandler
field when the number of requests exceeds the configured maximum. When
the field is nil, the client returns a non-nil error, which causes issues
with some APIs and prevents the CEL code from evaluating the HTTP
response status. So add a retryablehttp.ErrorHandler that logs the
failure but returns the response unaltered and a nil error.
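
For context, a minimal standalone sketch of the failure mode described above, using hashicorp/go-retryablehttp directly rather than Beats code (the server and values are illustrative): with ErrorHandler left nil, exhausting RetryMax discards the last response and surfaces only an error, so the HTTP status is unobservable to the caller.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/hashicorp/go-retryablehttp"
)

func main() {
	// Test server that always fails with a retryable 5xx status.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusGatewayTimeout)
	}))
	defer srv.Close()

	c := (&retryablehttp.Client{
		HTTPClient: srv.Client(),
		RetryMax:   2, // up to three attempts in total
		CheckRetry: retryablehttp.DefaultRetryPolicy,
		Backoff:    retryablehttp.DefaultBackoff,
		// ErrorHandler is deliberately left nil: this is the default
		// behavior the commit works around.
	}).StandardClient()

	resp, err := c.Get(srv.URL)
	// resp is nil and err is non-nil (the client reports giving up
	// after the final attempt), so the 504 status is lost to the caller.
	fmt.Println(resp, err)
}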
efd6 authored Nov 22, 2023
1 parent 15857a8 commit cc39376
Showing 3 changed files with 46 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -216,6 +216,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
 - Made Azure Blob Storage input GA and updated docs accordingly. {pull}37128[37128]
 - Add request trace logging to http_endpoint input. {issue}36951[36951] {pull}36957[36957]
 - Made GCS input GA and updated docs accordingly. {pull}37127[37127]
+- Suppress and log max HTTP request retry errors in CEL input. {pull}37160[37160]

*Auditbeat*

15 changes: 14 additions & 1 deletion x-pack/filebeat/input/cel/input.go
@@ -730,14 +730,16 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
 	c.CheckRedirect = checkRedirect(cfg.Resource, log)
 
 	if cfg.Resource.Retry.getMaxAttempts() > 1 {
+		maxAttempts := cfg.Resource.Retry.getMaxAttempts()
 		c = (&retryablehttp.Client{
 			HTTPClient:   c,
 			Logger:       newRetryLog(log),
 			RetryWaitMin: cfg.Resource.Retry.getWaitMin(),
 			RetryWaitMax: cfg.Resource.Retry.getWaitMax(),
-			RetryMax:     cfg.Resource.Retry.getMaxAttempts(),
+			RetryMax:     maxAttempts,
 			CheckRetry:   retryablehttp.DefaultRetryPolicy,
 			Backoff:      retryablehttp.DefaultBackoff,
+			ErrorHandler: retryErrorHandler(maxAttempts, log),
 		}).StandardClient()
 	}

@@ -831,6 +833,17 @@ func checkRedirect(cfg *ResourceConfig, log *logp.Logger) func(*http.Request, []
 	}
 }
 
+// retryErrorHandler returns a retryablehttp.ErrorHandler that will log retry
+// resignation but return the last retry attempt's response and a nil error so
+// that the CEL code can evaluate the response status itself. Any error passed
+// to the retryablehttp.ErrorHandler is returned unaltered.
+func retryErrorHandler(max int, log *logp.Logger) retryablehttp.ErrorHandler {
+	return func(resp *http.Response, err error, numTries int) (*http.Response, error) {
+		log.Warnw("giving up retries", "method", resp.Request.Method, "url", resp.Request.URL, "retries", max+1)
+		return resp, err
+	}
+}
+
 func newRateLimiterFromConfig(cfg *ResourceConfig) *rate.Limiter {
 	r := rate.Inf
 	b := 1
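To see the fix's effect in isolation, a hedged standalone sketch in the style of retryErrorHandler above (not Beats code; the server is illustrative): with such a handler installed, the caller receives the final 504 response and a nil error instead of a retry-exhaustion error.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/hashicorp/go-retryablehttp"
)

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusGatewayTimeout) // always a retryable failure
	}))
	defer srv.Close()

	c := (&retryablehttp.Client{
		HTTPClient: srv.Client(),
		RetryMax:   2,
		CheckRetry: retryablehttp.DefaultRetryPolicy,
		Backoff:    retryablehttp.DefaultBackoff,
		ErrorHandler: func(resp *http.Response, err error, numTries int) (*http.Response, error) {
			// err is nil when the retries failed only on HTTP status, so
			// returning it unaltered hands the caller the last response.
			return resp, err
		},
	}).StandardClient()

	resp, err := c.Get(srv.URL)
	if err == nil && resp != nil {
		fmt.Println("final status:", resp.StatusCode) // 504, now observable
	}
}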
31 changes: 31 additions & 0 deletions x-pack/filebeat/input/cel/input_test.go
@@ -683,6 +683,37 @@ var inputTests = []struct {
 			{"hello": "world"},
 		},
 	},
+	{
+		name:   "retry_failure_no_success",
+		server: newTestServer(httptest.NewServer),
+		config: map[string]interface{}{
+			"interval": 1,
+			"resource": map[string]interface{}{
+				"retry": map[string]interface{}{
+					"max_attempts": 2,
+				},
+			},
+			"program": `
+	get(state.url).as(resp, {
+		"url": state.url,
+		"events": [
+			bytes(resp.Body).decode_json(),
+			{"status": resp.StatusCode},
+		],
+	})
+	`,
+		},
+		handler: func(w http.ResponseWriter, r *http.Request) {
+			w.Header().Set("content-type", "application/json")
+			w.WriteHeader(http.StatusGatewayTimeout)
+			//nolint:errcheck // No point checking errors in test server.
+			w.Write([]byte(`{"error":"we were too slow"}`))
+		},
+		want: []map[string]interface{}{
+			{"error": "we were too slow"},
+			{"status": float64(504)}, // Float because of JSON.
+		},
+	},
 
 	{
 		name: "POST_request",
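An aside on the float64(504) expectation in the test above: encoding/json decodes JSON numbers into float64 when the destination is interface{}, which is why the status arrives as a float. A minimal demonstration:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var v map[string]interface{}
	if err := json.Unmarshal([]byte(`{"status": 504}`), &v); err != nil {
		panic(err)
	}
	fmt.Printf("%T %v\n", v["status"], v["status"]) // float64 504
}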
