From 6d30397448e02b038d8d2faada2fa787c6fad814 Mon Sep 17 00:00:00 2001 From: pk910 Date: Tue, 15 Aug 2023 18:46:48 +0200 Subject: [PATCH] fixed indexer skipping epoch 0 & handle epoch 0 validators properly --- indexer/indexer.go | 57 ++++++++++++++++++++++------------------- indexer/synchronizer.go | 2 +- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index 5a944be4..2ca46998 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -32,17 +32,17 @@ type Indexer struct { } type indexerState struct { - lastHeadBlock uint64 - lastHeadRoot []byte - lastFinalizedBlock uint64 - cacheMutex sync.RWMutex - cachedBlocks map[uint64][]*BlockInfo - epochStats map[uint64]*EpochStats - headValidators *rpctypes.StandardV1StateValidatorsResponse - headValidatorsSlot uint64 - lowestCachedSlot int64 - highestCachedSlot int64 - lastProcessedEpoch uint64 + lastHeadBlock uint64 + lastHeadRoot []byte + lastFinalizedBlock uint64 + cacheMutex sync.RWMutex + cachedBlocks map[uint64][]*BlockInfo + epochStats map[uint64]*EpochStats + headValidators *rpctypes.StandardV1StateValidatorsResponse + headValidatorsEpoch int64 + lowestCachedSlot int64 + highestCachedSlot int64 + lastProcessedEpoch int64 } type EpochStats struct { @@ -84,10 +84,12 @@ func NewIndexer(rpcClient *rpc.BeaconClient) (*Indexer, error) { inMemoryEpochs: inMemoryEpochs, epochProcessingDelay: epochProcessingDelay, state: indexerState{ - cachedBlocks: make(map[uint64][]*BlockInfo), - epochStats: make(map[uint64]*EpochStats), - lowestCachedSlot: -1, - highestCachedSlot: -1, + cachedBlocks: make(map[uint64][]*BlockInfo), + epochStats: make(map[uint64]*EpochStats), + headValidatorsEpoch: -1, + lowestCachedSlot: -1, + highestCachedSlot: -1, + lastProcessedEpoch: -1, }, }, nil } @@ -289,7 +291,7 @@ func (indexer *Indexer) runIndexerLoop() error { indexer.state.lastHeadBlock = uint64((currentEpoch-int64(indexer.prepopulateEpochs)+1)*int64(chainConfig.SlotsPerEpoch)) - 1 } if currentEpoch > int64(indexer.epochProcessingDelay) { - indexer.state.lastProcessedEpoch = uint64(currentEpoch - int64(indexer.epochProcessingDelay)) + indexer.state.lastProcessedEpoch = currentEpoch - int64(indexer.epochProcessingDelay) } } @@ -308,7 +310,7 @@ func (indexer *Indexer) runIndexerLoop() error { if indexer.writeDb { syncState := dbtypes.IndexerSyncState{} db.GetExplorerState("indexer.syncstate", &syncState) - if syncState.Epoch < indexer.state.lastProcessedEpoch { + if int64(syncState.Epoch) < indexer.state.lastProcessedEpoch { indexer.startSynchronization(syncState.Epoch) } } @@ -522,7 +524,7 @@ func (indexer *Indexer) processHeadBlock(slot uint64, header *rpctypes.StandardV logger.Errorf("Large chain reorg detected, resync needed") // TODO: Start synchronization } else { - reorgMinEpoch := utils.EpochOfSlot(uint64(canonicalBlock.Header.Data.Header.Message.Slot)) + reorgMinEpoch := int64(utils.EpochOfSlot(uint64(canonicalBlock.Header.Data.Header.Message.Slot))) if reorgMinEpoch <= indexer.state.lastProcessedEpoch { logger.Infof("Chain reorg touched processed epochs, reset epoch processing to %v", reorgMinEpoch-1) indexer.state.lastProcessedEpoch = reorgMinEpoch - 1 @@ -561,7 +563,7 @@ func (indexer *Indexer) newEpochStats(epoch uint64, dependentRoot []byte) (*Epoc indexer.state.cacheMutex.Lock() defer indexer.state.cacheMutex.Unlock() - if epoch < indexer.state.lastProcessedEpoch { + if int64(epoch) < indexer.state.lastProcessedEpoch { return nil, false, false } oldEpochStats := indexer.state.epochStats[epoch] @@ -627,9 +629,12 @@ func (indexer *Indexer) loadEpochValidators(epoch uint64, epochStats *EpochStats if err != nil { logger.Errorf("Error fetching epoch %v validators: %v", epoch, err) } else { - if epoch > indexer.state.headValidatorsSlot { + indexer.state.cacheMutex.Lock() + if int64(epoch) > indexer.state.headValidatorsEpoch { + indexer.state.headValidatorsEpoch = int64(epoch) indexer.state.headValidators = epochValidators } + indexer.state.cacheMutex.Unlock() epochStats.Validators.ValidatorsStatsMutex.Lock() for idx := 0; idx < len(epochValidators.Data); idx++ { validator := epochValidators.Data[idx] @@ -648,13 +653,11 @@ func (indexer *Indexer) loadEpochValidators(epoch uint64, epochStats *EpochStats func (indexer *Indexer) processIndexing() { // process old epochs currentEpoch := utils.EpochOfSlot(indexer.state.lastHeadBlock) - if currentEpoch >= uint64(indexer.epochProcessingDelay) { - maxProcessEpoch := currentEpoch - uint64(indexer.epochProcessingDelay) - for indexer.state.lastProcessedEpoch < maxProcessEpoch { - processEpoch := indexer.state.lastProcessedEpoch + 1 - indexer.processEpoch(processEpoch) - indexer.state.lastProcessedEpoch = processEpoch - } + maxProcessEpoch := int64(currentEpoch) - int64(indexer.epochProcessingDelay) + for indexer.state.lastProcessedEpoch < maxProcessEpoch { + processEpoch := indexer.state.lastProcessedEpoch + 1 + indexer.processEpoch(uint64(processEpoch)) + indexer.state.lastProcessedEpoch = processEpoch } } diff --git a/indexer/synchronizer.go b/indexer/synchronizer.go index 522b623b..fec5c117 100644 --- a/indexer/synchronizer.go +++ b/indexer/synchronizer.go @@ -90,7 +90,7 @@ func (sync *synchronizerState) runSync() { syncEpoch++ sync.currentEpoch = syncEpoch sync.stateMutex.Unlock() - if syncEpoch > indexerEpoch { + if int64(syncEpoch) > indexerEpoch { isComplete = true break }