Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add max tries and interval #18

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ type NodeConfig struct {
BlockchainWsEndpoint Redacted[string]
LegacyBlockchainEnabled bool
EvmReaderDefaultBlock DefaultBlock
EvmReaderRetryPolicyMaxRetries uint64
EvmReaderRetryPolicyMaxDelay Duration
BlockchainBlockTimeout int
SnapshotDir string
PostgresEndpoint Redacted[string]
Expand All @@ -36,6 +34,8 @@ type NodeConfig struct {
EspressoStartingBlock uint64
EspressoNamespace uint64
EspressoServiceEndpoint string
MaxRetries uint64
MaxDelay Duration
}

// Auth is used to sign transactions.
Expand Down Expand Up @@ -76,8 +76,6 @@ func FromEnv() NodeConfig {
config.BlockchainWsEndpoint = Redacted[string]{GetBlockchainWsEndpoint()}
config.LegacyBlockchainEnabled = GetLegacyBlockchainEnabled()
config.EvmReaderDefaultBlock = GetEvmReaderDefaultBlock()
config.EvmReaderRetryPolicyMaxRetries = GetEvmReaderRetryPolicyMaxRetries()
config.EvmReaderRetryPolicyMaxDelay = GetEvmReaderRetryPolicyMaxDelay()
config.BlockchainBlockTimeout = GetBlockchainBlockTimeout()
config.SnapshotDir = GetSnapshotDir()
config.PostgresEndpoint = Redacted[string]{GetPostgresEndpoint()}
Expand All @@ -95,6 +93,8 @@ func FromEnv() NodeConfig {
config.EspressoStartingBlock = GetStartingBlock()
config.EspressoNamespace = GetNamespace()
config.EspressoServiceEndpoint = GetServiceEndpoint()
config.MaxRetries = GetPolicyMaxRetries()
config.MaxDelay = GetPolicyMaxDelay()
return config
}

Expand Down
24 changes: 12 additions & 12 deletions internal/config/generate/Config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ the snapshot matches the hash in the Application contract."""
# Rollups
#

[rollups.CARTESI_EVM_READER_RETRY_POLICY_MAX_RETRIES]
default = "3"
go-type = "uint64"
description = """
How many times some functions should be retried after an error."""

[rollups.CARTESI_EVM_READER_RETRY_POLICY_MAX_DELAY]
default = "3"
go-type = "Duration"
description = """
How many seconds the retry policy will wait between retries."""

[rollups.CARTESI_ADVANCER_POLLING_INTERVAL]
default = "7"
go-type = "Duration"
Expand Down Expand Up @@ -224,6 +212,18 @@ go-type = "string"
description = """
URL to Espresso nonce and submit service"""

[espresso.RETRY_POLICY_MAX_RETRIES]
default = "10"
go-type = "uint64"
description = """
How many times some functions should be retried after an error."""

[rollups.RETRY_POLICY_MAX_DELAY]
default = "2"
go-type = "Duration"
description = """
How many seconds the retry policy will wait between retries."""

#
# Temporary
#
Expand Down
40 changes: 20 additions & 20 deletions internal/config/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 18 additions & 6 deletions internal/espressoreader/espresso_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cartesi/rollups-espresso-reader/internal/evmreader"
"github.com/cartesi/rollups-espresso-reader/internal/model"
"github.com/cartesi/rollups-espresso-reader/internal/repository"
"github.com/cartesi/rollups-espresso-reader/internal/services/retry"

"github.com/EspressoSystems/espresso-sequencer-go/client"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -37,11 +38,13 @@ type EspressoReader struct {
evmReader *evmreader.EvmReader
chainId uint64
inputBoxDeploymentBlock uint64
maxRetries uint64
maxDelay uint64
}

func NewEspressoReader(url string, startingBlock uint64, namespace uint64, repository repository.Repository, evmReader *evmreader.EvmReader, chainId uint64, inputBoxDeploymentBlock uint64) EspressoReader {
func NewEspressoReader(url string, startingBlock uint64, namespace uint64, repository repository.Repository, evmReader *evmreader.EvmReader, chainId uint64, inputBoxDeploymentBlock uint64, maxRetries uint64, maxDelay uint64) EspressoReader {
client := client.NewClient(url)
return EspressoReader{url: url, client: *client, startingBlock: startingBlock, namespace: namespace, repository: repository, evmReader: evmReader, chainId: chainId, inputBoxDeploymentBlock: inputBoxDeploymentBlock}
return EspressoReader{url: url, client: *client, startingBlock: startingBlock, namespace: namespace, repository: repository, evmReader: evmReader, chainId: chainId, inputBoxDeploymentBlock: inputBoxDeploymentBlock, maxRetries: maxRetries, maxDelay: maxDelay}
}

func (e *EspressoReader) Run(ctx context.Context, ready chan<- struct{}) error {
Expand All @@ -54,10 +57,16 @@ func (e *EspressoReader) Run(ctx context.Context, ready chan<- struct{}) error {
return ctx.Err()
default:
// fetch latest espresso block height
latestBlockHeight, err := e.client.FetchLatestBlockHeight(ctx)
latestBlockHeight, err := retry.CallFunctionWithRetryPolicy(
e.client.FetchLatestBlockHeight,
ctx,
e.maxRetries,
time.Duration(e.maxDelay),
"EspressoReader::FetchLatestBlockHeight",
)
if err != nil {
slog.Error("failed fetching latest espresso block height", "error", err)
continue
return err
}
slog.Debug("Espresso:", "latestBlockHeight", latestBlockHeight)

Expand Down Expand Up @@ -368,21 +377,22 @@ func (e *EspressoReader) getL1FinalizedHeight(ctx context.Context, espressoBlock
if len(espressoHeader) == 0 {
slog.Error("error fetching espresso header", "at height", espressoBlockHeight, "header", espressoHeader)
slog.Error("retrying fetching header")
time.Sleep(time.Duration(e.maxDelay))
continue
}

l1FinalizedNumber := gjson.Get(espressoHeader, "fields.l1_finalized.number").Uint()
l1FinalizedTimestampStr := gjson.Get(espressoHeader, "fields.l1_finalized.timestamp").Str
if len(l1FinalizedTimestampStr) < 2 {
slog.Debug("Espresso header not ready. Retry fetching", "height", espressoBlockHeight)
var delay time.Duration = 3000
time.Sleep(delay * time.Millisecond)
time.Sleep(time.Duration(e.maxDelay))
continue
}
l1FinalizedTimestampInt, err := strconv.ParseInt(l1FinalizedTimestampStr[2:], 16, 64)
if err != nil {
slog.Error("hex to int conversion failed", "err", err)
slog.Error("retrying")
time.Sleep(time.Duration(e.maxDelay))
continue
}
l1FinalizedTimestamp := uint64(l1FinalizedTimestampInt)
Expand All @@ -403,12 +413,14 @@ func (e *EspressoReader) readEspressoHeadersByRange(ctx context.Context, from ui
if err != nil {
slog.Error("error making http request", "err", err)
slog.Error("retrying")
time.Sleep(time.Duration(e.maxDelay))
continue
}
resBody, err := io.ReadAll(res.Body)
if err != nil {
slog.Error("could not read response body", "err", err)
slog.Error("retrying")
time.Sleep(time.Duration(e.maxDelay))
continue
}

Expand Down
4 changes: 2 additions & 2 deletions internal/espressoreader/espresso_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ func (suite *EspressoReaderTestSuite) SetupSuite() {
suite.c.EspressoBaseUrl,
suite.c.EspressoStartingBlock,
suite.c.EspressoNamespace,
suite.c.EvmReaderRetryPolicyMaxRetries,
suite.c.EvmReaderRetryPolicyMaxDelay,
config.Value.ChainID,
config.Value.InputBoxDeploymentBlock,
suite.c.EspressoServiceEndpoint,
suite.c.MaxRetries,
suite.c.MaxDelay,
)
go service.Start(suite.ctx, make(chan struct{}, 1))
// let reader run for some time
Expand Down
10 changes: 5 additions & 5 deletions internal/espressoreader/espressoreader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func NewEspressoReaderService(
EspressoBaseUrl string,
EspressoStartingBlock uint64,
EspressoNamespace uint64,
maxRetries uint64,
maxDelay time.Duration,
chainId uint64,
inputBoxDeploymentBlock uint64,
espressoServiceEndpoint string,
maxRetries uint64,
maxDelay time.Duration,
) *EspressoReaderService {
return &EspressoReaderService{
blockchainHttpEndpoint: blockchainHttpEndpoint,
Expand All @@ -61,11 +61,11 @@ func NewEspressoReaderService(
EspressoBaseUrl: EspressoBaseUrl,
EspressoStartingBlock: EspressoStartingBlock,
EspressoNamespace: EspressoNamespace,
maxRetries: maxRetries,
maxDelay: maxDelay,
chainId: chainId,
inputBoxDeploymentBlock: inputBoxDeploymentBlock,
espressoServiceEndpoint: espressoServiceEndpoint,
maxRetries: maxRetries,
maxDelay: maxDelay,
}
}

Expand All @@ -76,7 +76,7 @@ func (s *EspressoReaderService) Start(

evmReader := s.setupEvmReader(ctx, s.database)

espressoReader := NewEspressoReader(s.EspressoBaseUrl, s.EspressoStartingBlock, s.EspressoNamespace, s.database, evmReader, s.chainId, s.inputBoxDeploymentBlock)
espressoReader := NewEspressoReader(s.EspressoBaseUrl, s.EspressoStartingBlock, s.EspressoNamespace, s.database, evmReader, s.chainId, s.inputBoxDeploymentBlock, s.maxRetries, uint64(s.maxDelay))

go s.setupNonceHttpServer()

Expand Down
37 changes: 25 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"fmt"
"log/slog"
"os"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/cartesi/rollups-espresso-reader/internal/model"
"github.com/cartesi/rollups-espresso-reader/internal/repository"
"github.com/cartesi/rollups-espresso-reader/internal/repository/factory"
"github.com/cartesi/rollups-espresso-reader/internal/services/retry"
"github.com/cartesi/rollups-espresso-reader/internal/services/startup"

"github.com/spf13/cobra"
Expand All @@ -35,6 +37,12 @@ var Cmd = &cobra.Command{
Run: run,
}

type loadNodeConfigArgs struct {
ctx context.Context
database repository.Repository
configKey string
}

func run(cmd *cobra.Command, args []string) {
startTime := time.Now()

Expand All @@ -57,16 +65,17 @@ func run(cmd *cobra.Command, args []string) {
// load node configuration
max_attempts := 10
var config *model.NodeConfig[model.NodeConfigValue]
for i := 1; i <= max_attempts; i++ {
config, err = repository.LoadNodeConfig[model.NodeConfigValue](ctx, database, model.BaseConfigKey)
if err == nil {
break
}
slog.Warn("Failed to load configuration, retrying...", "attempt", i, "error", err)
if i < max_attempts {
time.Sleep(3 * time.Second)
}
}
config, err = retry.CallFunctionWithRetryPolicy(
loadNodeConfig,
loadNodeConfigArgs{
ctx: ctx,
database: database,
configKey: model.BaseConfigKey,
},
c.MaxRetries,
c.MaxDelay,
"Main::LoadNodeConfig",
)
if err != nil {
slog.Error(fmt.Sprintf("Failed to load configuration after %d attempts", max_attempts), "error", err)
os.Exit(1)
Expand All @@ -80,11 +89,11 @@ func run(cmd *cobra.Command, args []string) {
c.EspressoBaseUrl,
c.EspressoStartingBlock,
c.EspressoNamespace,
c.EvmReaderRetryPolicyMaxRetries,
c.EvmReaderRetryPolicyMaxDelay,
config.Value.ChainID,
config.Value.InputBoxDeploymentBlock,
c.EspressoServiceEndpoint,
c.MaxRetries,
c.MaxDelay,
)

// logs startup time
Expand All @@ -105,6 +114,10 @@ func run(cmd *cobra.Command, args []string) {
}
}

func loadNodeConfig(args loadNodeConfigArgs) (*model.NodeConfig[model.NodeConfigValue], error) {
return repository.LoadNodeConfig[model.NodeConfigValue](args.ctx, args.database, args.configKey)
}

func main() {
err := Cmd.Execute()
if err != nil {
Expand Down