diff --git a/db/db.go b/db/db.go index dee1c6a257..c571d9f17e 100644 --- a/db/db.go +++ b/db/db.go @@ -251,7 +251,7 @@ func GetEth1DepositsCount() (uint64, error) { return deposits, nil } -func GetEth1DepositsLeaderboard(query string, length, start uint64, orderBy, orderDir string, latestEpoch uint64) ([]*types.EthOneDepositLeaderboardData, uint64, error) { +func GetEth1DepositsLeaderboard(query string, length, start uint64, orderBy, orderDir string) ([]*types.EthOneDepositLeaderboardData, uint64, error) { deposits := []*types.EthOneDepositLeaderboardData{} if orderDir != "desc" && orderDir != "asc" { @@ -282,62 +282,30 @@ func GetEth1DepositsLeaderboard(query string, length, start uint64, orderBy, ord var totalCount uint64 if query != "" { err = ReaderDb.Get(&totalCount, ` - SELECT - COUNT(from_address) - FROM - ( - SELECT - from_address - FROM - eth1_deposits as eth1 - WHERE - ENCODE(eth1.from_address, 'hex') LIKE LOWER($1) - GROUP BY from_address - ) as count - `, query+"%") + SELECT COUNT(*) FROM eth1_deposits_aggregated WHERE ENCODE(from_address, 'hex') LIKE LOWER($1)`, query+"%") } else { - err = ReaderDb.Get(&totalCount, "SELECT COUNT(*) FROM (SELECT from_address FROM eth1_deposits GROUP BY from_address) as count") + err = ReaderDb.Get(&totalCount, "SELECT COUNT(*) FROM eth1_deposits_aggregated AS count") } if err != nil && err != sql.ErrNoRows { return nil, 0, err } - err = ReaderDb.Select(&deposits, fmt.Sprintf(` - SELECT - eth1.from_address, - SUM(eth1.amount) as amount, - SUM(eth1.validcount) AS validcount, - SUM(eth1.invalidcount) AS invalidcount, - COUNT(CASE WHEN v.slashed = 't' THEN 1 END) AS slashedcount, - COUNT(v.pubkey) AS totalcount, - COUNT(CASE WHEN v.slashed = 'f' AND v.exitepoch > $3 AND v.activationepoch < $3 THEN 1 END) as activecount, - COUNT(CASE WHEN v.activationepoch > $3 THEN 1 END) AS pendingcount, - COUNT(CASE WHEN v.slashed = 'f' AND v.exitepoch < $3 THEN 1 END) AS voluntary_exit_count - FROM ( - SELECT - from_address, - publickey, - SUM(amount) AS amount, - COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount, - COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount - FROM eth1_deposits - GROUP BY from_address, publickey - ) eth1 - LEFT JOIN ( - SELECT - pubkey, - slashed, - exitepoch, - activationepoch, - COALESCE(validator_names.name, '') AS name - FROM validators - LEFT JOIN validator_names ON validators.pubkey = validator_names.publickey - ) v ON v.pubkey = eth1.publickey - WHERE ENCODE(eth1.from_address, 'hex') LIKE LOWER($4) - GROUP BY eth1.from_address - ORDER BY %s %s - LIMIT $1 - OFFSET $2`, orderBy, orderDir), length, start, latestEpoch, query+"%") + if query != "" { + err = ReaderDb.Select(&deposits, fmt.Sprintf(` + SELECT from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count + FROM eth1_deposits_aggregated + WHERE ENCODE(from_address, 'hex') LIKE LOWER($3) + ORDER BY %s %s + LIMIT $1 + OFFSET $2`, orderBy, orderDir), length, start, query+"%") + } else { + err = ReaderDb.Select(&deposits, fmt.Sprintf(` + SELECT from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count + FROM eth1_deposits_aggregated + ORDER BY %s %s + LIMIT $1 + OFFSET $2`, orderBy, orderDir), length, start) + } if err != nil && err != sql.ErrNoRows { return nil, 0, err } diff --git a/exporter/eth1.go b/exporter/eth1.go index 38278370fb..600216567a 100644 --- a/exporter/eth1.go +++ b/exporter/eth1.go @@ -2,8 +2,10 @@ package exporter import ( "context" + "database/sql" "encoding/hex" "eth2-exporter/db" + "eth2-exporter/metrics" "eth2-exporter/types" "eth2-exporter/utils" "fmt" @@ -125,6 +127,15 @@ func eth1DepositsExporter() { continue } + if len(depositsToSave) > 0 { + err = aggregateDeposits() + logger.WithError(err).Errorf("error saving eth1-deposits-leaderboard") + if err != nil { + time.Sleep(time.Second * 5) + continue + } + } + // make sure we are progressing even if there are no deposits in the last batch lastFetchedBlock = toBlock @@ -383,3 +394,47 @@ func eth1BatchRequestHeadersAndTxs(blocksToFetch []uint64, txsToFetch []string) return headers, txs, nil } + +func aggregateDeposits() error { + start := time.Now() + defer func() { + metrics.TaskDuration.WithLabelValues("exporter_aggregate_eth1_deposits").Observe(time.Since(start).Seconds()) + }() + _, err := db.WriterDb.Exec(` + INSERT INTO eth1_deposits_aggregated (from_address, amount, validcount, invalidcount, slashedcount, totalcount, activecount, pendingcount, voluntary_exit_count) + SELECT + eth1.from_address, + SUM(eth1.amount) as amount, + SUM(eth1.validcount) AS validcount, + SUM(eth1.invalidcount) AS invalidcount, + COUNT(CASE WHEN v.status = 'slashed' THEN 1 END) AS slashedcount, + COUNT(v.pubkey) AS totalcount, + COUNT(CASE WHEN v.status = 'active' THEN 1 END) as activecount, + COUNT(CASE WHEN v.status = 'pending' THEN 1 END) AS pendingcount, + COUNT(CASE WHEN v.status = 'exited' THEN 1 END) AS voluntary_exit_count + FROM ( + SELECT + from_address, + publickey, + SUM(amount) AS amount, + COUNT(CASE WHEN valid_signature = 't' THEN 1 END) AS validcount, + COUNT(CASE WHEN valid_signature = 'f' THEN 1 END) AS invalidcount + FROM eth1_deposits + GROUP BY from_address, publickey + ) eth1 + LEFT JOIN (SELECT pubkey, status FROM validators) v ON v.pubkey = eth1.publickey + GROUP BY eth1.from_address + ON CONFLICT (from_address) DO UPDATE SET + amount = excluded.amount, + validcount = excluded.validcount, + invalidcount = excluded.invalidcount, + slashedcount = excluded.slashedcount, + totalcount = excluded.totalcount, + activecount = excluded.activecount, + pendingcount = excluded.pendingcount, + voluntary_exit_count = excluded.voluntary_exit_count`) + if err != nil && err != sql.ErrNoRows { + return nil + } + return err +} diff --git a/handlers/eth1Deposits.go b/handlers/eth1Deposits.go index 453c7305de..119f11ff31 100644 --- a/handlers/eth1Deposits.go +++ b/handlers/eth1Deposits.go @@ -32,7 +32,7 @@ func Eth1Deposits(w http.ResponseWriter, r *http.Request) { } pageData.Stats = services.GetLatestStats() - pageData.DepositContract = utils.Config.Indexer.Eth1DepositContractAddress + pageData.DepositContract = utils.Config.Chain.Config.DepositContractAddress data := InitPageData(w, r, "blockchain", "/deposits/eth1", "Initiated Deposits") data.HeaderAd = true @@ -139,7 +139,6 @@ func Eth1DepositsData(w http.ResponseWriter, r *http.Request) { // Eth1Deposits will return information about deposits using a go template func Eth1DepositsLeaderboard(w http.ResponseWriter, r *http.Request) { - var eth1DepositsLeaderboardTemplate = templates.GetTemplate("layout.html", "eth1DepositsLeaderboard.html") w.Header().Set("Content-Type", "text/html") @@ -206,9 +205,7 @@ func Eth1DepositsLeaderboardData(w http.ResponseWriter, r *http.Request) { orderDir := q.Get("order[0][dir]") - latestEpoch := services.LatestEpoch() - - deposits, depositCount, err := db.GetEth1DepositsLeaderboard(search, length, start, orderBy, orderDir, latestEpoch) + deposits, depositCount, err := db.GetEth1DepositsLeaderboard(search, length, start, orderBy, orderDir) if err != nil { logger.Errorf("GetEth1Deposits error retrieving eth1_deposit leaderboard data: %v", err) http.Error(w, "Internal server error", http.StatusServiceUnavailable) diff --git a/tables.sql b/tables.sql index 33585e5263..3265ac6c15 100644 --- a/tables.sql +++ b/tables.sql @@ -429,6 +429,21 @@ create table eth1_deposits create index idx_eth1_deposits on eth1_deposits (publickey); create index idx_eth1_deposits_from_address on eth1_deposits (from_address); +drop table if exists eth1_deposits_aggregated; +create table eth1_deposits_aggregated +( + from_address bytea not null, + amount bigint not null, + validcount int not null, + invalidcount int not null, + slashedcount int not null, + totalcount int not null, + activecount int not null, + pendingcount int not null, + voluntary_exit_count int not null, + primary key (from_address) +); + drop table if exists users; create table users (