Skip to content

Commit

Permalink
event-exporter: add retries on errors
Browse files Browse the repository at this point in the history
  • Loading branch information
xxbbxb committed Sep 17, 2024
1 parent e2e10fd commit 6d25408
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 14 deletions.
51 changes: 39 additions & 12 deletions exporters/events/cmd/events-exporter/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -48,6 +49,7 @@ type HeadReader struct {
wsclient *client.RPCClient
decoder *decoder.Decoder
log *logrus.Logger
rw *sync.Mutex
paraSessionValidators sync.Map //map[uint32][]string
sessionValidators sync.Map //map[uint32][]string
cfg Config
Expand Down Expand Up @@ -77,6 +79,7 @@ func NewHeadReader(l *logrus.Logger, cfg Config, ctx context.Context) (*HeadRead
identities: make(map[string]string),
log: l,
cfg: cfg,
rw: &sync.Mutex{},
}
for _, path := range cfg.KnownValidatorCfg {
l.Infof("loading config from %s", path)
Expand Down Expand Up @@ -150,12 +153,13 @@ func (reader *HeadReader) ProcessBlockParaVotes(ctx context.Context, hash string
votedValidators = append(votedValidators, groupVotes.MustInt("col1"))
// can go deeper and check backable (explicit)/seconded (implicit) statements if need be
}
missingVotes := getMissingValidatorsFrom(validatorGroups, votedValidators)
for _, vv := range votedValidators {
if reader.registry == nil || reader.GetValidatorsHostname(psValidators[vv]) != "" {
reader.mon.ProcessEvent(MetricBackingVotesMissedCount, 0, reader.LabelValues(psValidators[vv])...)
reader.mon.ProcessEvent(MetricBackingVotesExpectedCount, 1, reader.LabelValues(psValidators[vv])...)
}
}
missingVotes := getMissingValidatorsFrom(validatorGroups, votedValidators)
if len(missingVotes) > 0 {
for _, mv := range missingVotes {
if reader.registry == nil || reader.GetValidatorsHostname(psValidators[mv]) != "" {
Expand Down Expand Up @@ -285,24 +289,43 @@ func (reader *HeadReader) Read(ctx context.Context) error {
// handle input hashes
g.Go(func() error {
for {
callCtx, cancel := context.WithTimeout(ctx, 120*time.Second)
defer cancel()
select {
case h := <-headHashes:
if err := reader.ProcessBlockEvents(callCtx, h); err != nil {
return err
}
blockctx, cancel := context.WithTimeout(ctx, 60*time.Second)
start := time.Now()
g, ctx := errgroup.WithContext(blockctx)
g.Go(func() error {
return retry.Do(
func() error {
return reader.ProcessBlockEvents(ctx, h)
},
retry.Context(blockctx),
retry.Delay(time.Second),
)
})
if reader.cfg.ExposeParaVotes {
if err := reader.ProcessBlockParaVotes(callCtx, h); err != nil {
return err
}
g.Go(func() error {
return retry.Do(
func() error {
return reader.ProcessBlockParaVotes(ctx, h)
},
retry.Context(blockctx),
retry.Delay(time.Second),
)
})
}
err := g.Wait()
cancel()
duration := time.Since(start).Seconds()
if duration > 5.0 {
reader.log.Infof("slow block %s processing took: %.2f sec", h, time.Since(start).Seconds())
}
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-callCtx.Done():
return callCtx.Err()
}
cancel()
}
})
if err := g.Wait(); err != nil {
Expand Down Expand Up @@ -384,6 +407,8 @@ func (reader *HeadReader) HandleAllEvents(ctx context.Context, hash string, even

// TODO: cleanup
func (reader *HeadReader) GetParaSessionValidators(ctx context.Context, hash string, session uint32) []string {
reader.rw.Lock()
defer reader.rw.Unlock()
if _, ok := reader.paraSessionValidators.Load(session); !ok {
req, _ := reader.decoder.NewStorageRequest("paraSessionInfo", "accountKeys", session)
resp, err := reader.wsclient.StateGetStorage(ctx, req, hash)
Expand Down Expand Up @@ -416,6 +441,8 @@ func (reader *HeadReader) GetParaSessionValidatorBy(ctx context.Context, hash st

// TODO: cleanup
func (reader *HeadReader) GetSessionValidators(ctx context.Context, hash string, session uint32) []string {
reader.rw.Lock()
defer reader.rw.Unlock()
if session == 0 {
reader.sessionValidators.Range(func(key, value any) bool {
i := key.(uint32)
Expand Down
1 change: 1 addition & 0 deletions exporters/events/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
require github.com/kr/text v0.2.0 // indirect

require (
github.com/avast/retry-go/v4 v4.6.0
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/decred/base58 v1.0.5 // indirect
Expand Down
6 changes: 4 additions & 2 deletions exporters/events/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/ChainSafe/go-schnorrkel v1.0.0 h1:3aDA67lAykLaG1y3AOjs88dMxC88PgUuHRrLeDnvGIM=
github.com/ChainSafe/go-schnorrkel v1.0.0/go.mod h1:dpzHYVxLZcp8pjlV+O+UR8K0Hp/z7vcchBSbMBEhCw4=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
Expand Down Expand Up @@ -60,8 +62,8 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/vedhavyas/go-subkey/v2 v2.0.0 h1:LemDIsrVtRSOkp0FA8HxP6ynfKjeOj3BY2U9UNfeDMA=
github.com/vedhavyas/go-subkey/v2 v2.0.0/go.mod h1:95aZ+XDCWAUUynjlmi7BtPExjXgXxByE0WfBwbmIRH4=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
Expand Down

0 comments on commit 6d25408

Please sign in to comment.