Replace the EVM-reader-specific retry settings with shared MaxRetries/MaxDelay
settings, and route the Espresso block-height fetch and the startup
node-config load through retry.CallFunctionWithRetryPolicy.

diff --git a/internal/config/config.go b/internal/config/config.go
index 01835d9..ebb76bb 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -19,8 +19,6 @@ type NodeConfig struct {
 	BlockchainWsEndpoint Redacted[string]
 	LegacyBlockchainEnabled bool
 	EvmReaderDefaultBlock DefaultBlock
-	EvmReaderRetryPolicyMaxRetries uint64
-	EvmReaderRetryPolicyMaxDelay Duration
 	BlockchainBlockTimeout int
 	SnapshotDir string
 	PostgresEndpoint Redacted[string]
@@ -36,6 +34,8 @@ type NodeConfig struct {
 	EspressoStartingBlock uint64
 	EspressoNamespace uint64
 	EspressoServiceEndpoint string
+	MaxRetries uint64
+	MaxDelay Duration
 }
 
 // Auth is used to sign transactions.
@@ -76,8 +76,6 @@ func FromEnv() NodeConfig {
 	config.BlockchainWsEndpoint = Redacted[string]{GetBlockchainWsEndpoint()}
 	config.LegacyBlockchainEnabled = GetLegacyBlockchainEnabled()
 	config.EvmReaderDefaultBlock = GetEvmReaderDefaultBlock()
-	config.EvmReaderRetryPolicyMaxRetries = GetEvmReaderRetryPolicyMaxRetries()
-	config.EvmReaderRetryPolicyMaxDelay = GetEvmReaderRetryPolicyMaxDelay()
 	config.BlockchainBlockTimeout = GetBlockchainBlockTimeout()
 	config.SnapshotDir = GetSnapshotDir()
 	config.PostgresEndpoint = Redacted[string]{GetPostgresEndpoint()}
@@ -95,6 +93,8 @@ func FromEnv() NodeConfig {
 	config.EspressoStartingBlock = GetStartingBlock()
 	config.EspressoNamespace = GetNamespace()
 	config.EspressoServiceEndpoint = GetServiceEndpoint()
+	config.MaxRetries = GetPolicyMaxRetries()
+	config.MaxDelay = GetPolicyMaxDelay()
 	return config
 }
 
diff --git a/internal/config/generate/Config.toml b/internal/config/generate/Config.toml
index 7ea717a..63e0646 100644
--- a/internal/config/generate/Config.toml
+++ b/internal/config/generate/Config.toml
@@ -36,18 +36,6 @@ the snapshot matches the hash in the Application contract."""
 # Rollups
 #
 
-[rollups.CARTESI_EVM_READER_RETRY_POLICY_MAX_RETRIES]
-default = "3"
-go-type = "uint64"
-description = """
-How many times some functions should be retried after an error."""
-
-[rollups.CARTESI_EVM_READER_RETRY_POLICY_MAX_DELAY]
-default = "3"
-go-type = "Duration"
-description = """
-How many seconds the retry policy will wait between retries."""
-
 [rollups.CARTESI_ADVANCER_POLLING_INTERVAL]
 default = "7"
 go-type = "Duration"
@@ -224,6 +212,18 @@ go-type = "string"
 description = """
 URL to Espresso nonce and submit service"""
 
+[espresso.RETRY_POLICY_MAX_RETRIES]
+default = "10"
+go-type = "uint64"
+description = """
+How many times some functions should be retried after an error."""
+
+[rollups.RETRY_POLICY_MAX_DELAY]
+default = "2"
+go-type = "Duration"
+description = """
+How many seconds the retry policy will wait between retries."""
+
 #
 # Temporary
 #
diff --git a/internal/config/generated.go b/internal/config/generated.go
index 220c3e5..4aac533 100644
--- a/internal/config/generated.go
+++ b/internal/config/generated.go
@@ -324,6 +324,18 @@ func GetStartingBlock() uint64 {
 	return val
 }
 
+func GetPolicyMaxRetries() uint64 {
+	s, ok := os.LookupEnv("RETRY_POLICY_MAX_RETRIES")
+	if !ok {
+		s = "10"
+	}
+	val, err := toUint64(s)
+	if err != nil {
+		panic(fmt.Sprintf("failed to parse RETRY_POLICY_MAX_RETRIES: %v", err))
+	}
+	return val
+}
+
 func GetFeatureClaimSubmissionEnabled() bool {
 	s, ok := os.LookupEnv("CARTESI_FEATURE_CLAIM_SUBMISSION_ENABLED")
 	if !ok {
@@ -432,38 +444,26 @@ func GetClaimerPollingInterval() Duration {
 	return val
 }
 
-func GetEvmReaderRetryPolicyMaxDelay() Duration {
-	s, ok := os.LookupEnv("CARTESI_EVM_READER_RETRY_POLICY_MAX_DELAY")
+func GetValidatorPollingInterval() Duration {
+	s, ok := os.LookupEnv("CARTESI_VALIDATOR_POLLING_INTERVAL")
 	if !ok {
-		s = "3"
+		s = "7"
 	}
 	val, err := toDuration(s)
 	if err != nil {
-		panic(fmt.Sprintf("failed to parse CARTESI_EVM_READER_RETRY_POLICY_MAX_DELAY: %v", err))
-	}
-	return val
-}
-
-func GetEvmReaderRetryPolicyMaxRetries() uint64 {
-	s, ok := os.LookupEnv("CARTESI_EVM_READER_RETRY_POLICY_MAX_RETRIES")
-	if !ok {
-		s = "3"
-	}
-	val, err := toUint64(s)
-	if err != nil {
-		panic(fmt.Sprintf("failed to parse CARTESI_EVM_READER_RETRY_POLICY_MAX_RETRIES: %v", err))
+		panic(fmt.Sprintf("failed to parse CARTESI_VALIDATOR_POLLING_INTERVAL: %v", err))
 	}
 	return val
 }
 
-func GetValidatorPollingInterval() Duration {
-	s, ok := os.LookupEnv("CARTESI_VALIDATOR_POLLING_INTERVAL")
+func GetPolicyMaxDelay() Duration {
+	s, ok := os.LookupEnv("RETRY_POLICY_MAX_DELAY")
 	if !ok {
-		s = "7"
+		s = "2"
 	}
 	val, err := toDuration(s)
 	if err != nil {
-		panic(fmt.Sprintf("failed to parse CARTESI_VALIDATOR_POLLING_INTERVAL: %v", err))
+		panic(fmt.Sprintf("failed to parse RETRY_POLICY_MAX_DELAY: %v", err))
 	}
 	return val
 }
diff --git a/internal/espressoreader/espresso_reader.go b/internal/espressoreader/espresso_reader.go
index 0f08873..62e60d1 100644
--- a/internal/espressoreader/espresso_reader.go
+++ b/internal/espressoreader/espresso_reader.go
@@ -22,6 +22,7 @@ import (
 	"github.com/cartesi/rollups-espresso-reader/internal/evmreader"
 	"github.com/cartesi/rollups-espresso-reader/internal/model"
 	"github.com/cartesi/rollups-espresso-reader/internal/repository"
+	"github.com/cartesi/rollups-espresso-reader/internal/services/retry"
 
 	"github.com/EspressoSystems/espresso-sequencer-go/client"
 	"github.com/ethereum/go-ethereum/common"
@@ -37,11 +38,13 @@ type EspressoReader struct {
 	evmReader *evmreader.EvmReader
 	chainId uint64
 	inputBoxDeploymentBlock uint64
+	maxRetries uint64
+	maxDelay time.Duration
 }
 
-func NewEspressoReader(url string, startingBlock uint64, namespace uint64, repository repository.Repository, evmReader *evmreader.EvmReader, chainId uint64, inputBoxDeploymentBlock uint64) EspressoReader {
+func NewEspressoReader(url string, startingBlock uint64, namespace uint64, repository repository.Repository, evmReader *evmreader.EvmReader, chainId uint64, inputBoxDeploymentBlock uint64, maxRetries uint64, maxDelay time.Duration) EspressoReader {
 	client := client.NewClient(url)
-	return EspressoReader{url: url, client: *client, startingBlock: startingBlock, namespace: namespace, repository: repository, evmReader: evmReader, chainId: chainId, inputBoxDeploymentBlock: inputBoxDeploymentBlock}
+	return EspressoReader{url: url, client: *client, startingBlock: startingBlock, namespace: namespace, repository: repository, evmReader: evmReader, chainId: chainId, inputBoxDeploymentBlock: inputBoxDeploymentBlock, maxRetries: maxRetries, maxDelay: maxDelay}
 }
 
 func (e *EspressoReader) Run(ctx context.Context, ready chan<- struct{}) error {
@@ -54,10 +57,16 @@ func (e *EspressoReader) Run(ctx context.Context, ready chan<- struct{}) error {
 			return ctx.Err()
 		default:
 			// fetch latest espresso block height
-			latestBlockHeight, err := e.client.FetchLatestBlockHeight(ctx)
+			latestBlockHeight, err := retry.CallFunctionWithRetryPolicy(
+				e.client.FetchLatestBlockHeight,
+				ctx,
+				e.maxRetries,
+				e.maxDelay,
+				"EspressoReader::FetchLatestBlockHeight",
+			)
 			if err != nil {
 				slog.Error("failed fetching latest espresso block height", "error", err)
-				continue
+				return err
 			}
 			slog.Debug("Espresso:", "latestBlockHeight", latestBlockHeight)
 
@@ -368,6 +377,7 @@ func (e *EspressoReader) getL1FinalizedHeight(ctx context.Context, espressoBlock
 		if len(espressoHeader) == 0 {
 			slog.Error("error fetching espresso header", "at height", espressoBlockHeight, "header", espressoHeader)
 			slog.Error("retrying fetching header")
+			time.Sleep(e.maxDelay)
 			continue
 		}
 
@@ -375,14 +385,14 @@ func (e *EspressoReader) getL1FinalizedHeight(ctx context.Context, espressoBlock
 		l1FinalizedTimestampStr := gjson.Get(espressoHeader, "fields.l1_finalized.timestamp").Str
 		if len(l1FinalizedTimestampStr) < 2 {
 			slog.Debug("Espresso header not ready. Retry fetching", "height", espressoBlockHeight)
-			var delay time.Duration = 3000
-			time.Sleep(delay * time.Millisecond)
+			time.Sleep(e.maxDelay)
 			continue
 		}
 		l1FinalizedTimestampInt, err := strconv.ParseInt(l1FinalizedTimestampStr[2:], 16, 64)
 		if err != nil {
 			slog.Error("hex to int conversion failed", "err", err)
 			slog.Error("retrying")
+			time.Sleep(e.maxDelay)
 			continue
 		}
 		l1FinalizedTimestamp := uint64(l1FinalizedTimestampInt)
@@ -403,12 +413,14 @@ func (e *EspressoReader) readEspressoHeadersByRange(ctx context.Context, from ui
 		if err != nil {
 			slog.Error("error making http request", "err", err)
 			slog.Error("retrying")
+			time.Sleep(e.maxDelay)
 			continue
 		}
 		resBody, err := io.ReadAll(res.Body)
 		if err != nil {
 			slog.Error("could not read response body", "err", err)
 			slog.Error("retrying")
+			time.Sleep(e.maxDelay)
 			continue
 		}
 
diff --git a/internal/espressoreader/espresso_reader_test.go b/internal/espressoreader/espresso_reader_test.go
index 3711af5..6895d47 100644
--- a/internal/espressoreader/espresso_reader_test.go
+++ b/internal/espressoreader/espresso_reader_test.go
@@ -69,11 +69,11 @@ func (suite *EspressoReaderTestSuite) SetupSuite() {
 		suite.c.EspressoBaseUrl,
 		suite.c.EspressoStartingBlock,
 		suite.c.EspressoNamespace,
-		suite.c.EvmReaderRetryPolicyMaxRetries,
-		suite.c.EvmReaderRetryPolicyMaxDelay,
 		config.Value.ChainID,
 		config.Value.InputBoxDeploymentBlock,
 		suite.c.EspressoServiceEndpoint,
+		suite.c.MaxRetries,
+		suite.c.MaxDelay,
 	)
 	go service.Start(suite.ctx, make(chan struct{}, 1))
 	// let reader run for some time
diff --git a/internal/espressoreader/espressoreader_service.go b/internal/espressoreader/espressoreader_service.go
index 33b213f..d63f0d6 100644
--- a/internal/espressoreader/espressoreader_service.go
+++ b/internal/espressoreader/espressoreader_service.go
@@ -48,11 +48,11 @@ func NewEspressoReaderService(
 	EspressoBaseUrl string,
 	EspressoStartingBlock uint64,
 	EspressoNamespace uint64,
-	maxRetries uint64,
-	maxDelay time.Duration,
 	chainId uint64,
 	inputBoxDeploymentBlock uint64,
 	espressoServiceEndpoint string,
+	maxRetries uint64,
+	maxDelay time.Duration,
 ) *EspressoReaderService {
 	return &EspressoReaderService{
 		blockchainHttpEndpoint: blockchainHttpEndpoint,
@@ -61,11 +61,11 @@ func NewEspressoReaderService(
 		EspressoBaseUrl: EspressoBaseUrl,
 		EspressoStartingBlock: EspressoStartingBlock,
 		EspressoNamespace: EspressoNamespace,
-		maxRetries: maxRetries,
-		maxDelay: maxDelay,
 		chainId: chainId,
 		inputBoxDeploymentBlock: inputBoxDeploymentBlock,
 		espressoServiceEndpoint: espressoServiceEndpoint,
+		maxRetries: maxRetries,
+		maxDelay: maxDelay,
 	}
 }
 
@@ -76,7 +76,7 @@ func (s *EspressoReaderService) Start(
 
 	evmReader := s.setupEvmReader(ctx, s.database)
 
-	espressoReader := NewEspressoReader(s.EspressoBaseUrl, s.EspressoStartingBlock, s.EspressoNamespace, s.database, evmReader, s.chainId, s.inputBoxDeploymentBlock)
+	espressoReader := NewEspressoReader(s.EspressoBaseUrl, s.EspressoStartingBlock, s.EspressoNamespace, s.database, evmReader, s.chainId, s.inputBoxDeploymentBlock, s.maxRetries, s.maxDelay)
 
 	go s.setupNonceHttpServer()
 
diff --git a/main.go b/main.go
index 0b12441..9ee6e35 100644
--- a/main.go
+++ b/main.go
@@ -3,6 +3,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"log/slog"
 	"os"
@@ -13,6 +14,7 @@ import (
 	"github.com/cartesi/rollups-espresso-reader/internal/model"
 	"github.com/cartesi/rollups-espresso-reader/internal/repository"
 	"github.com/cartesi/rollups-espresso-reader/internal/repository/factory"
+	"github.com/cartesi/rollups-espresso-reader/internal/services/retry"
 	"github.com/cartesi/rollups-espresso-reader/internal/services/startup"
 
 	"github.com/spf13/cobra"
@@ -35,6 +37,13 @@ var Cmd = &cobra.Command{
 	Run: run,
 }
 
+// loadNodeConfigArgs bundles the arguments of loadNodeConfig into the single value handed to the retry helper.
+type loadNodeConfigArgs struct {
+	ctx context.Context
+	database repository.Repository
+	configKey string
+}
+
 func run(cmd *cobra.Command, args []string) {
 	startTime := time.Now()
 
@@ -57,16 +66,16 @@ func run(cmd *cobra.Command, args []string) {
 	// load node configuration
-	max_attempts := 10
 	var config *model.NodeConfig[model.NodeConfigValue]
-	for i := 1; i <= max_attempts; i++ {
-		config, err = repository.LoadNodeConfig[model.NodeConfigValue](ctx, database, model.BaseConfigKey)
-		if err == nil {
-			break
-		}
-		slog.Warn("Failed to load configuration, retrying...", "attempt", i, "error", err)
-		if i < max_attempts {
-			time.Sleep(3 * time.Second)
-		}
-	}
+	config, err = retry.CallFunctionWithRetryPolicy(
+		loadNodeConfig,
+		loadNodeConfigArgs{
+			ctx: ctx,
+			database: database,
+			configKey: model.BaseConfigKey,
+		},
+		c.MaxRetries,
+		c.MaxDelay,
+		"Main::LoadNodeConfig",
+	)
 	if err != nil {
-		slog.Error(fmt.Sprintf("Failed to load configuration after %d attempts", max_attempts), "error", err)
+		slog.Error(fmt.Sprintf("Failed to load configuration after %d retries", c.MaxRetries), "error", err)
 		os.Exit(1)
@@ -80,11 +89,11 @@ func run(cmd *cobra.Command, args []string) {
 		c.EspressoBaseUrl,
 		c.EspressoStartingBlock,
 		c.EspressoNamespace,
-		c.EvmReaderRetryPolicyMaxRetries,
-		c.EvmReaderRetryPolicyMaxDelay,
 		config.Value.ChainID,
 		config.Value.InputBoxDeploymentBlock,
 		c.EspressoServiceEndpoint,
+		c.MaxRetries,
+		c.MaxDelay,
 	)
 
 	// logs startup time
@@ -105,6 +114,12 @@ func run(cmd *cobra.Command, args []string) {
 	}
 }
 
+// loadNodeConfig adapts repository.LoadNodeConfig to the single-argument
+// callback shape that run passes to retry.CallFunctionWithRetryPolicy.
+func loadNodeConfig(args loadNodeConfigArgs) (*model.NodeConfig[model.NodeConfigValue], error) {
+	return repository.LoadNodeConfig[model.NodeConfigValue](args.ctx, args.database, args.configKey)
+}
+
 func main() {
 	err := Cmd.Execute()
 	if err != nil {