From b54e9e8d2c37ebc1875497b9ddf652f6ef450b13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Wed, 12 Jun 2024 07:52:46 +0200 Subject: [PATCH] Configurable error handling and retrying in HTTP target Drafting: * Flexible configuration allowing us to categorize HTTP responses and match specific retrying strategies. * Very simple application health store as global variable. * Very simple monitoring interface used to send alerts during retries in HTTP target. It doesn't compile now! It's just to show how such configuration could look like and validate such approach. --- assets/docs/configuration/targets/default.hcl | 24 ++ .../targets/http_response_handler.hcl | 60 +++++ pkg/health/health_check.go | 30 +++ pkg/monitoring/monitoring.go | 45 ++++ pkg/target/http.go | 227 +++++++++++++++--- 5 files changed, 355 insertions(+), 31 deletions(-) create mode 100644 assets/docs/configuration/targets/default.hcl create mode 100644 assets/docs/configuration/targets/http_response_handler.hcl create mode 100644 pkg/health/health_check.go create mode 100644 pkg/monitoring/monitoring.go diff --git a/assets/docs/configuration/targets/default.hcl b/assets/docs/configuration/targets/default.hcl new file mode 100644 index 00000000..bc78c7c4 --- /dev/null +++ b/assets/docs/configuration/targets/default.hcl @@ -0,0 +1,24 @@ +target { + use "http" { + + // by default no custom invalid/retryable rules so we use current default retrying strategy for target failures in `sourceWriteFunc` (in cli.go) + response_handler { + success = [{ http_status: ["2**"]}] + invalid/badrow = [] + retryable = [] + + retry_strategies { + transient { + policy: "exponential" + max_attempts: 5 + delay: "2 seconds" + } + setup { + policy: "exponential" + max_attempts: 10000 //very high value making it basically unlimited + delay: "30 seconds" + } + } + } + } +} diff --git a/assets/docs/configuration/targets/http_response_handler.hcl b/assets/docs/configuration/targets/http_response_handler.hcl new file mode 100644 index 00000000..078512f9 --- /dev/null +++ b/assets/docs/configuration/targets/http_response_handler.hcl @@ -0,0 +1,60 @@ +target { + use "http" { + + response_handler { + + // everything is fine, we can ack our data from source. + // Can such configuration be useful or it's overkill and we should commit to 2** all the time? + success = [ + //we can have concrete status or wildcard using * + { http_status: ["2**"]} + ] + + // data is for some reason not accepted by target, there is no point of retrying. Just send it to bad/failed target and unblock processing. + invalid/badrow = [ + + //first rule... + { http_status: ["41*"]}, + + //second rule... + { + http_status: ["499", "424"], + + //not only http status but check if response body matches. Extract some part of body (e.g. error code) and check if it's equal/not equal to value. + body { + path: "some jq here??" + equal/not_equal: "some value" + } + } + ] + + // For these retryable errors (assuming we have health check): + // - set app status as unhealthy for each failed retry + // - set app status as healthy after successful retry + // - if we run out of attempts => exit/crash/restart + // - if we don't have max attempts (e.g. like setup errors), we don't get stuck retrying indefinitely, because we are protected by health check. If app is unhealthy for extended period time, it will be killed eventually by kubernetes. + retryable = [ + { http_status: ["503"], strategy: "transient"}, + { + http_status: ["403"], + strategy: "setup", + alert: "This is alert sent to ops1. You can configure message for specific error code" + } + ] + + // and the list of retry strategies, you can define how many you want and then reference them in your 'retryable' rules + retry_strategies { + transient { + policy: "exponential" + max_attempts: 5 + delay: "2 seconds" + } + setup { + policy: "exponential" + max_attempts: 10000 + delay: "30 seconds" + } + } + } + } +} diff --git a/pkg/health/health_check.go b/pkg/health/health_check.go new file mode 100644 index 00000000..5f8df5bb --- /dev/null +++ b/pkg/health/health_check.go @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package health + +import ( + "sync/atomic" +) + +var isHealthy atomic.Bool + +func SetHealthy() { + isHealthy.Store(true) +} + +func SetUnhealthy() { + isHealthy.Store(false) +} + +func IsHealthy() bool { + return isHealthy.Load() +} diff --git a/pkg/monitoring/monitoring.go b/pkg/monitoring/monitoring.go new file mode 100644 index 00000000..6a52279e --- /dev/null +++ b/pkg/monitoring/monitoring.go @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package monitoring + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "net/url" + "time" + + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/snowplow/snowbridge/pkg/common" + "github.com/snowplow/snowbridge/pkg/models" + + "golang.org/x/oauth2" +) + +type Alert struct { + Message string +} + +type Monitoring struct { + client *http.Client + httpURL string +} + +func (m *Monitoring) SendAlert (alert Alert) { + m.client.Do(....) +} diff --git a/pkg/target/http.go b/pkg/target/http.go index 7738f163..de60bdee 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -16,6 +16,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "net/url" "time" @@ -24,12 +25,46 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + retry "github.com/snowplow-devops/go-retry" "github.com/snowplow/snowbridge/pkg/common" + "github.com/snowplow/snowbridge/pkg/health" "github.com/snowplow/snowbridge/pkg/models" - + "github.com/snowplow/snowbridge/pkg/monitoring" "golang.org/x/oauth2" ) +type ResponseHandler struct { + SuccessCriteria []*Rule + InvalidCriteria []*Rule + RetryableCriteria []*Rule + RetryStrategies map[string]RetryConfig +} + +type Rule struct { + HttpStatusExpectations []string + *ResponseBodyExpectations + + //only for retries... + RetryStrategy string + Alert string +} + +type ResponseBodyExpectations struct { + Path string + expectedValue string +} + +type RetryConfig struct { + Policy string + MaxAttempts int + Delay time.Duration +} + +type Response struct { + Status int + Body string +} + // HTTPTargetConfig configures the destination for records consumed type HTTPTargetConfig struct { HTTPURL string `hcl:"url" env:"TARGET_HTTP_URL"` @@ -49,6 +84,9 @@ type HTTPTargetConfig struct { OAuth2ClientSecret string `hcl:"oauth2_client_secret,optional" env:"TARGET_HTTP_OAUTH2_CLIENT_SECRET"` OAuth2RefreshToken string `hcl:"oauth2_refresh_token,optional" env:"TARGET_HTTP_OAUTH2_REFRESH_TOKEN"` OAuth2TokenURL string `hcl:"oauth2_token_url,optional" env:"TARGET_HTTP_OAUTH2_TOKEN_URL"` + + //we have flat config structure everywhere so not sure if it's good idea to add struct here? + ResponseHandler ResponseHandler } // HTTPTarget holds a new client for writing messages to HTTP endpoints @@ -62,6 +100,10 @@ type HTTPTarget struct { basicAuthPassword string log *log.Entry dynamicHeaders bool + + //simply passing from HTTPTargetConfig + responseHandler ResponseHandler + monitoring monitoring.Monitoring } func checkURL(str string) error { @@ -226,52 +268,175 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu var errResult error for _, msg := range safeMessages { + request := ht.createHTTPRequest(msg) + response := ht.executeHTTPRequest(request, msg) + ht.handleHTTPResponse(request, response, msg, sent, invalid, failed, errResult, nil) + } + + if errResult != nil { + errResult = errors.Wrap(errResult, "Error sending http requests") + } + + ht.log.Debugf("Successfully wrote %d/%d messages", len(sent), len(messages)) + return models.NewTargetWriteResult( + sent, + failed, + oversized, + invalid, + ), errResult +} + +func (ht *HTTPTarget) createHTTPRequest(msg *models.Message) *http.Request { + //we have it defined in default config + setupConfig := ht.responseHandler.RetryStrategies["setup"] + + // First place with possible failure - creating http request. The only way it can fail in our case is unparseable URL. + // It's kind of setup error. So I imagine it would be nice to receive some alert about invalid URL? + result := ht.retry(setupConfig, func() (interface{}, error) { request, err := http.NewRequest("POST", ht.httpURL, bytes.NewBuffer(msg.Data)) + if err != nil { - errResult = multierror.Append(errResult, errors.Wrap(err, "Error creating request")) - failed = append(failed, msg) - continue + ht.monitoring.SendAlert(monitoring.Alert{Message: fmt.Sprintf("Could not create HTTP request, error - %s", err.Error())}) + health.SetUnhealthy() + return nil, err } - request.Header.Add("Content-Type", ht.contentType) // Add content type - addHeadersToRequest(request, ht.headers, ht.retrieveHeaders(msg)) // Add headers if there are any - if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set + + request.Header.Add("Content-Type", ht.contentType) + addHeadersToRequest(request, ht.headers, ht.retrieveHeaders(msg)) + if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword) } + return nil, err + }) + + return result.(*http.Request) +} + +func (ht *HTTPTarget) executeHTTPRequest(request *http.Request, msg *models.Message) Response { + // Second possible place for failure - error after making HTTP call. We don't have HTTP response, so there is no way to check status/response body. + // It's connection/response timeout or some other unexpected network issue. + // Could we categorize it as transient error? So it deserves bunch of retries, after reaching max attempt just fail. + transientConfig := ht.responseHandler.RetryStrategies["transient"] + result := ht.retry(transientConfig, func() (interface{}, error) { requestStarted := time.Now() - resp, err := ht.client.Do(request) // Make request + resp, err := ht.client.Do(request) requestFinished := time.Now() msg.TimeRequestStarted = requestStarted msg.TimeRequestFinished = requestFinished - if err != nil { - errResult = multierror.Append(errResult, err) - failed = append(failed, msg) - continue + health.SetUnhealthy() + return nil, err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return &Response{Body: string(body), Status: resp.StatusCode}, nil + }) + + return result.(Response) +} + +// We have HTTP response so we can check status and response body details. +// Returned error means we have to retry +func (ht *HTTPTarget) handleHTTPResponse(request *http.Request, response Response, msg *models.Message, sent []*models.Message, invalid []*models.Message, failed []*models.Message, errResult error, previouslyMatchedRule *Rule) error { + //ACK and set healthy on success... + if ht.isSuccess(response) { + health.SetHealthy() + sent = append(sent, msg) + if msg.AckFunc != nil { + msg.AckFunc() } - defer resp.Body.Close() - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - sent = append(sent, msg) - if msg.AckFunc != nil { // Ack successful messages - msg.AckFunc() - } - } else { - errResult = multierror.Append(errResult, errors.New("Got response status: "+resp.Status)) - failed = append(failed, msg) - continue + return nil + } + + if ht.isInvalid(response) { + // App is healthy, our data is not + health.SetHealthy() + invalid = append(invalid, msg) + return nil + } + + if currentRule := ht.findRetryableRule(response); currentRule != nil { + //we found matching rule! Start retrying + return ht.handleRetryableResponse(currentRule, response, previouslyMatchedRule, request, msg, sent, invalid, failed, errResult) + } + + //no success/invalid/retryable rule matches, so we fallback to 'old default' layer of retrying (in main) and append to failed list. + errResult = multierror.Append(errResult, errors.New("TODO")) + failed = append(failed, msg) + return nil +} + +func (ht *HTTPTarget) handleRetryableResponse(currentRule *Rule, response Response, previouslyMatchedRule *Rule, request *http.Request, msg *models.Message, sent []*models.Message, invalid []*models.Message, failed []*models.Message, errResult error) error { + health.SetUnhealthy() + //send alert if configured... + if currentRule.Alert != "" { + ht.monitoring.SendAlert(monitoring.Alert{Message: fmt.Sprintf("%s, response body - %s", currentRule.Alert, response.Body)}) + } + // If we have some 'previouslyMatchedRule' then we know we're currently retrying some request. It answers question - 'are we currently retrying something??'. + // If it's equal to the new rule we've just matched, we have to retry again using the same strategy, instead of starting new retry loop. That's why we stop here and return control to the caller. + if currentRule == previouslyMatchedRule { + return errors.New("Same rule, probably same error, retry using the same strategy in upper layer") + } + + // If `previouslyMatchedRule` is not defined (e.g. at the beginning it's ) or is not equal to the new rule (so we have some different error to deal with) we change retrying strategy. + strategy := ht.responseHandler.RetryStrategies[currentRule.RetryStrategy] + + ht.retry(strategy, func() (interface{}, error) { + response := ht.executeHTTPRequest(request, msg) + return nil, ht.handleHTTPResponse(request, response, msg, sent, invalid, failed, errResult, currentRule) + }) + return nil +} + +func (ht *HTTPTarget) retry(config RetryConfig, action func() (interface{}, error)) interface{} { + // Should we use some third-party retrying library? Where we can configure more stuff. + // Now we're fixed on exponential. + result, err := retry.ExponentialWithInterface(config.MaxAttempts, config.Delay, "HTTP target", action) + // If we run out of attempts just crash? + if err != nil { + log.Fatal("Time to crash..?", err) + } + return result +} + +func (ht *HTTPTarget) isSuccess(bodyWithStatus Response) bool { + return findMatchingRule(bodyWithStatus, ht.responseHandler.SuccessCriteria) != nil +} + +func (ht *HTTPTarget) isInvalid(bodyWithStatus Response) bool { + return findMatchingRule(bodyWithStatus, ht.responseHandler.InvalidCriteria) != nil +} + +func (ht *HTTPTarget) findRetryableRule(bodyWithStatus Response) *Rule { + return findMatchingRule(bodyWithStatus, ht.responseHandler.RetryableCriteria) +} + +func findMatchingRule(bodyWithStatus Response, rules []*Rule) *Rule { + for _, rule := range rules { + if ruleMatches(bodyWithStatus, rule) { + return rule } } - if errResult != nil { - errResult = errors.Wrap(errResult, "Error sending http requests") + return nil +} + +func ruleMatches(bodyWithStatus Response, rule *Rule) bool { + codeMatch := httpStatusMatches(bodyWithStatus.Status, rule.HttpStatusExpectations) + if rule.ResponseBodyExpectations != nil { + return codeMatch && responseBodyMatches(bodyWithStatus.Body, rule.ResponseBodyExpectations) } + return codeMatch +} - ht.log.Debugf("Successfully wrote %d/%d messages", len(sent), len(messages)) - return models.NewTargetWriteResult( - sent, - failed, - oversized, - invalid, - ), errResult +func httpStatusMatches(actual int, expectedPatterns []string) bool { + return false +} + +func responseBodyMatches(actual string, expectations *ResponseBodyExpectations) bool { + return false } // Open does nothing for this target