Skip to content

Commit

Permalink
(BIDS-1478) Aggregate eth1-deposits, make db.GetEth1DepositsLeaderboa…
Browse files Browse the repository at this point in the history
…rd faster
  • Loading branch information
guybrush authored and recy21 committed Jan 26, 2023
1 parent 3a91d76 commit cb17664
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 56 deletions.
70 changes: 19 additions & 51 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func GetEth1DepositsCount() (uint64, error) {
return deposits, nil
}

func GetEth1DepositsLeaderboard(query string, length, start uint64, orderBy, orderDir string, latestEpoch uint64) ([]*types.EthOneDepositLeaderboardData, uint64, error) {
func GetEth1DepositsLeaderboard(query string, length, start uint64, orderBy, orderDir string) ([]*types.EthOneDepositLeaderboardData, uint64, error) {
deposits := []*types.EthOneDepositLeaderboardData{}

if orderDir != "desc" && orderDir != "asc" {
Expand Down Expand Up @@ -282,62 +282,30 @@ func GetEth1DepositsLeaderboard(query string, length, start uint64, orderBy, ord
var totalCount uint64
if query != "" {
err = ReaderDb.Get(&totalCount, `
SELECT
COUNT(from_address)
FROM
(
SELECT
from_address
FROM
eth1_deposits as eth1
WHERE
ENCODE(eth1.from_address, 'hex') LIKE LOWER($1)
GROUP BY from_address
) as count
`, query+"%")
SELECT COUNT(*) FROM eth1_deposits_aggregated WHERE ENCODE(from_address, 'hex') LIKE LOWER($1)`, query+"%")
} else {
err = ReaderDb.Get(&totalCount, "SELECT COUNT(*) FROM (SELECT from_address FROM eth1_deposits GROUP BY from_address) as count")
err = ReaderDb.Get(&totalCount, "SELECT COUNT(*) FROM eth1_deposits_aggregated AS count")
}
if err != nil && err != sql.ErrNoRows {
return nil, 0, err
}

err = ReaderDb.Select(&deposits, fmt.Sprintf(`
SELECT
eth1.from_address,
SUM(eth1.amount) as amount,
SUM(eth1.validcount) AS validcount,
SUM(eth1.invalidcount) AS invalidcount,
COUNT(CASE WHEN v.slashed = 't' THEN 1 END) AS slashedcount,
COUNT(v.pubkey) AS totalcount,
COUNT(CASE WHEN v.slashed = 'f' AND v.exitepoch > $3 AND v.activationepoch < $3 THEN 1 END) as activecount,
COUNT(CASE WHEN v.activationepoch > $3 THEN 1 END) AS pendingcount,
COUNT(CASE WHEN v.slashed = 'f' AND v.exitepoch < $3 THEN 1 END) AS voluntary_exit_count
FROM (
SELECT
from_address,
publickey,
SUM(amount) AS amount,
COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount,
COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount
FROM eth1_deposits
GROUP BY from_address, publickey
) eth1
LEFT JOIN (
SELECT
pubkey,
slashed,
exitepoch,
activationepoch,
COALESCE(validator_names.name, '') AS name
FROM validators
LEFT JOIN validator_names ON validators.pubkey = validator_names.publickey
) v ON v.pubkey = eth1.publickey
WHERE ENCODE(eth1.from_address, 'hex') LIKE LOWER($4)
GROUP BY eth1.from_address
ORDER BY %s %s
LIMIT $1
OFFSET $2`, orderBy, orderDir), length, start, latestEpoch, query+"%")
if query != "" {
err = ReaderDb.Select(&deposits, fmt.Sprintf(`
SELECT from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count
FROM eth1_deposits_aggregated
WHERE ENCODE(from_address, 'hex') LIKE LOWER($3)
ORDER BY %s %s
LIMIT $1
OFFSET $2`, orderBy, orderDir), length, start, query+"%")
} else {
err = ReaderDb.Select(&deposits, fmt.Sprintf(`
SELECT from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count
FROM eth1_deposits_aggregated
ORDER BY %s %s
LIMIT $1
OFFSET $2`, orderBy, orderDir), length, start)
}
if err != nil && err != sql.ErrNoRows {
return nil, 0, err
}
Expand Down
55 changes: 55 additions & 0 deletions exporter/eth1.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package exporter

import (
"context"
"database/sql"
"encoding/hex"
"eth2-exporter/db"
"eth2-exporter/metrics"
"eth2-exporter/types"
"eth2-exporter/utils"
"fmt"
Expand Down Expand Up @@ -125,6 +127,15 @@ func eth1DepositsExporter() {
continue
}

if len(depositsToSave) > 0 {
err = aggregateDeposits()
logger.WithError(err).Errorf("error saving eth1-deposits-leaderboard")
if err != nil {
time.Sleep(time.Second * 5)
continue
}
}

// make sure we are progressing even if there are no deposits in the last batch
lastFetchedBlock = toBlock

Expand Down Expand Up @@ -383,3 +394,47 @@ func eth1BatchRequestHeadersAndTxs(blocksToFetch []uint64, txsToFetch []string)

return headers, txs, nil
}

func aggregateDeposits() error {
start := time.Now()
defer func() {
metrics.TaskDuration.WithLabelValues("exporter_aggregate_eth1_deposits").Observe(time.Since(start).Seconds())
}()
_, err := db.WriterDb.Exec(`
INSERT INTO eth1_deposits_aggregated (from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count)
SELECT
eth1.from_address,
SUM(eth1.amount) as amount,
SUM(eth1.validcount) AS validcount,
SUM(eth1.invalidcount) AS invalidcount,
COUNT(CASE WHEN v.status = 'slashed' THEN 1 END) AS slashedcount,
COUNT(v.pubkey) AS totalcount,
COUNT(CASE WHEN v.status = 'active' THEN 1 END) as activecount,
COUNT(CASE WHEN v.status = 'pending' THEN 1 END) AS pendingcount,
COUNT(CASE WHEN v.status = 'exited' THEN 1 END) AS voluntary_exit_count
FROM (
SELECT
from_address,
publickey,
SUM(amount) AS amount,
COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount,
COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount
FROM eth1_deposits
GROUP BY from_address, publickey
) eth1
LEFT JOIN (SELECT pubkey, status FROM validators) v ON v.pubkey = eth1.publickey
GROUP BY eth1.from_address
ON CONFLICT (from_address) DO UPDATE SET
amount = excluded.amount,
validcount = excluded.validcount,
invalidcount = excluded.invalidcount,
slashedcount = excluded.slashedcount,
totalcount = excluded.totalcount,
activecount = excluded.activecount,
pendingcount = excluded.pendingcount,
voluntary_exit_count = excluded.voluntary_exit_count`)
if err != nil && err != sql.ErrNoRows {
return nil
}
return err
}
7 changes: 2 additions & 5 deletions handlers/eth1Deposits.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func Eth1Deposits(w http.ResponseWriter, r *http.Request) {
}

pageData.Stats = services.GetLatestStats()
pageData.DepositContract = utils.Config.Indexer.Eth1DepositContractAddress
pageData.DepositContract = utils.Config.Chain.Config.DepositContractAddress

data := InitPageData(w, r, "blockchain", "/deposits/eth1", "Initiated Deposits")
data.HeaderAd = true
Expand Down Expand Up @@ -139,7 +139,6 @@ func Eth1DepositsData(w http.ResponseWriter, r *http.Request) {

// Eth1Deposits will return information about deposits using a go template
func Eth1DepositsLeaderboard(w http.ResponseWriter, r *http.Request) {

var eth1DepositsLeaderboardTemplate = templates.GetTemplate("layout.html", "eth1DepositsLeaderboard.html")

w.Header().Set("Content-Type", "text/html")
Expand Down Expand Up @@ -206,9 +205,7 @@ func Eth1DepositsLeaderboardData(w http.ResponseWriter, r *http.Request) {

orderDir := q.Get("order[0][dir]")

latestEpoch := services.LatestEpoch()

deposits, depositCount, err := db.GetEth1DepositsLeaderboard(search, length, start, orderBy, orderDir, latestEpoch)
deposits, depositCount, err := db.GetEth1DepositsLeaderboard(search, length, start, orderBy, orderDir)
if err != nil {
logger.Errorf("GetEth1Deposits error retrieving eth1_deposit leaderboard data: %v", err)
http.Error(w, "Internal server error", http.StatusServiceUnavailable)
Expand Down
15 changes: 15 additions & 0 deletions tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,21 @@ create table eth1_deposits
create index idx_eth1_deposits on eth1_deposits (publickey);
create index idx_eth1_deposits_from_address on eth1_deposits (from_address);

drop table if exists eth1_deposits_aggregated;
create table eth1_deposits_aggregated
(
from_address bytea not null,
amount bigint not null,
validcount int not null,
invalidcount int not null,
slashedcount int not null,
totalcount int not null,
activecount int not null,
pendingcount int not null,
voluntary_exit_count int not null,
primary key (from_address)
);

drop table if exists users;
create table users
(
Expand Down

0 comments on commit cb17664

Please sign in to comment.