Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync committee assignment indexing & slot page speedup #17

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,15 @@ func IsEpochSynchronized(epoch uint64) bool {
return count > 0
}

func IsSyncCommitteeSynchronized(period uint64) bool {
var count uint64
err := ReaderDb.Get(&count, `SELECT COUNT(*) FROM sync_assignments WHERE period = $1`, period)
if err != nil {
return false
}
return count > 0
}

func InsertSlotAssignments(slotAssignments []*dbtypes.SlotAssignment, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
Expand Down Expand Up @@ -332,6 +341,35 @@ func InsertSlotAssignments(slotAssignments []*dbtypes.SlotAssignment, tx *sqlx.T
return nil
}

func InsertSyncAssignments(syncAssignments []*dbtypes.SyncAssignment, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `INSERT INTO sync_assignments (period, "index", validator) VALUES `,
dbtypes.DBEngineSqlite: `INSERT OR REPLACE INTO sync_assignments (period, "index", validator) VALUES `,
}))
argIdx := 0
args := make([]any, len(syncAssignments)*3)
for i, slotAssignment := range syncAssignments {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "($%v, $%v, $%v)", argIdx+1, argIdx+2, argIdx+3)
args[argIdx] = slotAssignment.Period
args[argIdx+1] = slotAssignment.Index
args[argIdx+2] = slotAssignment.Validator
argIdx += 3
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: ` ON CONFLICT (period, "index") DO UPDATE SET validator = excluded.validator`,
dbtypes.DBEngineSqlite: "",
}))
_, err := tx.Exec(sql.String(), args...)
if err != nil {
return err
}
return nil
}

