Skip to content

Commit

Permalink
Merge pull request #17 from pk910/sync-committee-indexing
Browse files Browse the repository at this point in the history
Sync committee assignment indexing & slot page speedup
  • Loading branch information
pk910 authored Sep 6, 2023
2 parents 0f97e9a + b238613 commit 030495d
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 24 deletions.
70 changes: 69 additions & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,15 @@ func IsEpochSynchronized(epoch uint64) bool {
return count > 0
}

func IsSyncCommitteeSynchronized(period uint64) bool {
var count uint64
err := ReaderDb.Get(&count, `SELECT COUNT(*) FROM sync_assignments WHERE period = $1`, period)
if err != nil {
return false
}
return count > 0
}

func InsertSlotAssignments(slotAssignments []*dbtypes.SlotAssignment, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
Expand Down Expand Up @@ -332,6 +341,35 @@ func InsertSlotAssignments(slotAssignments []*dbtypes.SlotAssignment, tx *sqlx.T
return nil
}

func InsertSyncAssignments(syncAssignments []*dbtypes.SyncAssignment, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `INSERT INTO sync_assignments (period, "index", validator) VALUES `,
dbtypes.DBEngineSqlite: `INSERT OR REPLACE INTO sync_assignments (period, "index", validator) VALUES `,
}))
argIdx := 0
args := make([]any, len(syncAssignments)*3)
for i, slotAssignment := range syncAssignments {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "($%v, $%v, $%v)", argIdx+1, argIdx+2, argIdx+3)
args[argIdx] = slotAssignment.Period
args[argIdx+1] = slotAssignment.Index
args[argIdx+2] = slotAssignment.Validator
argIdx += 3
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: ` ON CONFLICT (period, "index") DO UPDATE SET validator = excluded.validator`,
dbtypes.DBEngineSqlite: "",
}))
_, err := tx.Exec(sql.String(), args...)
if err != nil {
return err
}
return nil
}

