Skip to content

Commit

Permalink
Add configurable retries
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Aug 2, 2024
1 parent 80eacaa commit 3089177
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 60 deletions.
22 changes: 22 additions & 0 deletions assets/docs/configuration/targets/http-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,27 @@ target {

# Optional path to the file containing template which is used to build HTTP request based on a batch of input data
template_file = "myTemplate.file"

# 2 invalid + 1 setup error rules
response_rules {
# This one is a match when...
invalid {
# ...HTTP statuses match...
http_codes = [400]
# AND this string exists in a response body
body = "Invalid value for 'purchase' field"
}
# If no match yet, we can check the next one...
invalid {
# again 400 status...
http_codes = [400]
# BUT we expect different error message in the response body
body = "Invalid value for 'attributes' field"
}
# Same for 'setup' rules..
setup {
http_codes = [401, 403]
}
}
}
}
108 changes: 73 additions & 35 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (

"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
retry "github.com/snowplow-devops/go-retry"
"github.com/urfave/cli"

"net/http"
// pprof imported for the side effect of registering its HTTP handlers
_ "net/http/pprof"

retry "github.com/avast/retry-go/v4"
"github.com/snowplow/snowbridge/cmd"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/failure/failureiface"
Expand Down Expand Up @@ -171,7 +171,7 @@ func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, sup

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
WriteToTarget: sourceWriteFunc(t, ft, tr, o, cfg),
}

// Read is a long running process and will only return when the source
Expand All @@ -195,7 +195,7 @@ func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, sup
// 4. Observing these results
//
// All with retry logic baked in to remove any of this handling from the implementations
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer) func(messages []*models.Message) error {
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer, cfg *config.Config) func(messages []*models.Message) error {
return func(messages []*models.Message) error {

// Apply transformations
Expand All @@ -216,64 +216,102 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
// Send message buffer
messagesToSend := transformed.Result

res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
res, err := t.Write(messagesToSend)
write := func() (*models.TargetWriteResult, error) {
result, err := t.Write(messagesToSend)

o.TargetWrite(result)
messagesToSend = result.Failed
return result, err
}

writeResult, err := handleWrite(cfg, write)

o.TargetWrite(res)
messagesToSend = res.Failed
return res, err
})
if err != nil {
return err
}
resCast := res.(*models.TargetWriteResult)

