Skip to content

Commit

Permalink
add retryable errs
Browse files Browse the repository at this point in the history
  • Loading branch information
taratorio committed Jan 14, 2025
1 parent 10541e1 commit 2a799b6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
40 changes: 36 additions & 4 deletions cmd/integration/commands/cross_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/spf13/cobra"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/turbo/debug"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func crossReferenceBlockHashes(ctx context.Context, logger log.Logger, startBloc
return err
}

goldenBlockFields, err := fetchBlockViaRpc(logger, secondaryRpcUrl, blockNum)
goldenBlockFields, err := fetchBlockViaRpcWithRetry(ctx, logger, secondaryRpcUrl, blockNum)
if err != nil {
return err
}
Expand Down Expand Up @@ -96,6 +97,34 @@ func crossReferenceBlockHashes(ctx context.Context, logger log.Logger, startBloc
return nil
}

var (
errFailedResponse = errors.New("failed response")
errResponseStatusNotOk = errors.New("response status not ok")
)

func fetchBlockViaRpcWithRetry(ctx context.Context, logger log.Logger, rpcUrl string, blockNum uint64) (map[string]interface{}, error) {
var err error
var fields map[string]interface{}
for i := 0; i < rpcMaxRetries+1; i++ {
fields, err = fetchBlockViaRpc(logger, rpcUrl, blockNum)
if err == nil {
return fields, nil
}
if !errors.Is(err, errResponseStatusNotOk) && !errors.Is(err, errFailedResponse) {
return nil, err
}

// otherwise this is a retry-able error - sleep and retry
logger.Error("Failed to fetch block via RPC - retrying after some time", "backOffDuration", rpcBackOffDuration, "err", err)
err = libcommon.Sleep(ctx, rpcBackOffDuration)
if err != nil {
return nil, err
}
}

return nil, errors.New("max retries reached")
}

func fetchBlockViaRpc(logger log.Logger, rpcUrl string, blockNum uint64) (map[string]interface{}, error) {
client := &http.Client{}
payload := fmt.Sprintf(`{"method":"eth_getBlockByNumber","params":["0x%x",false],"id":1,"jsonrpc":"2.0"}`, blockNum)
Expand All @@ -108,23 +137,26 @@ func fetchBlockViaRpc(logger log.Logger, rpcUrl string, blockNum uint64) (map[st
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return nil, err
return nil, fmt.Errorf("%w: rpcUrl=%s, blockNum=%d: %w", errFailedResponse, rpcUrl, blockNum, err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
logger.Error("Could not close body", "err", err)
}
}()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("%w: rpcUrl=%s, blockNum=%d", errResponseStatusNotOk, rpcUrl, blockNum)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed io.ReadAll for response body: rpcUrl=%s, blockNum=%d: %w", rpcUrl, blockNum, err)
}

var fields map[string]interface{}
err = json.Unmarshal(body, &fields)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed json.Unmarshal for response body: rpcUrl=%s, blockNum=%d: %w", rpcUrl, blockNum, err)
}

return fields, nil
Expand Down
12 changes: 12 additions & 0 deletions cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package commands

import (
"time"

"github.com/spf13/cobra"

"github.com/erigontech/erigon/turbo/cli"
Expand Down Expand Up @@ -56,6 +58,8 @@ var (

startBlockNum, endBlockNum uint64
rpcUrl, secondaryRpcUrl string
rpcMaxRetries int
rpcBackOffDuration time.Duration

chainTipMode bool
syncCfg = ethconfig.Defaults.Sync
Expand Down Expand Up @@ -216,3 +220,11 @@ func withRpcUrl(cmd *cobra.Command) {
func withSecondaryRpcUrl(cmd *cobra.Command) {
cmd.Flags().StringVar(&secondaryRpcUrl, "rpc.url.secondary", "", "secondary rpc url")
}

func withRpcMaxRetries(cmd *cobra.Command) {
cmd.Flags().IntVar(&rpcMaxRetries, "rpc.max.retries", 2, "max retries for failed rpc requests")
}

func withRpcBackOffDuration(cmd *cobra.Command) {
cmd.Flags().DurationVar(&rpcBackOffDuration, "rpc.backoff.duration", 30*time.Second, "backoff duration for retry-able rpc request errors")
}
2 changes: 2 additions & 0 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ func init() {
withEndBlockNum(cmdCrossReferenceBlockHashes)
withRpcUrl(cmdCrossReferenceBlockHashes)
withSecondaryRpcUrl(cmdCrossReferenceBlockHashes)
withRpcMaxRetries(cmdCrossReferenceBlockHashes)
withRpcBackOffDuration(cmdCrossReferenceBlockHashes)
rootCmd.AddCommand(cmdCrossReferenceBlockHashes)
}

Expand Down

0 comments on commit 2a799b6

Please sign in to comment.