From 9f40ad6b426c9bf61407dde95e0d2ca80074e893 Mon Sep 17 00:00:00 2001 From: pk910 Date: Fri, 25 Aug 2023 17:11:54 +0200 Subject: [PATCH] added support for whisk testnet (SSLE) #10 --- db/db.go | 19 +++++++++ indexer/cache_logic.go | 2 +- indexer/epoch_stats.go | 67 ++++++++++++++++++------------ indexer/indexer.go | 9 +++- indexer/synchronizer.go | 47 +++++++++++---------- rpc/beaconapi.go | 86 +++++++++++++++++++++++++++++++++++---- services/beaconservice.go | 9 ++-- types/config.go | 3 ++ utils/format.go | 4 +- 9 files changed, 182 insertions(+), 64 deletions(-) diff --git a/db/db.go b/db/db.go index 8d43c105..1feb47d7 100644 --- a/db/db.go +++ b/db/db.go @@ -605,6 +605,25 @@ func GetBlockOrphanedRefs(blockRoots [][]byte) []*dbtypes.BlockOrphanedRef { return orphanedRefs } +func GetHighestRootBeforeSlot(slot uint64, withOrphaned bool) []byte { + result := &struct { + Root []byte `db:"root"` + }{} + orphanedLimit := "" + if !withOrphaned { + orphanedLimit = "AND NOT orphaned" + } + + err := ReaderDb.Get(&result, ` + SELECT root FROM blocks WHERE slot < $1 `+orphanedLimit+` ORDER BY slot DESC LIMIT 1 + `, slot) + if err != nil { + logger.Errorf("Error while fetching highest root before %v: %v", slot, err) + return nil + } + return result.Root +} + func InsertUnfinalizedBlock(block *dbtypes.UnfinalizedBlock, tx *sqlx.Tx) error { _, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{ dbtypes.DBEnginePgsql: ` diff --git a/indexer/cache_logic.go b/indexer/cache_logic.go index ddf3a120..7de5e05f 100644 --- a/indexer/cache_logic.go +++ b/indexer/cache_logic.go @@ -117,7 +117,7 @@ func (cache *indexerCache) processFinalizedEpoch(epoch uint64) error { var epochTarget []byte var epochDependentRoot []byte if firstBlock == nil { - logger.Warnf("counld not find epoch %v target (no block found)", epoch) + logger.Warnf("could not find epoch %v target (no block found)", epoch) } else { if firstBlock.Slot == firstSlot { epochTarget = firstBlock.Root diff --git a/indexer/epoch_stats.go b/indexer/epoch_stats.go index 7d5b791c..a693aa86 100644 --- a/indexer/epoch_stats.go +++ b/indexer/epoch_stats.go @@ -3,10 +3,12 @@ package indexer import ( "bytes" "fmt" + "math" "strconv" "strings" "sync" + "github.com/pk910/light-beaconchain-explorer/db" "github.com/pk910/light-beaconchain-explorer/dbtypes" "github.com/pk910/light-beaconchain-explorer/rpctypes" "github.com/pk910/light-beaconchain-explorer/utils" @@ -119,7 +121,7 @@ func (epochStats *EpochStats) GetSyncAssignments() []uint64 { } func (client *IndexerClient) ensureEpochStats(epoch uint64, head []byte) error { - var dependendRoot []byte + var dependentRoot []byte var proposerRsp *rpctypes.StandardV1ProposerDutiesResponse if epoch > 0 { firstBlock := client.indexerCache.getFirstCanonicalBlock(epoch, head) @@ -127,35 +129,40 @@ func (client *IndexerClient) ensureEpochStats(epoch uint64, head []byte) error { logger.WithField("client", client.clientName).Debugf("canonical first block for epoch %v: %v/0x%x (head: 0x%x)", epoch, firstBlock.Slot, firstBlock.Root, head) firstBlock.mutex.RLock() if firstBlock.header != nil { - dependendRoot = firstBlock.header.Message.ParentRoot + dependentRoot = firstBlock.header.Message.ParentRoot } firstBlock.mutex.RUnlock() } - if dependendRoot == nil { + if dependentRoot == nil { lastBlock := client.indexerCache.getLastCanonicalBlock(epoch-1, head) if lastBlock != nil { logger.WithField("client", client.clientName).Debugf("canonical last block for epoch %v: %v/0x%x (head: 0x%x)", epoch-1, lastBlock.Slot, lastBlock.Root, head) - dependendRoot = lastBlock.Root + dependentRoot = lastBlock.Root } } } - if dependendRoot == nil { - var err error - proposerRsp, err = client.rpcClient.GetProposerDuties(epoch) - if err != nil { - logger.WithField("client", client.clientName).Warnf("could not load proposer duties for epoch %v: %v", epoch, err) - } - if proposerRsp == nil { - return fmt.Errorf("could not find proposer duties for epoch %v", epoch) + if dependentRoot == nil { + if utils.Config.Chain.WhiskForkEpoch != nil && epoch >= *utils.Config.Chain.WhiskForkEpoch { + firstSlot := epoch * utils.Config.Chain.Config.SlotsPerEpoch + dependentRoot = db.GetHighestRootBeforeSlot(firstSlot, false) + } else { + var err error + proposerRsp, err = client.rpcClient.GetProposerDuties(epoch) + if err != nil { + logger.WithField("client", client.clientName).Warnf("could not load proposer duties for epoch %v: %v", epoch, err) + } + if proposerRsp == nil { + return fmt.Errorf("could not find proposer duties for epoch %v", epoch) + } + dependentRoot = proposerRsp.DependentRoot } - dependendRoot = proposerRsp.DependentRoot } - epochStats, isNewStats := client.indexerCache.createOrGetEpochStats(epoch, dependendRoot) + epochStats, isNewStats := client.indexerCache.createOrGetEpochStats(epoch, dependentRoot) if isNewStats { - logger.WithField("client", client.clientName).Infof("load epoch stats for epoch %v (dependend: 0x%x)", epoch, dependendRoot) + logger.WithField("client", client.clientName).Infof("load epoch stats for epoch %v (dependend: 0x%x)", epoch, dependentRoot) } else { - logger.WithField("client", client.clientName).Debugf("ensure epoch stats for epoch %v (dependend: 0x%x)", epoch, dependendRoot) + logger.WithField("client", client.clientName).Debugf("ensure epoch stats for epoch %v (dependend: 0x%x)", epoch, dependentRoot) } go epochStats.ensureEpochStatsLazy(client, proposerRsp) if int64(epoch) > client.lastEpochStats { @@ -175,7 +182,8 @@ func (epochStats *EpochStats) ensureEpochStatsLazy(client *IndexerClient, propos // proposer duties if epochStats.proposerAssignments == nil { - if proposerRsp == nil { + whiskActivated := utils.Config.Chain.WhiskForkEpoch != nil && epochStats.Epoch >= *utils.Config.Chain.WhiskForkEpoch + if proposerRsp == nil && !whiskActivated { var err error proposerRsp, err = client.rpcClient.GetProposerDuties(epochStats.Epoch) if err != nil { @@ -191,8 +199,16 @@ func (epochStats *EpochStats) ensureEpochStatsLazy(client *IndexerClient, propos } } epochStats.proposerAssignments = map[uint64]uint64{} - for _, duty := range proposerRsp.Data { - epochStats.proposerAssignments[uint64(duty.Slot)] = uint64(duty.ValidatorIndex) + if whiskActivated { + firstSlot := epochStats.Epoch * utils.Config.Chain.Config.SlotsPerEpoch + lastSlot := firstSlot + utils.Config.Chain.Config.SlotsPerEpoch - 1 + for slot := firstSlot; slot <= lastSlot; slot++ { + epochStats.proposerAssignments[slot] = math.MaxInt64 + } + } else { + for _, duty := range proposerRsp.Data { + epochStats.proposerAssignments[uint64(duty.Slot)] = uint64(duty.ValidatorIndex) + } } } @@ -210,19 +226,20 @@ func (epochStats *EpochStats) ensureEpochStatsLazy(client *IndexerClient, propos } } else { parsedHeader, err := client.rpcClient.GetBlockHeaderByBlockroot(epochStats.DependentRoot) - if err != nil { + if err != nil || parsedHeader == nil { logger.WithField("client", client.clientName).Warnf("could not get dependent block header for epoch %v (0x%x)", epochStats.Epoch, epochStats.DependentRoot) - } - if parsedHeader.Data.Header.Message.Slot == 0 { - epochStats.dependentStateRef = "genesis" } else { - epochStats.dependentStateRef = parsedHeader.Data.Header.Message.StateRoot.String() + if parsedHeader.Data.Header.Message.Slot == 0 { + epochStats.dependentStateRef = "genesis" + } else { + epochStats.dependentStateRef = parsedHeader.Data.Header.Message.StateRoot.String() + } } } } // load validators - if epochStats.validatorStats == nil { + if epochStats.validatorStats == nil && epochStats.dependentStateRef != "" { go epochStats.ensureValidatorStatsLazy(client, epochStats.dependentStateRef) } diff --git a/indexer/indexer.go b/indexer/indexer.go index 091d8232..6e7940d5 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -330,6 +330,13 @@ func (indexer *Indexer) GetCachedBlocksByParentRoot(parentRoot []byte) []*CacheB return resBlocks } +func (indexer *Indexer) GetFirstCachedCanonicalBlock(epoch uint64, head []byte) *CacheBlock { + indexer.indexerCache.cacheMutex.RLock() + defer indexer.indexerCache.cacheMutex.RUnlock() + block := indexer.indexerCache.getFirstCanonicalBlock(epoch, head) + return block +} + func (indexer *Indexer) GetCachedEpochStats(epoch uint64) *EpochStats { _, headRoot := indexer.GetCanonicalHead() return indexer.getCachedEpochStats(epoch, headRoot) @@ -369,7 +376,7 @@ func (indexer *Indexer) getEpochVotes(epoch uint64, epochStats *EpochStats) *Epo firstBlock := indexer.indexerCache.getFirstCanonicalBlock(epoch, headRoot) var epochTarget []byte if firstBlock == nil { - logger.Warnf("Counld not find epoch %v target (no block found)", epoch) + logger.Warnf("could not find epoch %v target (no block found)", epoch) } else { if firstBlock.Slot == firstSlot { epochTarget = firstBlock.Root diff --git a/indexer/synchronizer.go b/indexer/synchronizer.go index da2638e1..02d25fda 100644 --- a/indexer/synchronizer.go +++ b/indexer/synchronizer.go @@ -156,22 +156,10 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool, skipCli client := sync.indexer.GetReadyClient(true, nil, skipClients) - // load epoch assignments - epochAssignments, err := client.rpcClient.GetEpochAssignments(syncEpoch) - if err != nil || epochAssignments == nil { - return false, client, fmt.Errorf("error fetching epoch %v duties: %v", syncEpoch, err) - } - if epochAssignments.AttestorAssignments == nil && !lastTry { - return false, client, fmt.Errorf("error fetching epoch %v duties: attestor assignments empty", syncEpoch) - } - - if sync.checkKillChan(0) { - return false, nil, nil - } - // load headers & blocks from this & next epoch firstSlot := syncEpoch * utils.Config.Chain.Config.SlotsPerEpoch lastSlot := firstSlot + (utils.Config.Chain.Config.SlotsPerEpoch * 2) - 1 + var firstBlock *CacheBlock for slot := firstSlot; slot <= lastSlot; slot++ { if sync.cachedSlot >= slot { continue @@ -196,6 +184,9 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool, skipCli header: &headerRsp.Data.Header, block: &blockRsp.Data, } + if firstBlock == nil { + firstBlock = sync.cachedBlocks[slot] + } } sync.cachedSlot = lastSlot @@ -203,6 +194,27 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool, skipCli return false, nil, nil } + // load epoch assignments + var dependentRoot []byte + if firstBlock != nil { + dependentRoot = firstBlock.header.Message.ParentRoot + } else { + // get from db + dependentRoot = db.GetHighestRootBeforeSlot(firstSlot, false) + } + + epochAssignments, err := client.rpcClient.GetEpochAssignments(syncEpoch, dependentRoot) + if err != nil || epochAssignments == nil { + return false, client, fmt.Errorf("error fetching epoch %v duties: %v", syncEpoch, err) + } + if epochAssignments.AttestorAssignments == nil && !lastTry { + return false, client, fmt.Errorf("error fetching epoch %v duties: attestor assignments empty", syncEpoch) + } + + if sync.checkKillChan(0) { + return false, nil, nil + } + // load epoch stats epochStats := &EpochStats{ Epoch: syncEpoch, @@ -221,15 +233,6 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool, skipCli } // process epoch vote aggregations - var firstBlock *CacheBlock - lastSlot = firstSlot + (utils.Config.Chain.Config.SlotsPerEpoch) - 1 - for slot := firstSlot; slot <= lastSlot; slot++ { - if sync.cachedBlocks[slot] != nil { - firstBlock = sync.cachedBlocks[slot] - break - } - } - var targetRoot []byte if firstBlock != nil { if uint64(firstBlock.header.Message.Slot) == firstSlot { diff --git a/rpc/beaconapi.go b/rpc/beaconapi.go index 0423f416..5dd9d2b9 100644 --- a/rpc/beaconapi.go +++ b/rpc/beaconapi.go @@ -1,10 +1,12 @@ package rpc import ( + "bytes" "encoding/json" "errors" "fmt" "io" + "math" "net/http" "strconv" "time" @@ -40,7 +42,7 @@ func (bc *BeaconClient) get(requrl string) ([]byte, error) { logurl := utils.GetRedactedUrl(requrl) t0 := time.Now() defer func() { - logger.WithField("client", bc.name).Debugf("RPC call (byte): %v [%v ms]", logurl, time.Since(t0).Milliseconds()) + logger.WithField("client", bc.name).Debugf("RPC GET call (byte): %v [%v ms]", logurl, time.Since(t0).Milliseconds()) }() req, err := http.NewRequest("GET", requrl, nil) @@ -75,7 +77,7 @@ func (bc *BeaconClient) getJson(requrl string, returnValue interface{}) error { logurl := utils.GetRedactedUrl(requrl) t0 := time.Now() defer func() { - logger.WithField("client", bc.name).Debugf("RPC call (json): %v [%v ms]", logurl, time.Since(t0).Milliseconds()) + logger.WithField("client", bc.name).Debugf("RPC GET call (json): %v [%v ms]", logurl, time.Since(t0).Milliseconds()) }() req, err := http.NewRequest("GET", requrl, nil) @@ -111,6 +113,52 @@ func (bc *BeaconClient) getJson(requrl string, returnValue interface{}) error { return nil } +func (bc *BeaconClient) postJson(requrl string, postData interface{}, returnValue interface{}) error { + logurl := utils.GetRedactedUrl(requrl) + t0 := time.Now() + defer func() { + logger.WithField("client", bc.name).Debugf("RPC POST call (json): %v [%v ms]", logurl, time.Since(t0).Milliseconds()) + }() + + postDataBytes, err := json.Marshal(postData) + if err != nil { + return fmt.Errorf("error encoding json request: %v", err) + } + reader := bytes.NewReader(postDataBytes) + req, err := http.NewRequest("POST", requrl, reader) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + for headerKey, headerVal := range bc.headers { + req.Header.Set(headerKey, headerVal) + } + + client := &http.Client{Timeout: time.Second * 120} + resp, err := client.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + return errNotFound + } + data, _ := io.ReadAll(resp.Body) + return fmt.Errorf("url: %v, error-response: %s", logurl, data) + } + + dec := json.NewDecoder(resp.Body) + err = dec.Decode(&returnValue) + if err != nil { + return fmt.Errorf("error parsing json response: %v", err) + } + + return nil +} + func (bc *BeaconClient) GetGenesis() (*rpctypes.StandardV1GenesisResponse, error) { resGenesis, err := bc.get(fmt.Sprintf("%s/eth/v1/beacon/genesis", bc.endpoint)) if err != nil { @@ -253,11 +301,15 @@ func (bc *BeaconClient) GetBlockBodyByBlockroot(blockroot []byte) (*rpctypes.Sta } func (bc *BeaconClient) GetProposerDuties(epoch uint64) (*rpctypes.StandardV1ProposerDutiesResponse, error) { + if utils.Config.Chain.WhiskForkEpoch != nil && epoch >= *utils.Config.Chain.WhiskForkEpoch { + // whisk activated - cannot fetch proposer duties + return nil, nil + } + var parsedProposerResponse rpctypes.StandardV1ProposerDutiesResponse proposerResp, err := bc.get(fmt.Sprintf("%s/eth/v1/validator/duties/proposer/%d", bc.endpoint, epoch)) if err != nil { return nil, fmt.Errorf("error retrieving proposer duties: %v", err) } - var parsedProposerResponse rpctypes.StandardV1ProposerDutiesResponse err = json.Unmarshal(proposerResp, &parsedProposerResponse) if err != nil { return nil, fmt.Errorf("error parsing proposer duties: %v", err) @@ -287,32 +339,48 @@ func (bc *BeaconClient) GetSyncCommitteeDuties(stateRef string, epoch uint64) (* } // GetEpochAssignments will get the epoch assignments from Lighthouse RPC api -func (bc *BeaconClient) GetEpochAssignments(epoch uint64) (*rpctypes.EpochAssignments, error) { +func (bc *BeaconClient) GetEpochAssignments(epoch uint64, dependendRoot []byte) (*rpctypes.EpochAssignments, error) { parsedProposerResponse, err := bc.GetProposerDuties(epoch) if err != nil { return nil, err } + if parsedProposerResponse != nil { + dependendRoot = parsedProposerResponse.DependentRoot + } + if dependendRoot == nil { + return nil, fmt.Errorf("couldn't find dependent root for epoch %v", epoch) + } + + var depStateRoot string // fetch the block root that the proposer data is dependent on - parsedHeader, err := bc.GetBlockHeaderByBlockroot(parsedProposerResponse.DependentRoot) + parsedHeader, err := bc.GetBlockHeaderByBlockroot(dependendRoot) if err != nil { return nil, err } - var depStateRoot string = parsedHeader.Data.Header.Message.StateRoot.String() + depStateRoot = parsedHeader.Data.Header.Message.StateRoot.String() if epoch == 0 { depStateRoot = "genesis" } assignments := &rpctypes.EpochAssignments{ - DependendRoot: parsedProposerResponse.DependentRoot, + DependendRoot: dependendRoot, DependendStateRef: depStateRoot, ProposerAssignments: make(map[uint64]uint64), AttestorAssignments: make(map[string][]uint64), } // proposer duties - for _, duty := range parsedProposerResponse.Data { - assignments.ProposerAssignments[uint64(duty.Slot)] = uint64(duty.ValidatorIndex) + if utils.Config.Chain.WhiskForkEpoch != nil && epoch >= *utils.Config.Chain.WhiskForkEpoch { + firstSlot := epoch * utils.Config.Chain.Config.SlotsPerEpoch + lastSlot := firstSlot + utils.Config.Chain.Config.SlotsPerEpoch - 1 + for slot := firstSlot; slot <= lastSlot; slot++ { + assignments.ProposerAssignments[slot] = math.MaxInt64 + } + } else if parsedProposerResponse != nil { + for _, duty := range parsedProposerResponse.Data { + assignments.ProposerAssignments[uint64(duty.Slot)] = uint64(duty.ValidatorIndex) + } } // Now use the state root to make a consistent committee query diff --git a/services/beaconservice.go b/services/beaconservice.go index 6ab89685..2e09e94a 100644 --- a/services/beaconservice.go +++ b/services/beaconservice.go @@ -317,8 +317,10 @@ func (bs *BeaconService) GetEpochAssignments(epoch uint64) (*rpctypes.EpochAssig return epochAssignments, nil } + firstSlot := epoch * utils.Config.Chain.Config.SlotsPerEpoch + dependentRoot := db.GetHighestRootBeforeSlot(firstSlot, false) var err error - epochAssignments, err = bs.indexer.GetRpcClient(true, nil).GetEpochAssignments(epoch) + epochAssignments, err = bs.indexer.GetRpcClient(true, nil).GetEpochAssignments(epoch, dependentRoot) if err != nil { return nil, err } @@ -719,10 +721,7 @@ func (bs *BeaconService) CheckBlockOrphanedStatus(blockRoot []byte) bool { return !cachedBlock.IsCanonical(bs.indexer, nil) } dbRefs := db.GetBlockOrphanedRefs([][]byte{blockRoot}) - if len(dbRefs) > 0 { - return true - } - return false + return len(dbRefs) > 0 } func (bs *BeaconService) GetValidatorActivity() (map[uint64]uint8, uint64) { diff --git a/types/config.go b/types/config.go index c0a62beb..2d2cb3aa 100644 --- a/types/config.go +++ b/types/config.go @@ -23,6 +23,9 @@ type Config struct { GenesisTimestamp uint64 `yaml:"genesisTimestamp" envconfig:"CHAIN_GENESIS_TIMESTAMP"` ConfigPath string `yaml:"configPath" envconfig:"CHAIN_CONFIG_PATH"` Config ChainConfig + + // optional features + WhiskForkEpoch *uint64 `yaml:"whiskForkEpoch" envconfig:"WHISK_FORK_EPOCH"` } `yaml:"chain"` Frontend struct { diff --git a/utils/format.go b/utils/format.go index 2cfdeff1..39342b8c 100644 --- a/utils/format.go +++ b/utils/format.go @@ -317,7 +317,9 @@ func FormatSlashedValidator(index uint64, name string) template.HTML { } func formatValidator(index uint64, name string, icon string) template.HTML { - if name != "" { + if index == math.MaxInt64 { + return template.HTML(fmt.Sprintf(" unknown", icon)) + } else if name != "" { return template.HTML(fmt.Sprintf(" %v", index, icon, index, html.EscapeString(name))) } return template.HTML(fmt.Sprintf(" %v", icon, index, index))