// Send oversized message buffer
messagesToSend = resCast.Oversized
messagesToSend = writeResult.Oversized
if len(messagesToSend) > 0 {
err2 := retry.Exponential(5, time.Second, "failureTarget.WriteOversized", func() error {
res, err := ft.WriteOversized(t.MaximumAllowedMessageSizeBytes(), messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {

writeOversized := func() (*models.TargetWriteResult, error) {
result, err := ft.WriteOversized(t.MaximumAllowedMessageSizeBytes(), messagesToSend)
if len(result.Oversized) != 0 || len(result.Invalid) != 0 {
log.Fatal("Oversized message transformation resulted in new oversized / invalid messages")
}

o.TargetWriteOversized(res)
messagesToSend = res.Failed
o.TargetWriteOversized(result)
messagesToSend = result.Failed
return result, err
}

_, err := handleWrite(cfg, writeOversized)

if err != nil {
return err
})
if err2 != nil {
return err2
}
}

// Send invalid message buffer
messagesToSend = append(resCast.Invalid, transformed.Invalid...)
messagesToSend = append(writeResult.Invalid, transformed.Invalid...)
if len(messagesToSend) > 0 {
err3 := retry.Exponential(5, time.Second, "failureTarget.WriteInvalid", func() error {
res, err := ft.WriteInvalid(messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
writeInvalid := func() (*models.TargetWriteResult, error) {
result, err := ft.WriteInvalid(messagesToSend)
if len(result.Oversized) != 0 || len(result.Invalid) != 0 {
log.Fatal("Invalid message transformation resulted in new invalid / oversized messages")
}

o.TargetWriteInvalid(res)
messagesToSend = res.Failed
o.TargetWriteInvalid(result)
messagesToSend = result.Failed
return result, err
}

_, err := handleWrite(cfg, writeInvalid)

if err != nil {
return err
})
if err3 != nil {
return err3
}
}

return nil
}
}

// Wrap each target write operation with 2 kinds of retries:
// - setup errors: long delay, unlimited attempts, unhealthy state + alerts
// - transient errors: short delay, limited attempts
// If it's setup/transient error is decided based on a response returned by the target.
func handleWrite(cfg *config.Config, write func() (*models.TargetWriteResult, error)) (*models.TargetWriteResult, error) {
retryOnlySetupErrors := retry.RetryIf(func(err error) bool {
_, isSetup := err.(models.SetupWriteError)
return isSetup
})

onSetupError := retry.OnRetry(func(attempt uint, err error) {
// Here we can set unhealthy status + send monitoring alerts in the future. Nothing happens here now.
})

//First try to handle error as setup...
writeResult, err := retry.DoWithData[*models.TargetWriteResult](
write,
retryOnlySetupErrors,
onSetupError,
retry.Delay(time.Duration(cfg.Data.Retry.Transient.Delay)*time.Second),
retry.Attempts(0), //unlimited
)

if err == nil {
return writeResult, err
}

//If no setup, then handle as transient
writeResult, err = retry.DoWithData[*models.TargetWriteResult](
write,
retry.Delay(time.Duration(cfg.Data.Retry.Transient.Delay)*time.Second),
retry.Attempts(uint(cfg.Data.Retry.Transient.MaxAttempts)),
)
return writeResult, err
}

// exitWithError will ensure we log the error and leave time for Sentry to flush
func exitWithError(err error, flushSentry bool) {
log.WithFields(log.Fields{"error": err}).Error(err)
Expand Down
21 changes: 21 additions & 0 deletions config/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func TestCreateTargetComponentHCL(t *testing.T) {
KeyFile: "",
CaFile: "",
SkipVerifyTLS: false,
ResponseRules: &target.ResponseRules{
Invalid: []target.Rule{},
SetupError: []target.Rule{},
},
},
},
{
Expand All @@ -112,6 +116,23 @@ func TestCreateTargetComponentHCL(t *testing.T) {
SkipVerifyTLS: true,
DynamicHeaders: true,
TemplateFile: "myTemplate.file",
ResponseRules: &target.ResponseRules{
Invalid: []target.Rule{
{
MatchingHTTPCodes: []int{400},
MatchingBodyPart: "Invalid value for 'purchase' field",
},
{
MatchingHTTPCodes: []int{400},
MatchingBodyPart: "Invalid value for 'attributes' field",
},
},
SetupError: []target.Rule{
{
MatchingHTTPCodes: []int{401, 403},
},
},
},
},
},
{
Expand Down
24 changes: 24 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type configurationData struct {
UserProvidedID string `hcl:"user_provided_id,optional"`
DisableTelemetry bool `hcl:"disable_telemetry,optional"`
License *licenseConfig `hcl:"license,block"`
Retry *retryConfig `hcl:"retry,block"`
}

// component is a type to abstract over configuration blocks.
Expand Down Expand Up @@ -94,6 +95,20 @@ type licenseConfig struct {
Accept bool `hcl:"accept,optional"`
}

type retryConfig struct {
Transient *transientRetryConfig `hcl:"transient,block"`
Setup *setupRetryConfig `hcl:"setup,block"`
}

type transientRetryConfig struct {
Delay int `hcl:"delay_sec,optional"`
MaxAttempts int `hcl:"max_attempts,optional"`
}

type setupRetryConfig struct {
Delay int `hcl:"delay_sec,optional"`
}

// defaultConfigData returns the initial main configuration target.
func defaultConfigData() *configurationData {
return &configurationData{
Expand All @@ -118,6 +133,15 @@ func defaultConfigData() *configurationData {
License: &licenseConfig{
Accept: false,
},
Retry: &retryConfig{
Transient: &transientRetryConfig{
Delay: 1,
MaxAttempts: 5,
},
Setup: &setupRetryConfig{
Delay: 20,
},
},
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
)

require (
github.com/avast/retry-go/v4 v4.6.0
github.com/davecgh/go-spew v1.1.1
github.com/dop251/goja v0.0.0-20240220182346-e401ed450204
github.com/hashicorp/hcl/v2 v2.20.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY
github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY=
github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/aws/aws-sdk-go v1.44.334 h1:h2bdbGb//fez6Sv6PaYv868s9liDeoYM6hYsAqTB4MU=
github.com/aws/aws-sdk-go v1.44.334/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
21 changes: 21 additions & 0 deletions pkg/models/target_write_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package models

// SetupWriteError is a wrapper for target write error. It is used by any target as a signal for a caller that this kind of error should be retried using 'setup-like' retry strategy.
type SetupWriteError struct {
Err error
}

func (err SetupWriteError) Error() string {
return "Setup error returned by the target: " + err.Error()
}
Loading

0 comments on commit 3089177

Please sign in to comment.