Skip to content

Commit

Permalink
fixed indexer skipping epoch 0 & handle epoch 0 validators properly
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 15, 2023
1 parent 8b5b403 commit 6d30397
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 28 deletions.
57 changes: 30 additions & 27 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ type Indexer struct {
}

type indexerState struct {
lastHeadBlock uint64
lastHeadRoot []byte
lastFinalizedBlock uint64
cacheMutex sync.RWMutex
cachedBlocks map[uint64][]*BlockInfo
epochStats map[uint64]*EpochStats
headValidators *rpctypes.StandardV1StateValidatorsResponse
headValidatorsSlot uint64
lowestCachedSlot int64
highestCachedSlot int64
lastProcessedEpoch uint64
lastHeadBlock uint64
lastHeadRoot []byte
lastFinalizedBlock uint64
cacheMutex sync.RWMutex
cachedBlocks map[uint64][]*BlockInfo
epochStats map[uint64]*EpochStats
headValidators *rpctypes.StandardV1StateValidatorsResponse
headValidatorsEpoch int64
lowestCachedSlot int64
highestCachedSlot int64
lastProcessedEpoch int64
}

type EpochStats struct {
Expand Down Expand Up @@ -84,10 +84,12 @@ func NewIndexer(rpcClient *rpc.BeaconClient) (*Indexer, error) {
inMemoryEpochs: inMemoryEpochs,
epochProcessingDelay: epochProcessingDelay,
state: indexerState{
cachedBlocks: make(map[uint64][]*BlockInfo),
epochStats: make(map[uint64]*EpochStats),
lowestCachedSlot: -1,
highestCachedSlot: -1,
cachedBlocks: make(map[uint64][]*BlockInfo),
epochStats: make(map[uint64]*EpochStats),
headValidatorsEpoch: -1,
lowestCachedSlot: -1,
highestCachedSlot: -1,
lastProcessedEpoch: -1,
},
}, nil
}
Expand Down Expand Up @@ -289,7 +291,7 @@ func (indexer *Indexer) runIndexerLoop() error {
indexer.state.lastHeadBlock = uint64((currentEpoch-int64(indexer.prepopulateEpochs)+1)*int64(chainConfig.SlotsPerEpoch)) - 1
}
if currentEpoch > int64(indexer.epochProcessingDelay) {
indexer.state.lastProcessedEpoch = uint64(currentEpoch - int64(indexer.epochProcessingDelay))
indexer.state.lastProcessedEpoch = currentEpoch - int64(indexer.epochProcessingDelay)
}
}

Expand All @@ -308,7 +310,7 @@ func (indexer *Indexer) runIndexerLoop() error {
if indexer.writeDb {
syncState := dbtypes.IndexerSyncState{}
db.GetExplorerState("indexer.syncstate", &syncState)
if syncState.Epoch < indexer.state.lastProcessedEpoch {
if int64(syncState.Epoch) < indexer.state.lastProcessedEpoch {
indexer.startSynchronization(syncState.Epoch)
}
}
Expand Down Expand Up @@ -522,7 +524,7 @@ func (indexer *Indexer) processHeadBlock(slot uint64, header *rpctypes.StandardV
logger.Errorf("Large chain reorg detected, resync needed")
// TODO: Start synchronization
} else {
reorgMinEpoch := utils.EpochOfSlot(uint64(canonicalBlock.Header.Data.Header.Message.Slot))
reorgMinEpoch := int64(utils.EpochOfSlot(uint64(canonicalBlock.Header.Data.Header.Message.Slot)))
if reorgMinEpoch <= indexer.state.lastProcessedEpoch {
logger.Infof("Chain reorg touched processed epochs, reset epoch processing to %v", reorgMinEpoch-1)
indexer.state.lastProcessedEpoch = reorgMinEpoch - 1
Expand Down Expand Up @@ -561,7 +563,7 @@ func (indexer *Indexer) newEpochStats(epoch uint64, dependentRoot []byte) (*Epoc
indexer.state.cacheMutex.Lock()
defer indexer.state.cacheMutex.Unlock()

if epoch < indexer.state.lastProcessedEpoch {
if int64(epoch) < indexer.state.lastProcessedEpoch {
return nil, false, false
}
oldEpochStats := indexer.state.epochStats[epoch]
Expand Down Expand Up @@ -627,9 +629,12 @@ func (indexer *Indexer) loadEpochValidators(epoch uint64, epochStats *EpochStats
if err != nil {
logger.Errorf("Error fetching epoch %v validators: %v", epoch, err)
} else {
if epoch > indexer.state.headValidatorsSlot {
indexer.state.cacheMutex.Lock()
if int64(epoch) > indexer.state.headValidatorsEpoch {
indexer.state.headValidatorsEpoch = int64(epoch)
indexer.state.headValidators = epochValidators
}
indexer.state.cacheMutex.Unlock()
epochStats.Validators.ValidatorsStatsMutex.Lock()
for idx := 0; idx < len(epochValidators.Data); idx++ {
validator := epochValidators.Data[idx]
Expand All @@ -648,13 +653,11 @@ func (indexer *Indexer) loadEpochValidators(epoch uint64, epochStats *EpochStats
func (indexer *Indexer) processIndexing() {
// process old epochs
currentEpoch := utils.EpochOfSlot(indexer.state.lastHeadBlock)
if currentEpoch >= uint64(indexer.epochProcessingDelay) {
maxProcessEpoch := currentEpoch - uint64(indexer.epochProcessingDelay)
for indexer.state.lastProcessedEpoch < maxProcessEpoch {
processEpoch := indexer.state.lastProcessedEpoch + 1
indexer.processEpoch(processEpoch)
indexer.state.lastProcessedEpoch = processEpoch
}
maxProcessEpoch := int64(currentEpoch) - int64(indexer.epochProcessingDelay)
for indexer.state.lastProcessedEpoch < maxProcessEpoch {
processEpoch := indexer.state.lastProcessedEpoch + 1
indexer.processEpoch(uint64(processEpoch))
indexer.state.lastProcessedEpoch = processEpoch
}
}

Expand Down
2 changes: 1 addition & 1 deletion indexer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (sync *synchronizerState) runSync() {
syncEpoch++
sync.currentEpoch = syncEpoch
sync.stateMutex.Unlock()
if syncEpoch > indexerEpoch {
if int64(syncEpoch) > indexerEpoch {
isComplete = true
break
}
Expand Down

0 comments on commit 6d30397

Please sign in to comment.