From b238613a3a2ed494a3133a91047f196cd099cc8c Mon Sep 17 00:00:00 2001 From: pk910 Date: Wed, 6 Sep 2023 03:38:01 +0200 Subject: [PATCH] write sync committee duties to db and made epoch duties optional for slot details page --- db/db.go | 70 ++++++++++++++++++- .../pgsql/20230906021153_sync-duties.sql | 20 ++++++ .../sqlite/20230906021153_sync-duties.sql | 20 ++++++ dbtypes/dbtypes.go | 6 ++ handlers/slot.go | 66 +++++++++++------ indexer/cache_logic.go | 6 ++ indexer/epoch_stats.go | 16 +++++ indexer/synchronizer.go | 5 ++ indexer/write_db.go | 23 ++++++ templates/slot/slot.html | 10 ++- types/models/slot.go | 5 +- 11 files changed, 223 insertions(+), 24 deletions(-) create mode 100644 db/schema/pgsql/20230906021153_sync-duties.sql create mode 100644 db/schema/sqlite/20230906021153_sync-duties.sql diff --git a/db/db.go b/db/db.go index da9d1f4f..21349306 100644 --- a/db/db.go +++ b/db/db.go @@ -304,6 +304,15 @@ func IsEpochSynchronized(epoch uint64) bool { return count > 0 } +func IsSyncCommitteeSynchronized(period uint64) bool { + var count uint64 + err := ReaderDb.Get(&count, `SELECT COUNT(*) FROM sync_assignments WHERE period = $1`, period) + if err != nil { + return false + } + return count > 0 +} + func InsertSlotAssignments(slotAssignments []*dbtypes.SlotAssignment, tx *sqlx.Tx) error { var sql strings.Builder fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ @@ -332,6 +341,35 @@ func InsertSlotAssignments(slotAssignments []*dbtypes.SlotAssignment, tx *sqlx.T return nil } +func InsertSyncAssignments(syncAssignments []*dbtypes.SyncAssignment, tx *sqlx.Tx) error { + var sql strings.Builder + fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ + dbtypes.DBEnginePgsql: `INSERT INTO sync_assignments (period, "index", validator) VALUES `, + dbtypes.DBEngineSqlite: `INSERT OR REPLACE INTO sync_assignments (period, "index", validator) VALUES `, + })) + argIdx := 0 + args := make([]any, len(syncAssignments)*3) + for i, slotAssignment := range syncAssignments { + if i > 0 { + fmt.Fprintf(&sql, ", ") + } + fmt.Fprintf(&sql, "($%v, $%v, $%v)", argIdx+1, argIdx+2, argIdx+3) + args[argIdx] = slotAssignment.Period + args[argIdx+1] = slotAssignment.Index + args[argIdx+2] = slotAssignment.Validator + argIdx += 3 + } + fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ + dbtypes.DBEnginePgsql: ` ON CONFLICT (period, "index") DO UPDATE SET validator = excluded.validator`, + dbtypes.DBEngineSqlite: "", + })) + _, err := tx.Exec(sql.String(), args...) + if err != nil { + return err + } + return nil +} + func InsertBlock(block *dbtypes.Block, tx *sqlx.Tx) error { _, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{ dbtypes.DBEnginePgsql: ` @@ -636,7 +674,37 @@ func GetSlotAssignmentsForSlots(firstSlot uint64, lastSlot uint64) []*dbtypes.Sl WHERE slot <= $1 AND slot >= $2 `, firstSlot, lastSlot) if err != nil { - logger.Errorf("Error while fetching blocks: %v", err) + logger.Errorf("Error while fetching slot assignments: %v", err) + return nil + } + return assignments +} + +func GetSlotAssignment(slot uint64) *dbtypes.SlotAssignment { + assignment := dbtypes.SlotAssignment{} + err := ReaderDb.Get(&assignment, ` + SELECT + slot, proposer + FROM slot_assignments + WHERE slot = $1 + `, slot) + if err != nil { + return nil + } + return &assignment +} + +func GetSyncAssignmentsForPeriod(period uint64) []uint64 { + assignments := []uint64{} + err := ReaderDb.Select(&assignments, ` + SELECT + validator + FROM sync_assignments + WHERE period = $1 + ORDER BY "index" ASC + `, period) + if err != nil { + logger.Errorf("Error while fetching sync assignments: %v", err) return nil } return assignments diff --git a/db/schema/pgsql/20230906021153_sync-duties.sql b/db/schema/pgsql/20230906021153_sync-duties.sql new file mode 100644 index 00000000..a11ed6d0 --- /dev/null +++ b/db/schema/pgsql/20230906021153_sync-duties.sql @@ -0,0 +1,20 @@ +-- +goose Up +-- +goose StatementBegin + +CREATE TABLE IF NOT EXISTS public."sync_assignments" +( + "period" bigint NOT NULL, + "index" int NOT NULL, + "validator" bigint NOT NULL, + CONSTRAINT "sync_assignments_pkey" PRIMARY KEY ("period", "index") +); + +CREATE INDEX IF NOT EXISTS "sync_assignments_validator_idx" + ON public."sync_assignments" + ("validator" ASC NULLS LAST); + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +SELECT 'NOT SUPPORTED'; +-- +goose StatementEnd diff --git a/db/schema/sqlite/20230906021153_sync-duties.sql b/db/schema/sqlite/20230906021153_sync-duties.sql new file mode 100644 index 00000000..7461faf0 --- /dev/null +++ b/db/schema/sqlite/20230906021153_sync-duties.sql @@ -0,0 +1,20 @@ +-- +goose Up +-- +goose StatementBegin + +CREATE TABLE IF NOT EXISTS "sync_assignments" +( + "period" bigint NOT NULL, + "index" int NOT NULL, + "validator" bigint NOT NULL, + CONSTRAINT "sync_assignments_pkey" PRIMARY KEY ("period", "index") +); + +CREATE INDEX IF NOT EXISTS "sync_assignments_validator_idx" + ON "sync_assignments" + ("validator" ASC); + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +SELECT 'NOT SUPPORTED'; +-- +goose StatementEnd diff --git a/dbtypes/dbtypes.go b/dbtypes/dbtypes.go index 1d7571c7..7f0c5678 100644 --- a/dbtypes/dbtypes.go +++ b/dbtypes/dbtypes.go @@ -71,6 +71,12 @@ type SlotAssignment struct { Proposer uint64 `db:"proposer"` } +type SyncAssignment struct { + Period uint64 `db:"period"` + Index uint32 `db:"index"` + Validator uint64 `db:"validator"` +} + type UnfinalizedBlock struct { Root []byte `db:"root"` Slot uint64 `db:"slot"` diff --git a/handlers/slot.go b/handlers/slot.go index ec4b74dd..582ea328 100644 --- a/handlers/slot.go +++ b/handlers/slot.go @@ -15,6 +15,7 @@ import ( "github.com/juliangruber/go-intersect" "github.com/sirupsen/logrus" + "github.com/pk910/light-beaconchain-explorer/db" "github.com/pk910/light-beaconchain-explorer/dbtypes" "github.com/pk910/light-beaconchain-explorer/rpctypes" "github.com/pk910/light-beaconchain-explorer/services" @@ -57,7 +58,10 @@ func Slot(w http.ResponseWriter, r *http.Request) { } } - pageData := getSlotPageData(blockSlot, blockRootHash) + urlArgs := r.URL.Query() + loadDuties := urlArgs.Has("duties") + + pageData := getSlotPageData(blockSlot, blockRootHash, loadDuties) if pageData == nil { data := InitPageData(w, r, "blockchain", "/slots", fmt.Sprintf("Slot %v", slotOrHash), notfoundTemplateFiles) data.Data = "slot" @@ -67,7 +71,6 @@ func Slot(w http.ResponseWriter, r *http.Request) { return } - urlArgs := r.URL.Query() if urlArgs.Has("blob") && pageData.Block != nil { commitment, err := hex.DecodeString(strings.Replace(urlArgs.Get("blob"), "0x", "", -1)) var blobData *dbtypes.Blob @@ -145,18 +148,18 @@ func SlotBlob(w http.ResponseWriter, r *http.Request) { } } -func getSlotPageData(blockSlot int64, blockRoot []byte) *models.SlotPageData { +func getSlotPageData(blockSlot int64, blockRoot []byte, loadDuties bool) *models.SlotPageData { pageData := &models.SlotPageData{} - pageCacheKey := fmt.Sprintf("slot:%v:%x", blockSlot, blockRoot) + pageCacheKey := fmt.Sprintf("slot:%v:%x:%v", blockSlot, blockRoot, loadDuties) pageData = services.GlobalFrontendCache.ProcessCachedPage(pageCacheKey, true, pageData, func(pageCall *services.FrontendCacheProcessingPage) interface{} { - pageData, cacheTimeout := buildSlotPageData(blockSlot, blockRoot) + pageData, cacheTimeout := buildSlotPageData(blockSlot, blockRoot, loadDuties) pageCall.CacheTimeout = cacheTimeout return pageData }).(*models.SlotPageData) return pageData } -func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData, time.Duration) { +func buildSlotPageData(blockSlot int64, blockRoot []byte, loadDuties bool) (*models.SlotPageData, time.Duration) { currentSlot := utils.TimeToSlot(uint64(time.Now().Unix())) finalizedEpoch, _ := services.GlobalBeaconService.GetFinalizedEpoch() var blockData *rpctypes.CombinedBlockResponse @@ -207,10 +210,15 @@ func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData, EpochFinalized: finalizedEpoch >= int64(utils.EpochOfSlot(slot)), } - assignments, err := services.GlobalBeaconService.GetEpochAssignments(utils.EpochOfSlot(slot)) - if err != nil { - logrus.Printf("assignments error: %v", err) - // we can safely continue here. the UI is prepared to work without epoch duties, but fields related to the duties are not shown + var assignments *rpctypes.EpochAssignments + if loadDuties { + assignmentsRsp, err := services.GlobalBeaconService.GetEpochAssignments(utils.EpochOfSlot(slot)) + if err != nil { + logrus.Printf("assignments error: %v", err) + // we can safely continue here. the UI is prepared to work without epoch duties, but fields related to the duties are not shown + } else { + assignments = assignmentsRsp + } } var cacheTimeout time.Duration @@ -231,14 +239,20 @@ func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData, if blockData == nil { pageData.Status = uint16(models.SlotStatusMissed) - + pageData.Proposer = math.MaxInt64 if assignments != nil { pageData.Proposer = assignments.ProposerAssignments[slot] - pageData.ProposerName = services.GlobalBeaconService.GetValidatorName(pageData.Proposer) - } else { - pageData.Proposer = math.MaxInt64 + } else if epochStats := services.GlobalBeaconService.GetIndexer().GetCachedEpochStats(utils.EpochOfSlot(slot)); epochStats != nil { + if proposers := epochStats.TryGetProposerAssignments(); proposers != nil { + pageData.Proposer = proposers[slot] + } } - + if pageData.Proposer == math.MaxInt64 { + if assignment := db.GetSlotAssignment(slot); assignment != nil { + pageData.Proposer = assignment.Proposer + } + } + pageData.ProposerName = services.GlobalBeaconService.GetValidatorName(pageData.Proposer) } else { if blockData.Orphaned { pageData.Status = uint16(models.SlotStatusOrphaned) @@ -247,13 +261,13 @@ func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData, } pageData.Proposer = uint64(blockData.Block.Message.ProposerIndex) pageData.ProposerName = services.GlobalBeaconService.GetValidatorName(pageData.Proposer) - pageData.Block = getSlotPageBlockData(blockData, assignments) + pageData.Block = getSlotPageBlockData(blockData, assignments, loadDuties) } return pageData, cacheTimeout } -func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments *rpctypes.EpochAssignments) *models.SlotPageBlockData { +func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments *rpctypes.EpochAssignments, loadDuties bool) *models.SlotPageBlockData { pageData := &models.SlotPageBlockData{ BlockRoot: blockData.Root, ParentRoot: blockData.Header.Message.ParentRoot, @@ -270,6 +284,7 @@ func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments DepositsCount: uint64(len(blockData.Block.Message.Body.Deposits)), VoluntaryExitsCount: uint64(len(blockData.Block.Message.Body.VoluntaryExits)), SlashingsCount: uint64(len(blockData.Block.Message.Body.ProposerSlashings)) + uint64(len(blockData.Block.Message.Body.AttesterSlashings)), + DutiesLoaded: loadDuties, } epoch := utils.EpochOfSlot(uint64(blockData.Header.Message.Slot)) @@ -283,7 +298,7 @@ func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments attestation := blockData.Block.Message.Body.Attestations[i] var attAssignments []uint64 attEpoch := utils.EpochOfSlot(uint64(attestation.Data.Slot)) - if !assignmentsLoaded[attEpoch] { // load epoch duties if needed + if !assignmentsLoaded[attEpoch] && loadDuties { // load epoch duties if needed attEpochAssignments, _ := services.GlobalBeaconService.GetEpochAssignments(attEpoch) assignmentsMap[attEpoch] = attEpochAssignments assignmentsLoaded[attEpoch] = true @@ -401,9 +416,20 @@ func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments syncAggregate := blockData.Block.Message.Body.SyncAggregate pageData.SyncAggregateBits = syncAggregate.SyncCommitteeBits pageData.SyncAggregateSignature = syncAggregate.SyncCommitteeSignature + var syncAssignments []uint64 if assignments != nil { - pageData.SyncAggCommittee = make([]types.NamedValidator, len(assignments.SyncAssignments)) - for idx, vidx := range assignments.SyncAssignments { + syncAssignments = assignments.SyncAssignments + } else if epochStats := services.GlobalBeaconService.GetIndexer().GetCachedEpochStats(epoch); epochStats != nil { + syncAssignments = epochStats.TryGetSyncAssignments() + } + if syncAssignments == nil { + syncPeriod := epoch / utils.Config.Chain.Config.EpochsPerSyncCommitteePeriod + syncAssignments = db.GetSyncAssignmentsForPeriod(syncPeriod) + } + + if len(syncAssignments) != 0 { + pageData.SyncAggCommittee = make([]types.NamedValidator, len(syncAssignments)) + for idx, vidx := range syncAssignments { pageData.SyncAggCommittee[idx] = types.NamedValidator{ Index: vidx, Name: services.GlobalBeaconService.GetValidatorName(vidx), diff --git a/indexer/cache_logic.go b/indexer/cache_logic.go index 68ba48d5..871ee595 100644 --- a/indexer/cache_logic.go +++ b/indexer/cache_logic.go @@ -189,6 +189,12 @@ func (cache *indexerCache) processFinalizedEpoch(epoch uint64) error { return err } + err = persistSyncAssignments(epoch, epochStats, tx) + if err != nil { + logger.Errorf("error persisting sync committee assignments to db: %v", err) + return err + } + if len(blobs) > 0 { for _, blob := range blobs { err := cache.indexer.BlobStore.saveBlob(blob, tx) diff --git a/indexer/epoch_stats.go b/indexer/epoch_stats.go index 3b36becd..aa35149b 100644 --- a/indexer/epoch_stats.go +++ b/indexer/epoch_stats.go @@ -124,6 +124,14 @@ func (epochStats *EpochStats) GetProposerAssignments() map[uint64]uint64 { return epochStats.proposerAssignments } +func (epochStats *EpochStats) TryGetProposerAssignments() map[uint64]uint64 { + if !epochStats.dutiesMutex.TryRLock() { + return nil + } + defer epochStats.dutiesMutex.RUnlock() + return epochStats.proposerAssignments +} + func (epochStats *EpochStats) GetAttestorAssignments() map[string][]uint64 { epochStats.dutiesMutex.RLock() defer epochStats.dutiesMutex.RUnlock() @@ -136,6 +144,14 @@ func (epochStats *EpochStats) GetSyncAssignments() []uint64 { return epochStats.syncAssignments } +func (epochStats *EpochStats) TryGetSyncAssignments() []uint64 { + if !epochStats.dutiesMutex.TryRLock() { + return nil + } + defer epochStats.dutiesMutex.RUnlock() + return epochStats.syncAssignments +} + func (client *IndexerClient) ensureEpochStats(epoch uint64, head []byte) error { var dependentRoot []byte var proposerRsp *rpctypes.StandardV1ProposerDutiesResponse diff --git a/indexer/synchronizer.go b/indexer/synchronizer.go index 37fc47c4..f679e510 100644 --- a/indexer/synchronizer.go +++ b/indexer/synchronizer.go @@ -273,6 +273,11 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool, skipCli return false, client, fmt.Errorf("error persisting epoch data to db: %v", err) } + err = persistSyncAssignments(syncEpoch, epochStats, tx) + if err != nil { + return false, client, fmt.Errorf("error persisting sync committee assignments to db: %v", err) + } + if len(blobs) > 0 { for _, blob := range blobs { err := sync.indexer.BlobStore.saveBlob(blob, tx) diff --git a/indexer/write_db.go b/indexer/write_db.go index 156e1ee5..b92ebee9 100644 --- a/indexer/write_db.go +++ b/indexer/write_db.go @@ -55,6 +55,29 @@ func persistEpochData(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats return nil } +func persistSyncAssignments(epoch uint64, epochStats *EpochStats, tx *sqlx.Tx) error { + if epoch < utils.Config.Chain.Config.AltairForkEpoch { + // no sync committees before altair + return nil + } + period := epoch / utils.Config.Chain.Config.EpochsPerSyncCommitteePeriod + isStartOfPeriod := epoch == period*utils.Config.Chain.Config.EpochsPerSyncCommitteePeriod + if !isStartOfPeriod && db.IsSyncCommitteeSynchronized(period) { + // already synchronized + return nil + } + + syncAssignments := make([]*dbtypes.SyncAssignment, 0) + for idx, val := range epochStats.syncAssignments { + syncAssignments = append(syncAssignments, &dbtypes.SyncAssignment{ + Period: period, + Index: uint32(idx), + Validator: val, + }) + } + return db.InsertSyncAssignments(syncAssignments, tx) +} + func buildDbBlock(block *CacheBlock, epochStats *EpochStats) *dbtypes.Block { blockBody := block.GetBlockBody() if blockBody == nil { diff --git a/templates/slot/slot.html b/templates/slot/slot.html index 3f2b5c7a..938b94af 100644 --- a/templates/slot/slot.html +++ b/templates/slot/slot.html @@ -80,7 +80,15 @@

-

Showing {{ .Block.AttestationsCount }} Attestations

+
+

+ Showing {{ .Block.AttestationsCount }} Attestations +

+
+ {{ if not .Block.DutiesLoaded -}} + Load Validator Indexes + {{- end }} +
diff --git a/types/models/slot.go b/types/models/slot.go index 9ddb8b29..0252fd3d 100644 --- a/types/models/slot.go +++ b/types/models/slot.go @@ -51,8 +51,9 @@ type SlotPageBlockData struct { WithdrawalsCount uint64 `json:"withdrawals_count"` BLSChangesCount uint64 `json:"bls_changes_count"` VoluntaryExitsCount uint64 `json:"voluntaryexits_count"` - SlashingsCount uint64 - BlobsCount uint64 `json:"blobs_count"` + SlashingsCount uint64 `json:"slashings_count"` + BlobsCount uint64 `json:"blobs_count"` + DutiesLoaded bool `json:"duties_loaded"` ExecutionData *SlotPageExecutionData `json:"execution_data"` Attestations []*SlotPageAttestation `json:"attestations"` // Attestations included in this block