Skip to content

Commit

Permalink
Merge pull request #144 from ethpandaops/pk910/exec-hash-index
Browse files Browse the repository at this point in the history
add execution block hash index to block cache
  • Loading branch information
pk910 authored Oct 15, 2024
2 parents 0157e7a + 17b4b5d commit 8c978a0
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 31 deletions.
89 changes: 58 additions & 31 deletions indexer/beacon/blockcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,29 @@ import (
"github.com/ethpandaops/dora/db"
)

var zeroHash = phase0.Hash32{}

// blockCache is a cache for storing blocks.
type blockCache struct {
indexer *Indexer
cacheMutex sync.RWMutex
highestSlot int64
lowestSlot int64
slotMap map[phase0.Slot][]*Block
rootMap map[phase0.Root]*Block
parentMap map[phase0.Root][]*Block
latestBlock *Block // latest added block (might not be the head block, just a marker for cache changes)
indexer *Indexer
cacheMutex sync.RWMutex
highestSlot int64
lowestSlot int64
slotMap map[phase0.Slot][]*Block
rootMap map[phase0.Root]*Block
parentMap map[phase0.Root][]*Block
execBlockMap map[phase0.Hash32][]*Block
latestBlock *Block // latest added block (might not be the head block, just a marker for cache changes)
}

// newBlockCache creates a new instance of blockCache.
func newBlockCache(indexer *Indexer) *blockCache {
return &blockCache{
indexer: indexer,
slotMap: map[phase0.Slot][]*Block{},
rootMap: map[phase0.Root]*Block{},
parentMap: map[phase0.Root][]*Block{},
indexer: indexer,
slotMap: map[phase0.Slot][]*Block{},
rootMap: map[phase0.Root]*Block{},
parentMap: map[phase0.Root][]*Block{},
execBlockMap: map[phase0.Hash32][]*Block{},
}
}

Expand Down Expand Up @@ -81,6 +85,29 @@ func (cache *blockCache) addBlockToParentMap(block *Block) {
cache.parentMap[*parentRoot] = append(cache.parentMap[*parentRoot], block)
}

// addBlockToExecBlockMap adds the given block to the execution block map.
func (cache *blockCache) addBlockToExecBlockMap(block *Block) {
cache.cacheMutex.Lock()
defer cache.cacheMutex.Unlock()

blockIndex := block.GetBlockIndex()
if blockIndex == nil {
return
}

if bytes.Equal(blockIndex.ExecutionHash[:], zeroHash[:]) {
return
}

for _, entry := range cache.execBlockMap[blockIndex.ExecutionHash] {
if entry == block {
return
}
}

cache.execBlockMap[blockIndex.ExecutionHash] = append(cache.execBlockMap[blockIndex.ExecutionHash], block)
}

// getBlockByRoot returns the cached block with the given root.
func (cache *blockCache) getBlockByRoot(root phase0.Root) *Block {
cache.cacheMutex.RLock()
Expand Down Expand Up @@ -139,27 +166,13 @@ func (cache *blockCache) getBlocksByExecutionBlockHash(blockHash phase0.Hash32)
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()

resBlocks := []*Block{}
for _, block := range cache.rootMap {
if block.blockIndex != nil {
if bytes.Equal(block.blockIndex.ExecutionHash[:], blockHash[:]) {
resBlocks = append(resBlocks, block)
}
continue
}

blockBody := block.GetBlock()
if blockBody == nil {
continue
}

executionHash, _ := blockBody.ExecutionBlockHash()
if bytes.Equal(executionHash[:], blockHash[:]) {
resBlocks = append(resBlocks, block)
}
cachedBlocks := cache.execBlockMap[blockHash]
blocks := make([]*Block, len(cachedBlocks))
if len(blocks) > 0 {
copy(blocks, cachedBlocks)
}

return resBlocks
return blocks
}

func (cache *blockCache) getBlocksByExecutionBlockNumber(blockNumber uint64) []*Block {
Expand Down Expand Up @@ -320,6 +333,20 @@ func (cache *blockCache) removeBlock(block *Block) {
}
}

// remove the block from the execution block map.
if blockIndex := block.GetBlockIndex(); blockIndex != nil && !bytes.Equal(blockIndex.ExecutionHash[:], zeroHash[:]) {
execBlocks := cache.execBlockMap[blockIndex.ExecutionHash]
if len(execBlocks) == 1 && execBlocks[0] == block {
delete(cache.execBlockMap, blockIndex.ExecutionHash)
} else if len(execBlocks) > 1 {
for i, execBlock := range execBlocks {
if execBlock == block {
cache.execBlockMap[blockIndex.ExecutionHash] = append(execBlocks[:i], execBlocks[i+1:]...)
break
}
}
}
}
}

// getEpochBlocks returns the blocks that belong to the specified epoch.
Expand Down
1 change: 1 addition & 0 deletions indexer/beacon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0

if slot >= finalizedSlot && isNew {
c.indexer.blockCache.addBlockToParentMap(block)
c.indexer.blockCache.addBlockToExecBlockMap(block)
t1 := time.Now()

// fork detection
Expand Down
2 changes: 2 additions & 0 deletions indexer/beacon/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ func (indexer *Indexer) StartIndexer() {
block.isInFinalizedDb = true
}

indexer.blockCache.addBlockToExecBlockMap(block)

blockFork := indexer.forkCache.getForkById(block.forkId)
if blockFork != nil {
if blockFork.headBlock == nil || blockFork.headBlock.Slot < block.Slot {
Expand Down

0 comments on commit 8c978a0

Please sign in to comment.