Skip to content

Commit

Permalink
introduce concurrent map for managing block hash to block node index (d…
Browse files Browse the repository at this point in the history
…eso-protocol#1405)

* introduce concurrent map for managing block hash to block node index

* fix txindex

* fix db utils test

* fix encoding test issue and convert block index to map before checking len
  • Loading branch information
lazynina authored Sep 17, 2024
1 parent e258767 commit 509225f
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 110 deletions.
15 changes: 15 additions & 0 deletions collections/concurrent_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ func NewConcurrentMap[Key comparable, Value any]() *ConcurrentMap[Key, Value] {
}
}

func NewConcurrentMapFromMap[Key comparable, Value any](input map[Key]Value) *ConcurrentMap[Key, Value] {
return &ConcurrentMap[Key, Value]{
m: input,
}
}

func (cm *ConcurrentMap[Key, Value]) Set(key Key, val Value) {
cm.mtx.Lock()
defer cm.mtx.Unlock()
Expand Down Expand Up @@ -78,3 +84,12 @@ func (cm *ConcurrentMap[Key, Value]) Count() int {

return len(cm.m)
}

func (cm *ConcurrentMap[Key, Value]) Iterate(fn func(Key, Value)) {
cm.mtx.RLock()
defer cm.mtx.RUnlock()

for key, val := range cm.m {
fn(key, val)
}
}
5 changes: 5 additions & 0 deletions lib/block_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ func setupTestDeSoEncoder(t *testing.T) {
{
for ii := 0; ii < testDeSoEncoderRetries; ii++ {
newVersionByte := encoder.GetVersionByte(blockHeight)
// If the version byte changes, we can't compare the encoding as we know that a
// fork height was changed underneath us.
if newVersionByte != versionByte {
continue
}
reEncodingBytes := encodeToBytes(blockHeight, encoder, skipMetadata...)
if !bytes.Equal(encodingBytes, reEncodingBytes) {
t.Fatalf("EncodeToBytes: Found non-deterministic encoding for a DeSoEncoder. Attempted "+
Expand Down
68 changes: 38 additions & 30 deletions lib/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ type Blockchain struct {
//
// An in-memory index of the "tree" of blocks we are currently aware of.
// This index includes forks and side-chains.
blockIndexByHash map[BlockHash]*BlockNode
blockIndexByHash *collections.ConcurrentMap[BlockHash, *BlockNode]
// blockIndexByHeight is an in-memory map of block height to block nodes. This is
// used to quickly find the safe blocks from which the chain can be extended for PoS
blockIndexByHeight map[uint64]map[BlockHash]*BlockNode
Expand Down Expand Up @@ -705,36 +705,39 @@ func getCheckpointBlockInfoFromProviderHelper(provider string) *CheckpointBlockI
}

func (bc *Blockchain) addNewBlockNodeToBlockIndex(blockNode *BlockNode) {
bc.blockIndexByHash[*blockNode.Hash] = blockNode
bc.blockIndexByHash.Set(*blockNode.Hash, blockNode)
if _, exists := bc.blockIndexByHeight[uint64(blockNode.Height)]; !exists {
bc.blockIndexByHeight[uint64(blockNode.Height)] = make(map[BlockHash]*BlockNode)
}
bc.blockIndexByHeight[uint64(blockNode.Height)][*blockNode.Hash] = blockNode
}

func (bc *Blockchain) CopyBlockIndexes() (_blockIndexByHash map[BlockHash]*BlockNode, _blockIndexByHeight map[uint64]map[BlockHash]*BlockNode) {
newBlockIndexByHash := make(map[BlockHash]*BlockNode)
func (bc *Blockchain) CopyBlockIndexes() (
_blockIndexByHash *collections.ConcurrentMap[BlockHash, *BlockNode],
_blockIndexByHeight map[uint64]map[BlockHash]*BlockNode,
) {
newBlockIndexByHash := collections.NewConcurrentMap[BlockHash, *BlockNode]()
newBlockIndexByHeight := make(map[uint64]map[BlockHash]*BlockNode)
for kk, vv := range bc.blockIndexByHash {
newBlockIndexByHash[kk] = vv
bc.blockIndexByHash.Iterate(func(kk BlockHash, vv *BlockNode) {
newBlockIndexByHash.Set(kk, vv)
blockHeight := uint64(vv.Height)
if _, exists := newBlockIndexByHeight[blockHeight]; !exists {
newBlockIndexByHeight[blockHeight] = make(map[BlockHash]*BlockNode)
}
newBlockIndexByHeight[blockHeight][kk] = vv
}
})
return newBlockIndexByHash, newBlockIndexByHeight
}

func (bc *Blockchain) constructBlockIndexByHeight() map[uint64]map[BlockHash]*BlockNode {
newBlockIndex := make(map[uint64]map[BlockHash]*BlockNode)
for _, blockNode := range bc.blockIndexByHash {
bc.blockIndexByHash.Iterate(func(_ BlockHash, blockNode *BlockNode) {
blockHeight := uint64(blockNode.Height)
if _, exists := newBlockIndex[blockHeight]; !exists {
newBlockIndex[blockHeight] = make(map[BlockHash]*BlockNode)
}
newBlockIndex[blockHeight][*blockNode.Hash] = blockNode
}
})
return newBlockIndex
}

Expand Down Expand Up @@ -852,14 +855,14 @@ func (bc *Blockchain) _initChain() error {
// nodes pointing to valid parent nodes.
{
// Find the tip node with the best node hash.
tipNode := bc.blockIndexByHash[*bestBlockHash]
if tipNode == nil {
tipNode, exists := bc.blockIndexByHash.Get(*bestBlockHash)
if !exists {
return fmt.Errorf("_initChain(block): Best hash (%#v) not found in block index", bestBlockHash)
}

// Walk back from the best node to the genesis block and store them all
// in bestChain.
bc.bestChain, err = GetBestChain(tipNode, bc.blockIndexByHash)
bc.bestChain, err = GetBestChain(tipNode)
if err != nil {
return errors.Wrapf(err, "_initChain(block): Problem reading best chain from db")
}
Expand All @@ -871,14 +874,14 @@ func (bc *Blockchain) _initChain() error {
// TODO: This code is a bit repetitive but this seemed clearer than factoring it out.
{
// Find the tip node with the best node hash.
tipNode := bc.blockIndexByHash[*bestHeaderHash]
if tipNode == nil {
tipNode, exists := bc.blockIndexByHash.Get(*bestHeaderHash)
if !exists {
return fmt.Errorf("_initChain(header): Best hash (%#v) not found in block index", bestHeaderHash)
}

// Walk back from the best node to the genesis block and store them all
// in bestChain.
bc.bestHeaderChain, err = GetBestChain(tipNode, bc.blockIndexByHash)
bc.bestHeaderChain, err = GetBestChain(tipNode)
if err != nil {
return errors.Wrapf(err, "_initChain(header): Problem reading best chain from db")
}
Expand Down Expand Up @@ -989,7 +992,7 @@ func NewBlockchain(
eventManager: eventManager,
archivalMode: archivalMode,

blockIndexByHash: make(map[BlockHash]*BlockNode),
blockIndexByHash: collections.NewConcurrentMap[BlockHash, *BlockNode](),
blockIndexByHeight: make(map[uint64]map[BlockHash]*BlockNode),
bestChainMap: make(map[BlockHash]*BlockNode),

Expand Down Expand Up @@ -1062,12 +1065,12 @@ func fastLog2Floor(n uint32) uint8 {
//
// This function MUST be called with the chain state lock held (for reads).
func locateInventory(locator []*BlockHash, stopHash *BlockHash, maxEntries uint32,
blockIndex map[BlockHash]*BlockNode, bestChainList []*BlockNode,
blockIndex *collections.ConcurrentMap[BlockHash, *BlockNode], bestChainList []*BlockNode,
bestChainMap map[BlockHash]*BlockNode) (*BlockNode, uint32) {

// There are no block locators so a specific block is being requested
// as identified by the stop hash.
stopNode, stopNodeExists := blockIndex[*stopHash]
stopNode, stopNodeExists := blockIndex.Get(*stopHash)
if len(locator) == 0 {
if !stopNodeExists {
// No blocks with the stop hash were found so there is
Expand Down Expand Up @@ -1123,7 +1126,7 @@ func locateInventory(locator []*BlockHash, stopHash *BlockHash, maxEntries uint3
//
// This function MUST be called with the ChainLock held (for reads).
func locateHeaders(locator []*BlockHash, stopHash *BlockHash, maxHeaders uint32,
blockIndex map[BlockHash]*BlockNode, bestChainList []*BlockNode,
blockIndex *collections.ConcurrentMap[BlockHash, *BlockNode], bestChainList []*BlockNode,
bestChainMap map[BlockHash]*BlockNode) []*MsgDeSoHeader {

// Find the node after the first known block in the locator and the
Expand Down Expand Up @@ -1253,7 +1256,7 @@ func (bc *Blockchain) LatestLocator(tip *BlockNode) []*BlockHash {
}

func (bc *Blockchain) HeaderLocatorWithNodeHash(blockHash *BlockHash) ([]*BlockHash, error) {
node, exists := bc.blockIndexByHash[*blockHash]
node, exists := bc.blockIndexByHash.Get(*blockHash)
if !exists {
return nil, fmt.Errorf("Blockchain.HeaderLocatorWithNodeHash: Node for hash %v is not in our blockIndexByHash", blockHash)
}
Expand Down Expand Up @@ -1334,7 +1337,7 @@ func (bc *Blockchain) GetBlockNodesToFetch(
}

func (bc *Blockchain) HasHeader(headerHash *BlockHash) bool {
_, exists := bc.blockIndexByHash[*headerHash]
_, exists := bc.blockIndexByHash.Get(*headerHash)
return exists
}

Expand All @@ -1347,7 +1350,7 @@ func (bc *Blockchain) HeaderAtHeight(blockHeight uint32) *BlockNode {
}

func (bc *Blockchain) HasBlock(blockHash *BlockHash) bool {
node, nodeExists := bc.blockIndexByHash[*blockHash]
node, nodeExists := bc.blockIndexByHash.Get(*blockHash)
if !nodeExists {
glog.V(2).Infof("Blockchain.HasBlock: Node with hash %v does not exist in node index", blockHash)
return false
Expand All @@ -1366,7 +1369,7 @@ func (bc *Blockchain) HasBlockInBlockIndex(blockHash *BlockHash) bool {
bc.ChainLock.RLock()
defer bc.ChainLock.RUnlock()

_, exists := bc.blockIndexByHash[*blockHash]
_, exists := bc.blockIndexByHash.Get(*blockHash)
return exists
}

Expand All @@ -1376,7 +1379,7 @@ func (bc *Blockchain) GetBlockHeaderFromIndex(blockHash *BlockHash) *MsgDeSoHead
bc.ChainLock.RLock()
defer bc.ChainLock.RUnlock()

block, blockExists := bc.blockIndexByHash[*blockHash]
block, blockExists := bc.blockIndexByHash.Get(*blockHash)
if !blockExists {
return nil
}
Expand Down Expand Up @@ -1684,7 +1687,12 @@ func (bc *Blockchain) SetBestChain(bestChain []*BlockNode) {
bc.bestChain = bestChain
}

func (bc *Blockchain) SetBestChainMap(bestChain []*BlockNode, bestChainMap map[BlockHash]*BlockNode, blockIndexByHash map[BlockHash]*BlockNode, blockIndexByHeight map[uint64]map[BlockHash]*BlockNode) {
func (bc *Blockchain) SetBestChainMap(
bestChain []*BlockNode,
bestChainMap map[BlockHash]*BlockNode,
blockIndexByHash *collections.ConcurrentMap[BlockHash, *BlockNode],
blockIndexByHeight map[uint64]map[BlockHash]*BlockNode,
) {
bc.bestChain = bestChain
bc.bestChainMap = bestChainMap
bc.blockIndexByHash = blockIndexByHash
Expand Down Expand Up @@ -2017,7 +2025,7 @@ func (bc *Blockchain) processHeaderPoW(blockHeader *MsgDeSoHeader, headerHash *B
// index. If it does, then return an error. We should generally
// expect that processHeaderPoW will only be called on headers we
// haven't seen before.
_, nodeExists := bc.blockIndexByHash[*headerHash]
_, nodeExists := bc.blockIndexByHash.Get(*headerHash)
if nodeExists {
return false, false, HeaderErrorDuplicateHeader
}
Expand All @@ -2042,7 +2050,7 @@ func (bc *Blockchain) processHeaderPoW(blockHeader *MsgDeSoHeader, headerHash *B
if blockHeader.PrevBlockHash == nil {
return false, false, HeaderErrorNilPrevHash
}
parentNode, parentNodeExists := bc.blockIndexByHash[*blockHeader.PrevBlockHash]
parentNode, parentNodeExists := bc.blockIndexByHash.Get(*blockHeader.PrevBlockHash)
if !parentNodeExists {
// This block is an orphan if its parent doesn't exist and we don't
// process unconnectedTxns.
Expand Down Expand Up @@ -2304,7 +2312,7 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures
bc.timer.Start("Blockchain.ProcessBlock: BlockNode")

// See if a node for the block exists in our node index.
nodeToValidate, nodeExists := bc.blockIndexByHash[*blockHash]
nodeToValidate, nodeExists := bc.blockIndexByHash.Get(*blockHash)
// If no node exists for this block at all, then process the header
// first before we do anything. This should create a node and set
// the header validation status for it.
Expand All @@ -2325,7 +2333,7 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures

// Reset the pointers after having presumably added the header to the
// block index.
nodeToValidate, nodeExists = bc.blockIndexByHash[*blockHash]
nodeToValidate, nodeExists = bc.blockIndexByHash.Get(*blockHash)
}
// At this point if the node still doesn't exist or if the header's validation
// failed then we should return an error for the block. Note that at this point
Expand All @@ -2344,7 +2352,7 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures
// In this case go ahead and return early. If its parents are truly legitimate then we
// should re-request it and its parents from a node and reprocess it
// once it is no longer an orphan.
parentNode, parentNodeExists := bc.blockIndexByHash[*blockHeader.PrevBlockHash]
parentNode, parentNodeExists := bc.blockIndexByHash.Get(*blockHeader.PrevBlockHash)
if !parentNodeExists || (parentNode.Status&StatusBlockProcessed) == 0 {
return false, true, nil
}
Expand Down
14 changes: 9 additions & 5 deletions lib/db_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/deso-protocol/core/collections"
"io"
"log"
"math"
Expand Down Expand Up @@ -5470,8 +5471,11 @@ func GetBlockTipHeight(handle *badger.DB, bitcoinNodes bool) (uint64, error) {
return blockHeight, err
}

func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (map[BlockHash]*BlockNode, error) {
blockIndex := make(map[BlockHash]*BlockNode)
func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (
*collections.ConcurrentMap[BlockHash, *BlockNode],
error,
) {
blockIndex := collections.NewConcurrentMap[BlockHash, *BlockNode]()

prefix := _heightHashToNodeIndexPrefix(bitcoinNodes)

Expand Down Expand Up @@ -5503,7 +5507,7 @@ func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (ma

// If we got here it means we read a blockNode successfully. Store it
// into our node index.
blockIndex[*blockNode.Hash] = blockNode
blockIndex.Set(*blockNode.Hash, blockNode)

// Find the parent of this block, which should already have been read
// in and connect it. Skip the genesis block, which has height 0. Also
Expand All @@ -5517,7 +5521,7 @@ func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (ma
if blockNode.Height == 0 || (*blockNode.Header.PrevBlockHash == BlockHash{}) {
continue
}
if parent, ok := blockIndex[*blockNode.Header.PrevBlockHash]; ok {
if parent, ok := blockIndex.Get(*blockNode.Header.PrevBlockHash); ok {
// We found the parent node so connect it.
blockNode.Parent = parent
} else {
Expand All @@ -5540,7 +5544,7 @@ func GetBlockIndex(handle *badger.DB, bitcoinNodes bool, params *DeSoParams) (ma
return blockIndex, nil
}

func GetBestChain(tipNode *BlockNode, blockIndex map[BlockHash]*BlockNode) ([]*BlockNode, error) {
func GetBestChain(tipNode *BlockNode) ([]*BlockNode, error) {
reversedBestChain := []*BlockNode{}
for tipNode != nil {
if (tipNode.Status&StatusBlockValidated) == 0 &&
Expand Down
18 changes: 9 additions & 9 deletions lib/db_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,17 @@ func TestBlockNodePutGet(t *testing.T) {
blockIndex, err := GetBlockIndex(db, false /*bitcoinNodes*/, &DeSoTestnetParams)
require.NoError(err)

require.Len(blockIndex, 4)
b1Ret, exists := blockIndex[*b1.Hash]
require.Equal(blockIndex.Count(), 4)
b1Ret, exists := blockIndex.Get(*b1.Hash)
require.True(exists, "b1 not found")

b2Ret, exists := blockIndex[*b2.Hash]
b2Ret, exists := blockIndex.Get(*b2.Hash)
require.True(exists, "b2 not found")

b3Ret, exists := blockIndex[*b3.Hash]
b3Ret, exists := blockIndex.Get(*b3.Hash)
require.True(exists, "b3 not found")

b4Ret, exists := blockIndex[*b4.Hash]
b4Ret, exists := blockIndex.Get(*b4.Hash)
require.True(exists, "b4 not found")

// Make sure the hashes all line up.
Expand All @@ -201,7 +201,7 @@ func TestBlockNodePutGet(t *testing.T) {

// Check that getting the best chain works.
{
bestChain, err := GetBestChain(b3Ret, blockIndex)
bestChain, err := GetBestChain(b3Ret)
require.NoError(err)
require.Len(bestChain, 3)
require.Equal(b1Ret, bestChain[0])
Expand All @@ -226,15 +226,15 @@ func TestInitDbWithGenesisBlock(t *testing.T) {
// Check the block index.
blockIndex, err := GetBlockIndex(db, false /*bitcoinNodes*/, &DeSoTestnetParams)
require.NoError(err)
require.Len(blockIndex, 1)
require.Equal(blockIndex.Count(), 1)
genesisHash := *MustDecodeHexBlockHash(DeSoTestnetParams.GenesisBlockHashHex)
genesis, exists := blockIndex[genesisHash]
genesis, exists := blockIndex.Get(genesisHash)
require.True(exists, "genesis block not found in index")
require.NotNil(genesis)
require.Equal(&genesisHash, genesis.Hash)

// Check the bestChain.
bestChain, err := GetBestChain(genesis, blockIndex)
bestChain, err := GetBestChain(genesis)
require.NoError(err)
require.Len(bestChain, 1)
require.Equal(genesis, bestChain[0])
Expand Down
Loading

0 comments on commit 509225f

Please sign in to comment.