Skip to content

Commit

Permalink
made indexer more resilient against RPC issues / startup problems
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 15, 2023
1 parent 68fe3fd commit cd01a8c
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 19 deletions.
6 changes: 3 additions & 3 deletions cmd/explorer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

func main() {
configPath := flag.String("config", "", "Path to the config file, if empty string defaults will be used")

flag.Parse()

cfg := &types.Config{}
Expand All @@ -30,9 +29,10 @@ func main() {
utils.Config = cfg
logWriter := utils.InitLogger()
defer logWriter.Dispose()

logger.WithFields(logger.Fields{
"config": *configPath,
//"version": version.Version,
"config": *configPath,
"version": utils.BuildVersion,
"chainName": utils.Config.Chain.Config.ConfigName}).Printf("starting")

if utils.Config.Chain.Config.SlotsPerEpoch == 0 || utils.Config.Chain.Config.SecondsPerSlot == 0 {
Expand Down
37 changes: 35 additions & 2 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,36 @@ func (indexer *Indexer) runIndexer() {
indexer.runMutex.Lock()
defer indexer.runMutex.Unlock()

for {
genesis, err := indexer.rpcClient.GetGenesis()
if err != nil {
logger.Errorf("Indexer Error while fetching genesis: %v", err)
} else if genesis != nil {
genesisTime := uint64(genesis.Data.GenesisTime)
logger.Infof("RPC Genesis: Time: %v, ForkVersion: %v, GVR: %v", genesisTime, genesis.Data.GenesisForkVersion, genesis.Data.GenesisValidatorsRoot)
if genesisTime != utils.Config.Chain.GenesisTimestamp {
logger.Warnf("Genesis time from RPC does not match the genesis time from explorer configuration.")
}
if genesis.Data.GenesisForkVersion.String() != utils.Config.Chain.Config.GenesisForkVersion {
logger.Warnf("Genesis fork version from RPC does not match the genesis fork version explorer configuration.")
}

err := indexer.runIndexerLoop()
if err == nil {
break
}
}

logger.Warnf("Indexer couldn't do stuff it is supposed to do. Retrying in 10 sec...")
select {
case <-time.After(10 * time.Second):
}
}

logger.Debugf("Indexer process shutdown")
}

func (indexer *Indexer) runIndexerLoop() error {
chainConfig := utils.Config.Chain.Config
genesisTime := time.Unix(int64(utils.Config.Chain.GenesisTimestamp), 0)

Expand All @@ -268,6 +298,7 @@ func (indexer *Indexer) runIndexer() {
err := indexer.pollHeadBlock()
if err != nil {
logger.Errorf("Indexer Error while polling latest head: %v", err)
return err
}

// start block stream
Expand Down Expand Up @@ -314,11 +345,13 @@ func (indexer *Indexer) runIndexer() {
}
}

//now := time.Now()
now := time.Now()
indexer.processIndexing()
indexer.processCacheCleanup()
//logger.Infof("indexer loop processing time: %v ms", time.Now().Sub(now).Milliseconds())
logger.Debugf("indexer loop processing time: %v ms", time.Now().Sub(now).Milliseconds())
}

return nil
}

func (indexer *Indexer) startSynchronization(startEpoch uint64) error {
Expand Down
12 changes: 7 additions & 5 deletions rpc/beaconapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"time"

"github.com/ethereum/go-ethereum/common/lru"
logger "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"

"github.com/pk910/light-beaconchain-explorer/rpctypes"
"github.com/pk910/light-beaconchain-explorer/utils"
)

var logger = logrus.StandardLogger().WithField("module", "rpc")

type BeaconClient struct {
endpoint string
assignmentsCache *lru.Cache[uint64, *rpctypes.EpochAssignments]
Expand All @@ -39,8 +41,8 @@ func NewBeaconClient(endpoint string, assignmentsCacheSize int) (*BeaconClient,
var errNotFound = errors.New("not found 404")

func (bc *BeaconClient) get(url string) ([]byte, error) {
//t0 := time.Now()
//defer func() { fmt.Println("RPC GET: ", url, time.Since(t0)) }()
t0 := time.Now()
defer func() { logger.Debugf("RPC call (byte): %v [%v ms]", url, time.Since(t0).Milliseconds()) }()
client := &http.Client{Timeout: time.Second * 120}
resp, err := client.Get(url)
if err != nil {
Expand All @@ -62,8 +64,8 @@ func (bc *BeaconClient) get(url string) ([]byte, error) {
}

func (bc *BeaconClient) getJson(url string, returnValue interface{}) error {
//t0 := time.Now()
//defer func() { fmt.Println("RPC GET (json): ", url, time.Since(t0)) }()
t0 := time.Now()
defer func() { logger.Debugf("RPC call (json): %v [%v ms]", url, time.Since(t0).Milliseconds()) }()
client := &http.Client{Timeout: time.Second * 120}
resp, err := client.Get(url)
if err != nil {
Expand Down
40 changes: 31 additions & 9 deletions rpc/beaconstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/donovanhide/eventsource"
logger "github.com/sirupsen/logrus"

"github.com/pk910/light-beaconchain-explorer/rpctypes"
)
Expand Down Expand Up @@ -58,40 +57,62 @@ func (bs *BeaconStream) startStream(endpoint string) {
bs.runMutex.Lock()
defer bs.runMutex.Unlock()

stream, err := eventsource.Subscribe(fmt.Sprintf("%s/eth/v1/events?topics=block,head", endpoint), "")
if err != nil {
logger.Errorf("Error while subscribing beacon block stream: %v", err)
} else {
defer stream.Close()

stream := bs.subscribeStream(endpoint)
if stream != nil {
running := true
for running {
select {
case evt := <-stream.Events:
logger.Debugf("Event received from rpc event stream: %v", evt.Event())
if evt.Event() == "block" {
bs.processBlockEvent(evt)
} else if evt.Event() == "head" {
bs.processHeadEvent(evt)
}
case <-bs.killChan:
running = false
case <-time.After(120 * time.Second):
// timeout - no block since 2 mins
case <-time.After(300 * time.Second):
// timeout - no block since 5 mins
logger.Errorf("beacon block stream error, no new head retrieved since %v (%v ago)", bs.lastHeadSeen, time.Since(bs.lastHeadSeen))
stream.Close()
stream = bs.subscribeStream(endpoint)
if stream == nil {
running = false
}
}
}
}
if stream != nil {
stream.Close()
}
bs.running = false
bs.CloseChan <- true
}

func (bs *BeaconStream) subscribeStream(endpoint string) *eventsource.Stream {
for {
stream, err := eventsource.Subscribe(fmt.Sprintf("%s/eth/v1/events?topics=block,head", endpoint), "")
if err != nil {
logger.Errorf("Error while subscribing beacon event stream: %v", err)
select {
case <-bs.killChan:
return nil
case <-time.After(10 * time.Second):
}
} else {
return stream
}
}
}

func (bs *BeaconStream) processBlockEvent(evt eventsource.Event) {
var parsed rpctypes.StandardV1StreamedBlockEvent
err := json.Unmarshal([]byte(evt.Data()), &parsed)
if err != nil {
logger.Warnf("beacon block stream failed to decode block event: %v", err)
return
}
logger.Debugf("RPC block event! slot: %v, block: %v", parsed.Slot, parsed.Block)
bs.BlockChan <- &parsed
}

Expand All @@ -102,6 +123,7 @@ func (bs *BeaconStream) processHeadEvent(evt eventsource.Event) {
logger.Warnf("beacon block stream failed to decode block event: %v", err)
return
}
logger.Debugf("RPC head event! slot: %v, block: %v, state: %v", parsed.Slot, parsed.Block, parsed.State)
bs.lastHeadSeen = time.Now()
bs.HeadChan <- &parsed
}
1 change: 1 addition & 0 deletions utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func ReadConfig(cfg *types.Config, path string) error {
"depositContractAddress": cfg.Chain.Config.DepositContractAddress,
}).Infof("did init config")

cfg.Logging.OutputLevel = "debug"
return nil
}

Expand Down

0 comments on commit cd01a8c

Please sign in to comment.