func InsertBlock(block *dbtypes.Block, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
Expand Down Expand Up @@ -636,7 +674,37 @@ func GetSlotAssignmentsForSlots(firstSlot uint64, lastSlot uint64) []*dbtypes.Sl
WHERE slot <= $1 AND slot >= $2
`, firstSlot, lastSlot)
if err != nil {
logger.Errorf("Error while fetching blocks: %v", err)
logger.Errorf("Error while fetching slot assignments: %v", err)
return nil
}
return assignments
}

func GetSlotAssignment(slot uint64) *dbtypes.SlotAssignment {
assignment := dbtypes.SlotAssignment{}
err := ReaderDb.Get(&assignment, `
SELECT
slot, proposer
FROM slot_assignments
WHERE slot = $1
`, slot)
if err != nil {
return nil
}
return &assignment
}

func GetSyncAssignmentsForPeriod(period uint64) []uint64 {
assignments := []uint64{}
err := ReaderDb.Select(&assignments, `
SELECT
validator
FROM sync_assignments
WHERE period = $1
ORDER BY "index" ASC
`, period)
if err != nil {
logger.Errorf("Error while fetching sync assignments: %v", err)
return nil
}
return assignments
Expand Down
20 changes: 20 additions & 0 deletions db/schema/pgsql/20230906021153_sync-duties.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS public."sync_assignments"
(
"period" bigint NOT NULL,
"index" int NOT NULL,
"validator" bigint NOT NULL,
CONSTRAINT "sync_assignments_pkey" PRIMARY KEY ("period", "index")
);

CREATE INDEX IF NOT EXISTS "sync_assignments_validator_idx"
ON public."sync_assignments"
("validator" ASC NULLS LAST);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
20 changes: 20 additions & 0 deletions db/schema/sqlite/20230906021153_sync-duties.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS "sync_assignments"
(
"period" bigint NOT NULL,
"index" int NOT NULL,
"validator" bigint NOT NULL,
CONSTRAINT "sync_assignments_pkey" PRIMARY KEY ("period", "index")
);

CREATE INDEX IF NOT EXISTS "sync_assignments_validator_idx"
ON "sync_assignments"
("validator" ASC);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
6 changes: 6 additions & 0 deletions dbtypes/dbtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ type SlotAssignment struct {
Proposer uint64 `db:"proposer"`
}

type SyncAssignment struct {
Period uint64 `db:"period"`
Index uint32 `db:"index"`
Validator uint64 `db:"validator"`
}

type UnfinalizedBlock struct {
Root []byte `db:"root"`
Slot uint64 `db:"slot"`
Expand Down
66 changes: 46 additions & 20 deletions handlers/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/juliangruber/go-intersect"
"github.com/sirupsen/logrus"

"github.com/pk910/light-beaconchain-explorer/db"
"github.com/pk910/light-beaconchain-explorer/dbtypes"
"github.com/pk910/light-beaconchain-explorer/rpctypes"
"github.com/pk910/light-beaconchain-explorer/services"
Expand Down Expand Up @@ -57,7 +58,10 @@ func Slot(w http.ResponseWriter, r *http.Request) {
}
}

pageData := getSlotPageData(blockSlot, blockRootHash)
urlArgs := r.URL.Query()
loadDuties := urlArgs.Has("duties")

pageData := getSlotPageData(blockSlot, blockRootHash, loadDuties)
if pageData == nil {
data := InitPageData(w, r, "blockchain", "/slots", fmt.Sprintf("Slot %v", slotOrHash), notfoundTemplateFiles)
data.Data = "slot"
Expand All @@ -67,7 +71,6 @@ func Slot(w http.ResponseWriter, r *http.Request) {
return
}

urlArgs := r.URL.Query()
if urlArgs.Has("blob") && pageData.Block != nil {
commitment, err := hex.DecodeString(strings.Replace(urlArgs.Get("blob"), "0x", "", -1))
var blobData *dbtypes.Blob
Expand Down Expand Up @@ -145,18 +148,18 @@ func SlotBlob(w http.ResponseWriter, r *http.Request) {
}
}

func getSlotPageData(blockSlot int64, blockRoot []byte) *models.SlotPageData {
func getSlotPageData(blockSlot int64, blockRoot []byte, loadDuties bool) *models.SlotPageData {
pageData := &models.SlotPageData{}
pageCacheKey := fmt.Sprintf("slot:%v:%x", blockSlot, blockRoot)
pageCacheKey := fmt.Sprintf("slot:%v:%x:%v", blockSlot, blockRoot, loadDuties)
pageData = services.GlobalFrontendCache.ProcessCachedPage(pageCacheKey, true, pageData, func(pageCall *services.FrontendCacheProcessingPage) interface{} {
pageData, cacheTimeout := buildSlotPageData(blockSlot, blockRoot)
pageData, cacheTimeout := buildSlotPageData(blockSlot, blockRoot, loadDuties)
pageCall.CacheTimeout = cacheTimeout
return pageData
}).(*models.SlotPageData)
return pageData
}

func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData, time.Duration) {
func buildSlotPageData(blockSlot int64, blockRoot []byte, loadDuties bool) (*models.SlotPageData, time.Duration) {
currentSlot := utils.TimeToSlot(uint64(time.Now().Unix()))
finalizedEpoch, _ := services.GlobalBeaconService.GetFinalizedEpoch()
var blockData *rpctypes.CombinedBlockResponse
Expand Down Expand Up @@ -207,10 +210,15 @@ func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData,
EpochFinalized: finalizedEpoch >= int64(utils.EpochOfSlot(slot)),
}

assignments, err := services.GlobalBeaconService.GetEpochAssignments(utils.EpochOfSlot(slot))
if err != nil {
logrus.Printf("assignments error: %v", err)
// we can safely continue here. the UI is prepared to work without epoch duties, but fields related to the duties are not shown
var assignments *rpctypes.EpochAssignments
if loadDuties {
assignmentsRsp, err := services.GlobalBeaconService.GetEpochAssignments(utils.EpochOfSlot(slot))
if err != nil {
logrus.Printf("assignments error: %v", err)
// we can safely continue here. the UI is prepared to work without epoch duties, but fields related to the duties are not shown
} else {
assignments = assignmentsRsp
}
}

var cacheTimeout time.Duration
Expand All @@ -231,14 +239,20 @@ func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData,

if blockData == nil {
pageData.Status = uint16(models.SlotStatusMissed)

pageData.Proposer = math.MaxInt64
if assignments != nil {
pageData.Proposer = assignments.ProposerAssignments[slot]
pageData.ProposerName = services.GlobalBeaconService.GetValidatorName(pageData.Proposer)
} else {
pageData.Proposer = math.MaxInt64
} else if epochStats := services.GlobalBeaconService.GetIndexer().GetCachedEpochStats(utils.EpochOfSlot(slot)); epochStats != nil {
if proposers := epochStats.TryGetProposerAssignments(); proposers != nil {
pageData.Proposer = proposers[slot]
}
}

if pageData.Proposer == math.MaxInt64 {
if assignment := db.GetSlotAssignment(slot); assignment != nil {
pageData.Proposer = assignment.Proposer
}
}
pageData.ProposerName = services.GlobalBeaconService.GetValidatorName(pageData.Proposer)
} else {
if blockData.Orphaned {
pageData.Status = uint16(models.SlotStatusOrphaned)
Expand All @@ -247,13 +261,13 @@ func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData,
}
pageData.Proposer = uint64(blockData.Block.Message.ProposerIndex)
pageData.ProposerName = services.GlobalBeaconService.GetValidatorName(pageData.Proposer)
pageData.Block = getSlotPageBlockData(blockData, assignments)
pageData.Block = getSlotPageBlockData(blockData, assignments, loadDuties)
}

return pageData, cacheTimeout
}

func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments *rpctypes.EpochAssignments) *models.SlotPageBlockData {
func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments *rpctypes.EpochAssignments, loadDuties bool) *models.SlotPageBlockData {
pageData := &models.SlotPageBlockData{
BlockRoot: blockData.Root,
ParentRoot: blockData.Header.Message.ParentRoot,
Expand All @@ -270,6 +284,7 @@ func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments
DepositsCount: uint64(len(blockData.Block.Message.Body.Deposits)),
VoluntaryExitsCount: uint64(len(blockData.Block.Message.Body.VoluntaryExits)),
SlashingsCount: uint64(len(blockData.Block.Message.Body.ProposerSlashings)) + uint64(len(blockData.Block.Message.Body.AttesterSlashings)),
DutiesLoaded: loadDuties,
}

epoch := utils.EpochOfSlot(uint64(blockData.Header.Message.Slot))
Expand All @@ -283,7 +298,7 @@ func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments
attestation := blockData.Block.Message.Body.Attestations[i]
var attAssignments []uint64
attEpoch := utils.EpochOfSlot(uint64(attestation.Data.Slot))
if !assignmentsLoaded[attEpoch] { // load epoch duties if needed
if !assignmentsLoaded[attEpoch] && loadDuties { // load epoch duties if needed
attEpochAssignments, _ := services.GlobalBeaconService.GetEpochAssignments(attEpoch)
assignmentsMap[attEpoch] = attEpochAssignments
assignmentsLoaded[attEpoch] = true
Expand Down Expand Up @@ -401,9 +416,20 @@ func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments
syncAggregate := blockData.Block.Message.Body.SyncAggregate
pageData.SyncAggregateBits = syncAggregate.SyncCommitteeBits
pageData.SyncAggregateSignature = syncAggregate.SyncCommitteeSignature
var syncAssignments []uint64
if assignments != nil {
pageData.SyncAggCommittee = make([]types.NamedValidator, len(assignments.SyncAssignments))
for idx, vidx := range assignments.SyncAssignments {
syncAssignments = assignments.SyncAssignments
} else if epochStats := services.GlobalBeaconService.GetIndexer().GetCachedEpochStats(epoch); epochStats != nil {
syncAssignments = epochStats.TryGetSyncAssignments()
}
if syncAssignments == nil {
syncPeriod := epoch / utils.Config.Chain.Config.EpochsPerSyncCommitteePeriod
syncAssignments = db.GetSyncAssignmentsForPeriod(syncPeriod)
}

if len(syncAssignments) != 0 {
pageData.SyncAggCommittee = make([]types.NamedValidator, len(syncAssignments))
for idx, vidx := range syncAssignments {
pageData.SyncAggCommittee[idx] = types.NamedValidator{
Index: vidx,
Name: services.GlobalBeaconService.GetValidatorName(vidx),
Expand Down
6 changes: 6 additions & 0 deletions indexer/cache_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ func (cache *indexerCache) processFinalizedEpoch(epoch uint64) error {
return err
}

err = persistSyncAssignments(epoch, epochStats, tx)
if err != nil {
logger.Errorf("error persisting sync committee assignments to db: %v", err)
return err
}

if len(blobs) > 0 {
for _, blob := range blobs {
err := cache.indexer.BlobStore.saveBlob(blob, tx)
Expand Down
16 changes: 16 additions & 0 deletions indexer/epoch_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ func (epochStats *EpochStats) GetProposerAssignments() map[uint64]uint64 {
return epochStats.proposerAssignments
}

func (epochStats *EpochStats) TryGetProposerAssignments() map[uint64]uint64 {
if !epochStats.dutiesMutex.TryRLock() {
return nil
}
defer epochStats.dutiesMutex.RUnlock()
return epochStats.proposerAssignments
}

func (epochStats *EpochStats) GetAttestorAssignments() map[string][]uint64 {
epochStats.dutiesMutex.RLock()
defer epochStats.dutiesMutex.RUnlock()
Expand All @@ -136,6 +144,14 @@ func (epochStats *EpochStats) GetSyncAssignments() []uint64 {
return epochStats.syncAssignments
}

func (epochStats *EpochStats) TryGetSyncAssignments() []uint64 {
if !epochStats.dutiesMutex.TryRLock() {
return nil
}
defer epochStats.dutiesMutex.RUnlock()
return epochStats.syncAssignments
}

func (client *IndexerClient) ensureEpochStats(epoch uint64, head []byte) error {
var dependentRoot []byte
var proposerRsp *rpctypes.StandardV1ProposerDutiesResponse
Expand Down
5 changes: 5 additions & 0 deletions indexer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool, skipCli
return false, client, fmt.Errorf("error persisting epoch data to db: %v", err)
}

err = persistSyncAssignments(syncEpoch, epochStats, tx)
if err != nil {
return false, client, fmt.Errorf("error persisting sync committee assignments to db: %v", err)
}

if len(blobs) > 0 {
for _, blob := range blobs {
err := sync.indexer.BlobStore.saveBlob(blob, tx)
Expand Down
23 changes: 23 additions & 0 deletions indexer/write_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@ func persistEpochData(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats
return nil
}

func persistSyncAssignments(epoch uint64, epochStats *EpochStats, tx *sqlx.Tx) error {
if epoch < utils.Config.Chain.Config.AltairForkEpoch {
// no sync committees before altair
return nil
}
period := epoch / utils.Config.Chain.Config.EpochsPerSyncCommitteePeriod
isStartOfPeriod := epoch == period*utils.Config.Chain.Config.EpochsPerSyncCommitteePeriod
if !isStartOfPeriod && db.IsSyncCommitteeSynchronized(period) {
// already synchronized
return nil
}

syncAssignments := make([]*dbtypes.SyncAssignment, 0)
for idx, val := range epochStats.syncAssignments {
syncAssignments = append(syncAssignments, &dbtypes.SyncAssignment{
Period: period,
Index: uint32(idx),
Validator: val,
})
}
return db.InsertSyncAssignments(syncAssignments, tx)
}

func buildDbBlock(block *CacheBlock, epochStats *EpochStats) *dbtypes.Block {
blockBody := block.GetBlockBody()
if blockBody == nil {
Expand Down
Loading

0 comments on commit 030495d

Please sign in to comment.