diff --git a/indexer/beacon/blockcache.go b/indexer/beacon/blockcache.go index 0d693de..6ea52cd 100644 --- a/indexer/beacon/blockcache.go +++ b/indexer/beacon/blockcache.go @@ -10,25 +10,29 @@ import ( "github.com/ethpandaops/dora/db" ) +var zeroHash = phase0.Hash32{} + // blockCache is a cache for storing blocks. type blockCache struct { - indexer *Indexer - cacheMutex sync.RWMutex - highestSlot int64 - lowestSlot int64 - slotMap map[phase0.Slot][]*Block - rootMap map[phase0.Root]*Block - parentMap map[phase0.Root][]*Block - latestBlock *Block // latest added block (might not be the head block, just a marker for cache changes) + indexer *Indexer + cacheMutex sync.RWMutex + highestSlot int64 + lowestSlot int64 + slotMap map[phase0.Slot][]*Block + rootMap map[phase0.Root]*Block + parentMap map[phase0.Root][]*Block + execBlockMap map[phase0.Hash32][]*Block + latestBlock *Block // latest added block (might not be the head block, just a marker for cache changes) } // newBlockCache creates a new instance of blockCache. func newBlockCache(indexer *Indexer) *blockCache { return &blockCache{ - indexer: indexer, - slotMap: map[phase0.Slot][]*Block{}, - rootMap: map[phase0.Root]*Block{}, - parentMap: map[phase0.Root][]*Block{}, + indexer: indexer, + slotMap: map[phase0.Slot][]*Block{}, + rootMap: map[phase0.Root]*Block{}, + parentMap: map[phase0.Root][]*Block{}, + execBlockMap: map[phase0.Hash32][]*Block{}, } } @@ -81,6 +85,29 @@ func (cache *blockCache) addBlockToParentMap(block *Block) { cache.parentMap[*parentRoot] = append(cache.parentMap[*parentRoot], block) } +// addBlockToExecBlockMap adds the given block to the execution block map. +func (cache *blockCache) addBlockToExecBlockMap(block *Block) { + cache.cacheMutex.Lock() + defer cache.cacheMutex.Unlock() + + blockIndex := block.GetBlockIndex() + if blockIndex == nil { + return + } + + if bytes.Equal(blockIndex.ExecutionHash[:], zeroHash[:]) { + return + } + + for _, entry := range cache.execBlockMap[blockIndex.ExecutionHash] { + if entry == block { + return + } + } + + cache.execBlockMap[blockIndex.ExecutionHash] = append(cache.execBlockMap[blockIndex.ExecutionHash], block) +} + // getBlockByRoot returns the cached block with the given root. func (cache *blockCache) getBlockByRoot(root phase0.Root) *Block { cache.cacheMutex.RLock() @@ -139,27 +166,13 @@ func (cache *blockCache) getBlocksByExecutionBlockHash(blockHash phase0.Hash32) cache.cacheMutex.RLock() defer cache.cacheMutex.RUnlock() - resBlocks := []*Block{} - for _, block := range cache.rootMap { - if block.blockIndex != nil { - if bytes.Equal(block.blockIndex.ExecutionHash[:], blockHash[:]) { - resBlocks = append(resBlocks, block) - } - continue - } - - blockBody := block.GetBlock() - if blockBody == nil { - continue - } - - executionHash, _ := blockBody.ExecutionBlockHash() - if bytes.Equal(executionHash[:], blockHash[:]) { - resBlocks = append(resBlocks, block) - } + cachedBlocks := cache.execBlockMap[blockHash] + blocks := make([]*Block, len(cachedBlocks)) + if len(blocks) > 0 { + copy(blocks, cachedBlocks) } - return resBlocks + return blocks } func (cache *blockCache) getBlocksByExecutionBlockNumber(blockNumber uint64) []*Block { @@ -320,6 +333,20 @@ func (cache *blockCache) removeBlock(block *Block) { } } + // remove the block from the execution block map. + if blockIndex := block.GetBlockIndex(); blockIndex != nil && !bytes.Equal(blockIndex.ExecutionHash[:], zeroHash[:]) { + execBlocks := cache.execBlockMap[blockIndex.ExecutionHash] + if len(execBlocks) == 1 && execBlocks[0] == block { + delete(cache.execBlockMap, blockIndex.ExecutionHash) + } else if len(execBlocks) > 1 { + for i, execBlock := range execBlocks { + if execBlock == block { + cache.execBlockMap[blockIndex.ExecutionHash] = append(execBlocks[:i], execBlocks[i+1:]...) + break + } + } + } + } } // getEpochBlocks returns the blocks that belong to the specified epoch. diff --git a/indexer/beacon/client.go b/indexer/beacon/client.go index cf40241..c373305 100644 --- a/indexer/beacon/client.go +++ b/indexer/beacon/client.go @@ -393,6 +393,7 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 if slot >= finalizedSlot && isNew { c.indexer.blockCache.addBlockToParentMap(block) + c.indexer.blockCache.addBlockToExecBlockMap(block) t1 := time.Now() // fork detection diff --git a/indexer/beacon/indexer.go b/indexer/beacon/indexer.go index dcdee71..6eb6f28 100644 --- a/indexer/beacon/indexer.go +++ b/indexer/beacon/indexer.go @@ -331,6 +331,8 @@ func (indexer *Indexer) StartIndexer() { block.isInFinalizedDb = true } + indexer.blockCache.addBlockToExecBlockMap(block) + blockFork := indexer.forkCache.getForkById(block.forkId) if blockFork != nil { if blockFork.headBlock == nil || blockFork.headBlock.Slot < block.Slot {