func InsertBlock(block *dbtypes.Block, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
Expand Down Expand Up @@ -636,7 +674,37 @@ func GetSlotAssignmentsForSlots(firstSlot uint64, lastSlot uint64) []*dbtypes.Sl
WHERE slot <= $1 AND slot >= $2
`, firstSlot, lastSlot)
if err != nil {
logger.Errorf("Error while fetching blocks: %v", err)
logger.Errorf("Error while fetching slot assignments: %v", err)
return nil
}
return assignments
}

func GetSlotAssignment(slot uint64) *dbtypes.SlotAssignment {
assignment := dbtypes.SlotAssignment{}
err := ReaderDb.Get(&assignment, `
SELECT
slot, proposer
FROM slot_assignments
WHERE slot = $1
`, slot)
if err != nil {
return nil
}
return &assignment
}

func GetSyncAssignmentsForPeriod(period uint64) []uint64 {
assignments := []uint64{}
err := ReaderDb.Select(&assignments, `
SELECT
validator
FROM sync_assignments
WHERE period = $1
ORDER BY "index" ASC
`, period)
if err != nil {
logger.Errorf("Error while fetching sync assignments: %v", err)
return nil
}
return assignments
Expand Down
20 changes: 20 additions & 0 deletions db/schema/pgsql/20230906021153_sync-duties.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS public."sync_assignments"
(
"period" bigint NOT NULL,
"index" int NOT NULL,
"validator" bigint NOT NULL,
CONSTRAINT "sync_assignments_pkey" PRIMARY KEY ("period", "index")
);

CREATE INDEX IF NOT EXISTS "sync_assignments_validator_idx"
ON public."sync_assignments"
("validator" ASC NULLS LAST);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
20 changes: 20 additions & 0 deletions db/schema/sqlite/20230906021153_sync-duties.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS "sync_assignments"
(
"period" bigint NOT NULL,
"index" int NOT NULL,
"validator" bigint NOT NULL,
CONSTRAINT "sync_assignments_pkey" PRIMARY KEY ("period", "index")
);

CREATE INDEX IF NOT EXISTS "sync_assignments_validator_idx"
ON "sync_assignments"
("validator" ASC);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
6 changes: 6 additions & 0 deletions dbtypes/dbtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ type SlotAssignment struct {
Proposer uint64 `db:"proposer"`
}

type SyncAssignment struct {
Period uint64 `db:"period"`
Index uint32 `db:"index"`
Validator uint64 `db:"validator"`
}

type UnfinalizedBlock struct {
Root []byte `db:"root"`
Slot uint64 `db:"slot"`
Expand Down
66 changes: 46 additions & 20 deletions handlers/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/juliangruber/go-intersect"
"github.com/sirupsen/logrus"

"github.com/pk910/light-beaconchain-explorer/db"
"github.com/pk910/light-beaconchain-explorer/dbtypes"
"github.com/pk910/light-beaconchain-explorer/rpctypes"
"github.com/pk910/light-beaconchain-explorer/services"
Expand Down Expand Up @@ -57,7 +58,10 @@ func Slot(w http.ResponseWriter, r *http.Request) {
}
}

pageData := getSlotPageData(blockSlot, blockRootHash)
urlArgs := r.URL.Query()
loadDuties := urlArgs.Has("duties")

pageData := getSlotPageData(blockSlot, blockRootHash, loadDuties)
if pageData == nil {
data := InitPageData(w, r, "blockchain", "/slots", fmt.Sprintf("Slot %v", slotOrHash), notfoundTemplateFiles)
data.Data = "slot"
Expand All @@ -67,7 +71,6 @@ func Slot(w http.ResponseWriter, r *http.Request) {
return
}

urlArgs := r.URL.Query()
if urlArgs.Has("blob") && pageData.Block != nil {
commitment, err := hex.DecodeString(strings.Replace(urlArgs.Get("blob"), "0x", "", -1))
var blobData *dbtypes.Blob
Expand Down Expand Up @@ -145,18 +148,18 @@ func SlotBlob(w http.ResponseWriter, r *http.Request) {
}
}

func getSlotPageData(blockSlot int64, blockRoot []byte) *models.SlotPageData {
func getSlotPageData(blockSlot int64, blockRoot []byte, loadDuties bool) *models.SlotPageData {
pageData := &models.SlotPageData{}
pageCacheKey := fmt.Sprintf("slot:%v:%x", blockSlot, blockRoot)
pageCacheKey := fmt.Sprintf("slot:%v:%x:%v", blockSlot, blockRoot, loadDuties)
pageData = services.GlobalFrontendCache.ProcessCachedPage(pageCacheKey, true, pageData, func(pageCall *services.FrontendCacheProcessingPage) interface{} {
pageData, cacheTimeout := buildSlotPageData(blockSlot, blockRoot)
pageData, cacheTimeout := buildSlotPageData(blockSlot, blockRoot, loadDuties)
pageCall.CacheTimeout = cacheTimeout
return pageData
}).(*models.SlotPageData)
return pageData
}

func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData, time.Duration) {
func buildSlotPageData(blockSlot int64, blockRoot []byte, loadDuties bool) (*models.SlotPageData, time.Duration) {
currentSlot := utils.TimeToSlot(uint64(time.Now().Unix()))
finalizedEpoch, _ := services.GlobalBeaconService.GetFinalizedEpoch()
var blockData *rpctypes.CombinedBlockResponse
Expand Down Expand Up @@ -207,10 +210,15 @@ func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData,
EpochFinalized: finalizedEpoch >= int64(utils.EpochOfSlot(slot)),
}

assignments, err := services.GlobalBeaconService.GetEpochAssignments(utils.EpochOfSlot(slot))
if err != nil {
logrus.Printf("assignments error: %v", err)
// we can safely continue here. the UI is prepared to work without epoch duties, but fields related to the duties are not shown
var assignments *rpctypes.EpochAssignments
if loadDuties {
assignmentsRsp, err := services.GlobalBeaconService.GetEpochAssignments(utils.EpochOfSlot(slot))
if err != nil {
logrus.Printf("assignments error: %v", err)
// we can safely continue here. the UI is prepared to work without epoch duties, but fields related to the duties are not shown
} else {
assignments = assignmentsRsp
}
}

var cacheTimeout time.Duration
Expand All @@ -231,14 +239,20 @@ func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData,

if blockData == nil {
pageData.Status = uint16(models.SlotStatusMissed)

pageData.Proposer = math.MaxInt64
if assignments != nil {
pageData.Proposer = assignments.ProposerAssignments[slot]
pageData.ProposerName = services.GlobalBeaconService.GetValidatorName(pageData.Proposer)
} else {
pageData.Proposer = math.MaxInt64
} else if epochStats := services.GlobalBeaconService.GetIndexer().GetCachedEpochStats(utils.EpochOfSlot(slot)); epochStats != nil {
if proposers := epochStats.TryGetProposerAssignments(); proposers != nil {
pageData.Proposer = proposers[slot]
}
}

if pageData.Proposer == math.MaxInt64 {
if assignment := db.GetSlotAssignment(slot); assignment != nil {
pageData.Proposer = assignment.Proposer
}
}
pageData.ProposerName = services.GlobalBeaconService.GetValidatorName(pageData.Proposer)
} else {
if blockData.Orphaned {
pageData.Status = uint16(models.SlotStatusOrphaned)
Expand All @@ -247,13 +261,13 @@ func buildSlotPageData(blockSlot int64, blockRoot []byte) (*models.SlotPageData,
}
pageData.Proposer = uint64(blockData.Block.Message.ProposerIndex)
pageData.ProposerName = services.GlobalBeaconService.GetValidatorName(pageData.Proposer)
pageData.Block = getSlotPageBlockData(blockData, assignments)
pageData.Block = getSlotPageBlockData(blockData, assignments, loadDuties)
}

return pageData, cacheTimeout
}

func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments *rpctypes.EpochAssignments) *models.SlotPageBlockData {
func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments *rpctypes.EpochAssignments, loadDuties bool) *models.SlotPageBlockData {
pageData := &models.SlotPageBlockData{
BlockRoot: blockData.Root,
ParentRoot: blockData.Header.Message.ParentRoot,
Expand All @@ -270,6 +284,7 @@ func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments
DepositsCount: uint64(len(blockData.Block.Message.Body.Deposits)),
VoluntaryExitsCount: uint64(len(blockData.Block.Message.Body.VoluntaryExits)),
SlashingsCount: uint64(len(blockData.Block.Message.Body.ProposerSlashings)) + uint64(len(blockData.Block.Message.Body.AttesterSlashings)),
DutiesLoaded: loadDuties,
}

epoch := utils.EpochOfSlot(uint64(blockData.Header.Message.Slot))
Expand All @@ -283,7 +298,7 @@ func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments
attestation := blockData.Block.Message.Body.Attestations[i]
var attAssignments []uint64
attEpoch := utils.EpochOfSlot(uint64(attestation.Data.Slot))
if !assignmentsLoaded[attEpoch] { // load epoch duties if needed
if !assignmentsLoaded[attEpoch] && loadDuties { // load epoch duties if needed
attEpochAssignments, _ := services.GlobalBeaconService.GetEpochAssignments(attEpoch)
assignmentsMap[attEpoch] = attEpochAssignments
assignmentsLoaded[attEpoch] = true
Expand Down Expand Up @@ -401,9 +416,20 @@ func getSlotPageBlockData(blockData *rpctypes.CombinedBlockResponse, assignments
syncAggregate := blockData.Block.Message.Body.SyncAggregate
pageData.SyncAggregateBits = syncAggregate.SyncCommitteeBits
pageData.SyncAggregateSignature = syncAggregate.SyncCommitteeSignature
var syncAssignments []uint64
if assignments != nil {
pageData.SyncAggCommittee = make([]types.NamedValidator, len(assignments.SyncAssignments))
for idx, vidx := range assignments.SyncAssignments {
syncAssignments = assignments.SyncAssignments
} else if epochStats := services.GlobalBeaconService.GetIndexer().GetCachedEpochStats(epoch); epochStats != nil {
syncAssignments = epochStats.TryGetSyncAssignments()
}
if syncAssignments == nil {
syncPeriod := epoch / utils.Config.Chain.Config.EpochsPerSyncCommitteePeriod
syncAssignments = db.GetSyncAssignmentsForPeriod(syncPeriod)
}

if len(syncAssignments) != 0 {
pageData.SyncAggCommittee = make([]types.NamedValidator, len(syncAssignments))
for idx, vidx := range syncAssignments {
pageData.SyncAggCommittee[idx] = types.NamedValidator{
Index: vidx,
Name: services.GlobalBeaconService.GetValidatorName(vidx),
Expand Down
6 changes: 6 additions & 0 deletions indexer/cache_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ func (cache *indexerCache) processFinalizedEpoch(epoch uint64) error {
return err
}

err = persistSyncAssignments(epoch, epochStats, tx)
if err != nil {
logger.Errorf("error persisting sync committee assignments to db: %v", err)
return err
}

if len(blobs) > 0 {
for _, blob := range blobs {
err := cache.indexer.BlobStore.saveBlob(blob, tx)
Expand Down
16 changes: 16 additions & 0 deletions indexer/epoch_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ func (epochStats *EpochStats) GetProposerAssignments() map[uint64]uint64 {
return epochStats.proposerAssignments
}

func (epochStats *EpochStats) TryGetProposerAssignments() map[uint64]uint64 {
if !epochStats.dutiesMutex.TryRLock() {
return nil
}
defer epochStats.dutiesMutex.RUnlock()
return epochStats.proposerAssignments
}

func (epochStats *EpochStats) GetAttestorAssignments() map[string][]uint64 {
epochStats.dutiesMutex.RLock()
defer epochStats.dutiesMutex.RUnlock()
Expand All @@ -136,6 +144,14 @@ func (epochStats *EpochStats) GetSyncAssignments() []uint64 {
return epochStats.syncAssignments
}

func (epochStats *EpochStats) TryGetSyncAssignments() []uint64 {
if !epochStats.dutiesMutex.TryRLock() {
return nil
}
defer epochStats.dutiesMutex.RUnlock()
return epochStats.syncAssignments
}

func (client *IndexerClient) ensureEpochStats(epoch uint64, head []byte) error {
var dependentRoot []byte
var proposerRsp *rpctypes.StandardV1ProposerDutiesResponse
Expand Down
5 changes: 5 additions & 0 deletions indexer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool, skipCli
return false, client, fmt.Errorf("error persisting epoch data to db: %v", err)
}

err = persistSyncAssignments(syncEpoch, epochStats, tx)
if err != nil {
return false, client, fmt.Errorf("error persisting sync committee assignments to db: %v", err)
}

if len(blobs) > 0 {
for _, blob := range blobs {
err := sync.indexer.BlobStore.saveBlob(blob, tx)
Expand Down
23 changes: 23 additions & 0 deletions indexer/write_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@ func persistEpochData(epoch uint64, blockMap map[uint64]*CacheBlock, epochStats
return nil
}

func persistSyncAssignments(epoch uint64, epochStats *EpochStats, tx *sqlx.Tx) error {
if epoch < utils.Config.Chain.Config.AltairForkEpoch {
// no sync committees before altair
return nil
}
period := epoch / utils.Config.Chain.Config.EpochsPerSyncCommitteePeriod
isStartOfPeriod := epoch == period*utils.Config.Chain.Config.EpochsPerSyncCommitteePeriod
if !isStartOfPeriod && db.IsSyncCommitteeSynchronized(period) {
// already synchronized
return nil
}

syncAssignments := make([]*dbtypes.SyncAssignment, 0)
for idx, val := range epochStats.syncAssignments {
syncAssignments = append(syncAssignments, &dbtypes.SyncAssignment{
Period: period,
Index: uint32(idx),
Validator: val,
})
}
return db.InsertSyncAssignments(syncAssignments, tx)
}

func buildDbBlock(block *CacheBlock, epochStats *EpochStats) *dbtypes.Block {
blockBody := block.GetBlockBody()
if blockBody == nil {
Expand Down
Loading