From fcb00990aebc5eec8f6c61445285ce1eec36c42d Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Thu, 25 Jul 2024 14:56:28 +0200 Subject: [PATCH 01/19] geth update and minor cleanup --- cmd/util/util.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/util/util.go b/cmd/util/util.go index c959d37..3d90550 100644 --- a/cmd/util/util.go +++ b/cmd/util/util.go @@ -4,6 +4,8 @@ package util import ( "github.com/flashbots/relayscan/common" "github.com/spf13/cobra" + "golang.org/x/text/language" + "golang.org/x/text/message" ) var ( @@ -11,10 +13,16 @@ var ( numThreads uint64 = 10 // Printer for pretty printing numbers - // printer = message.NewPrinter(language.English) + printer = message.NewPrinter(language.English) ethNodeURI string ethNodeBackupURI string + slotStr string + blockHash string + mevGethURI string + loadAddresses bool + scLookup bool // whether to lookup smart contract details + printAllSimTx bool ) var UtilCmd = &cobra.Command{ From 3caa2bf657ef7023ec297e214fdfc7c9fb215c46 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Thu, 25 Jul 2024 15:23:00 +0200 Subject: [PATCH 02/19] use geth in check-payload-value --- cmd/util/util.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/cmd/util/util.go b/cmd/util/util.go index 3d90550..c959d37 100644 --- a/cmd/util/util.go +++ b/cmd/util/util.go @@ -4,8 +4,6 @@ package util import ( "github.com/flashbots/relayscan/common" "github.com/spf13/cobra" - "golang.org/x/text/language" - "golang.org/x/text/message" ) var ( @@ -13,16 +11,10 @@ var ( numThreads uint64 = 10 // Printer for pretty printing numbers - printer = message.NewPrinter(language.English) + // printer = message.NewPrinter(language.English) ethNodeURI string ethNodeBackupURI string - slotStr string - blockHash string - mevGethURI string - loadAddresses bool - scLookup bool // whether to lookup smart contract details - printAllSimTx bool ) var UtilCmd = &cobra.Command{ From 8f00c2a91d79f9e6c04e7ef9f4fae79c3fd07411 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Thu, 25 Jul 2024 15:51:16 +0200 Subject: [PATCH 03/19] cleanup --- README.md | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 2e532ee..3f9cea7 100644 --- a/README.md +++ b/README.md @@ -94,13 +94,10 @@ Start by filling the DB with relay data (delivered payloads), and checking it: source .env.local # Start Postgres -docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres postgres +docker run -d -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres postgres -# Query only a single relay, and for the shortest time possible -go run . core data-api-backfill --relay fb --min-slot -1 - -# Now the DB has data, check it (for only a single slot, the latest one, see logs for "latest received payload at slot N" in the backfill command) -go run . core check-payload-value --slot _N_ +# Query relays for data until a given earliest slot +go run . core data-api-backfill --relay fb --min-slot 9590900 ``` For linting and testing: From 95a7c14ba1240baa36b14b327b73bd09cb4367a2 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Thu, 25 Jul 2024 17:55:56 +0200 Subject: [PATCH 04/19] use geth instead of FlashbotsRPC --- README.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 3f9cea7..2e532ee 100644 --- a/README.md +++ b/README.md @@ -94,10 +94,13 @@ Start by filling the DB with relay data (delivered payloads), and checking it: source .env.local # Start Postgres -docker run -d -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres postgres +docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres postgres -# Query relays for data until a given earliest slot -go run . core data-api-backfill --relay fb --min-slot 9590900 +# Query only a single relay, and for the shortest time possible +go run . core data-api-backfill --relay fb --min-slot -1 + +# Now the DB has data, check it (for only a single slot, the latest one, see logs for "latest received payload at slot N" in the backfill command) +go run . core check-payload-value --slot _N_ ``` For linting and testing: From ec73345ddbb431eb6ef79b7aa938914d0cd93570 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Thu, 25 Jul 2024 18:11:34 +0200 Subject: [PATCH 05/19] initial migrations --- cmd/core/check-payload-value.go | 8 +- cmd/util/update-extradata.go | 5 +- database/database.go | 47 ++++--- database/migrations/001_init_database.go | 163 ++++++++++++++++++++++ database/migrations/migration.go | 12 ++ database/schema.go | 165 ----------------------- database/vars/tables.go | 18 +++ go.mod | 5 +- go.sum | 14 +- 9 files changed, 238 insertions(+), 199 deletions(-) create mode 100644 database/migrations/001_init_database.go create mode 100644 database/migrations/migration.go delete mode 100644 database/schema.go create mode 100644 database/vars/tables.go diff --git a/cmd/core/check-payload-value.go b/cmd/core/check-payload-value.go index e948543..3f0a6ff 100644 --- a/cmd/core/check-payload-value.go +++ b/cmd/core/check-payload-value.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/flashbots/relayscan/common" "github.com/flashbots/relayscan/database" + dbvars "github.com/flashbots/relayscan/database/vars" "github.com/flashbots/relayscan/vars" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -71,7 +72,7 @@ var checkPayloadValueCmd = &cobra.Command{ // log.Infof("beacon node connected. headslot: %d", headSlot) entries := []database.DataAPIPayloadDeliveredEntry{} - query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number FROM ` + database.TableDataAPIPayloadDelivered + query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number FROM ` + dbvars.TableDataAPIPayloadDelivered if checkIncorrectOnly { query += ` WHERE value_check_ok=false ORDER BY slot DESC` if limit > 0 { @@ -184,7 +185,7 @@ func startUpdateWorker(wg *sync.WaitGroup, db *database.DatabaseService, client, } saveEntry := func(_log *logrus.Entry, entry database.DataAPIPayloadDeliveredEntry) { - query := `UPDATE ` + database.TableDataAPIPayloadDelivered + ` SET + query := `UPDATE ` + dbvars.TableDataAPIPayloadDelivered + ` SET block_number=:block_number, extra_data=:extra_data, slot_missed=:slot_missed, @@ -259,6 +260,9 @@ func startUpdateWorker(wg *sync.WaitGroup, db *database.DatabaseService, client, entryBlockHash := ethcommon.HexToHash(entry.BlockHash) // query block by number to ensure that's what landed on-chain + // + // TODO: This reports "slot is missed" when actually an EL block with that number is there, but the hash is different. + // Should refactor this to instead say elBlockHashMismatch (and save both hashes) blockByNum, err := getHeaderByNumber(block.Number()) if err != nil { _log.WithError(err).Fatalf("couldn't get block by number %d", block.NumberU64()) diff --git a/cmd/util/update-extradata.go b/cmd/util/update-extradata.go index 3b94fd2..903aaeb 100644 --- a/cmd/util/update-extradata.go +++ b/cmd/util/update-extradata.go @@ -5,6 +5,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/flashbots/relayscan/database" + dbvars "github.com/flashbots/relayscan/database/vars" "github.com/flashbots/relayscan/vars" "github.com/metachris/flashbotsrpc" "github.com/sirupsen/logrus" @@ -38,7 +39,7 @@ var backfillExtradataCmd = &cobra.Command{ db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) entries := []database.DataAPIPayloadDeliveredEntry{} - query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number FROM ` + database.TableDataAPIPayloadDelivered + ` WHERE slot < 4823872 AND extra_data = ''` + query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number FROM ` + dbvars.TableDataAPIPayloadDelivered + ` WHERE slot < 4823872 AND extra_data = ''` // query += ` LIMIT 1000` err = db.DB.Select(&entries, query) if err != nil { @@ -109,7 +110,7 @@ func startBackfillWorker(wg *sync.WaitGroup, db *database.DatabaseService, clien continue } - query := `UPDATE ` + database.TableDataAPIPayloadDelivered + ` SET extra_data=$1 WHERE id=$2` + query := `UPDATE ` + dbvars.TableDataAPIPayloadDelivered + ` SET extra_data=$1 WHERE id=$2` _, err := db.DB.Exec(query, entry.ExtraData, entry.ID) if err != nil { _log.WithError(err).Fatalf("failed to save entry") diff --git a/database/database.go b/database/database.go index c2963ff..1948df9 100644 --- a/database/database.go +++ b/database/database.go @@ -2,12 +2,14 @@ package database import ( - "fmt" "os" "time" + "github.com/flashbots/relayscan/database/migrations" + "github.com/flashbots/relayscan/database/vars" "github.com/jmoiron/sqlx" _ "github.com/lib/pq" + migrate "github.com/rubenv/sql-migrate" ) type DatabaseService struct { @@ -24,12 +26,9 @@ func NewDatabaseService(dsn string) (*DatabaseService, error) { db.DB.SetMaxIdleConns(10) db.DB.SetConnMaxIdleTime(0) - if os.Getenv("PRINT_SCHEMA") == "1" { - fmt.Println(schema) - } - if os.Getenv("DB_DONT_APPLY_SCHEMA") == "" { - _, err = db.Exec(schema) + migrate.SetTable(vars.TableMigrations) + _, err := migrate.Exec(db.DB, "postgres", migrations.Migrations, migrate.Up) if err != nil { return nil, err } @@ -45,7 +44,7 @@ func (s *DatabaseService) Close() error { } func (s *DatabaseService) SaveSignedBuilderBid(entry SignedBuilderBidEntry) error { - query := `INSERT INTO ` + TableSignedBuilderBid + ` + query := `INSERT INTO ` + vars.TableSignedBuilderBid + ` (relay, requested_at, received_at, duration_ms, slot, parent_hash, proposer_pubkey, pubkey, signature, value, fee_recipient, block_hash, block_number, gas_limit, gas_used, extra_data, epoch, timestamp, prev_randao) VALUES (:relay, :requested_at, :received_at, :duration_ms, :slot, :parent_hash, :proposer_pubkey, :pubkey, :signature, :value, :fee_recipient, :block_hash, :block_number, :gas_limit, :gas_used, :extra_data, :epoch, :timestamp, :prev_randao) ON CONFLICT DO NOTHING` @@ -54,13 +53,13 @@ func (s *DatabaseService) SaveSignedBuilderBid(entry SignedBuilderBidEntry) erro } func (s *DatabaseService) SaveBuilder(entry *BlockBuilderEntry) error { - query := `INSERT INTO ` + TableBlockBuilder + ` (builder_pubkey, description) VALUES (:builder_pubkey, :description) ON CONFLICT DO NOTHING` + query := `INSERT INTO ` + vars.TableBlockBuilder + ` (builder_pubkey, description) VALUES (:builder_pubkey, :description) ON CONFLICT DO NOTHING` _, err := s.DB.NamedExec(query, entry) return err } func (s *DatabaseService) SaveDataAPIPayloadDelivered(entry *DataAPIPayloadDeliveredEntry) error { - query := `INSERT INTO ` + TableDataAPIPayloadDelivered + ` + query := `INSERT INTO ` + vars.TableDataAPIPayloadDelivered + ` (relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number, extra_data) VALUES (:relay, :epoch, :slot, :parent_hash, :block_hash, :builder_pubkey, :proposer_pubkey, :proposer_fee_recipient, :gas_limit, :gas_used, :value_claimed_wei, :value_claimed_eth, :num_tx, :block_number, :extra_data) ON CONFLICT DO NOTHING` @@ -73,7 +72,7 @@ func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPay return nil } - query := `INSERT INTO ` + TableDataAPIPayloadDelivered + ` + query := `INSERT INTO ` + vars.TableDataAPIPayloadDelivered + ` (relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number, extra_data) VALUES (:relay, :epoch, :slot, :parent_hash, :block_hash, :builder_pubkey, :proposer_pubkey, :proposer_fee_recipient, :gas_limit, :gas_used, :value_claimed_wei, :value_claimed_eth, :num_tx, :block_number, :extra_data) ON CONFLICT DO NOTHING` @@ -95,13 +94,13 @@ func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPay func (s *DatabaseService) GetDataAPILatestPayloadDelivered(relay string) (*DataAPIPayloadDeliveredEntry, error) { entry := new(DataAPIPayloadDeliveredEntry) - query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number, extra_data, slot_missed, value_check_ok, value_check_method, value_delivered_wei, value_delivered_eth, value_delivered_diff_wei, value_delivered_diff_eth, block_coinbase_addr, block_coinbase_is_proposer, coinbase_diff_wei, coinbase_diff_eth, found_onchain, notes FROM ` + TableDataAPIPayloadDelivered + ` WHERE relay=$1 ORDER BY slot DESC LIMIT 1` + query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number, extra_data, slot_missed, value_check_ok, value_check_method, value_delivered_wei, value_delivered_eth, value_delivered_diff_wei, value_delivered_diff_eth, block_coinbase_addr, block_coinbase_is_proposer, coinbase_diff_wei, coinbase_diff_eth, found_onchain, notes FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE relay=$1 ORDER BY slot DESC LIMIT 1` err := s.DB.Get(entry, query, relay) return entry, err } func (s *DatabaseService) SaveDataAPIBid(entry *DataAPIBuilderBidEntry) error { - query := `INSERT INTO ` + TableDataAPIBuilderBid + ` + query := `INSERT INTO ` + vars.TableDataAPIBuilderBid + ` (relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value, num_tx, block_number, timestamp) VALUES (:relay, :epoch, :slot, :parent_hash, :block_hash, :builder_pubkey, :proposer_pubkey, :proposer_fee_recipient, :gas_limit, :gas_used, :value, :num_tx, :block_number, :timestamp) ON CONFLICT DO NOTHING` @@ -113,7 +112,7 @@ func (s *DatabaseService) SaveDataAPIBids(entries []*DataAPIBuilderBidEntry) err if len(entries) == 0 { return nil } - query := `INSERT INTO ` + TableDataAPIBuilderBid + ` + query := `INSERT INTO ` + vars.TableDataAPIBuilderBid + ` (relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value, num_tx, block_number, timestamp) VALUES (:relay, :epoch, :slot, :parent_hash, :block_hash, :builder_pubkey, :proposer_pubkey, :proposer_fee_recipient, :gas_limit, :gas_used, :value, :num_tx, :block_number, :timestamp) ON CONFLICT DO NOTHING` @@ -123,7 +122,7 @@ func (s *DatabaseService) SaveDataAPIBids(entries []*DataAPIBuilderBidEntry) err func (s *DatabaseService) GetDataAPILatestBid(relay string) (*DataAPIBuilderBidEntry, error) { entry := new(DataAPIBuilderBidEntry) - query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value, num_tx, block_number, timestamp FROM ` + TableDataAPIBuilderBid + ` WHERE relay=$1 ORDER BY slot DESC, timestamp DESC LIMIT 1` + query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value, num_tx, block_number, timestamp FROM ` + vars.TableDataAPIBuilderBid + ` WHERE relay=$1 ORDER BY slot DESC, timestamp DESC LIMIT 1` err := s.DB.Get(entry, query, relay) return entry, err } @@ -132,8 +131,8 @@ func (s *DatabaseService) GetTopRelays(since, until time.Time) (res []*TopRelayE startSlot := timeToSlot(since) endSlot := timeToSlot(until) - // query := `SELECT relay, count(relay) as payloads FROM ` + TableDataAPIPayloadDelivered + ` WHERE inserted_at > $1 AND inserted_at < $2 GROUP BY relay ORDER BY payloads DESC;` - query := `SELECT relay, count(relay) as payloads FROM ` + TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL AND slot >= $1 AND slot <= $2 GROUP BY relay ORDER BY payloads DESC;` + // query := `SELECT relay, count(relay) as payloads FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE inserted_at > $1 AND inserted_at < $2 GROUP BY relay ORDER BY payloads DESC;` + query := `SELECT relay, count(relay) as payloads FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL AND slot >= $1 AND slot <= $2 GROUP BY relay ORDER BY payloads DESC;` err = s.DB.Select(&res, query, startSlot, endSlot) return res, err } @@ -143,7 +142,7 @@ func (s *DatabaseService) GetTopBuilders(since, until time.Time, relay string) ( endSlot := timeToSlot(until) query := `SELECT extra_data, count(extra_data) as blocks FROM ( - SELECT distinct(slot), extra_data FROM ` + TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL AND slot >= $1 AND slot <= $2` + SELECT distinct(slot), extra_data FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL AND slot >= $1 AND slot <= $2` if relay != "" { query += ` AND relay = '` + relay + `'` } @@ -167,7 +166,7 @@ func (s *DatabaseService) GetBuilderProfits(since, until time.Time) (res []*Buil round(sum(CASE WHEN coinbase_diff_eth IS NOT NULL THEN coinbase_diff_eth ELSE 0 END), 4) as total_profit, round(abs(sum(CASE WHEN coinbase_diff_eth < 0 THEN coinbase_diff_eth ELSE 0 END)), 4) as total_subsidies FROM ( - SELECT distinct(slot), extra_data, coinbase_diff_eth FROM ` + TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL AND slot >= $1 AND slot <= $2 + SELECT distinct(slot), extra_data, coinbase_diff_eth FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL AND slot >= $1 AND slot <= $2 ) AS x GROUP BY extra_data ORDER BY total_profit DESC;` @@ -195,7 +194,7 @@ func (s *DatabaseService) GetStatsForTimerange(since, until time.Time, relay str func (s *DatabaseService) GetDeliveredPayloadsForSlot(slot uint64) (res []*DataAPIPayloadDeliveredEntry, err error) { query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number - FROM ` + TableDataAPIPayloadDelivered + ` WHERE slot=$1;` + FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE slot=$1;` err = s.DB.Select(&res, query, slot) return res, err } @@ -203,7 +202,7 @@ func (s *DatabaseService) GetDeliveredPayloadsForSlot(slot uint64) (res []*DataA func (s *DatabaseService) GetLatestDeliveredPayload() (*DataAPIPayloadDeliveredEntry, error) { query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number - FROM ` + TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL ORDER BY id DESC LIMIT 1;` + FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL ORDER BY id DESC LIMIT 1;` entry := new(DataAPIPayloadDeliveredEntry) err := s.DB.Get(entry, query) return entry, err @@ -212,7 +211,7 @@ func (s *DatabaseService) GetLatestDeliveredPayload() (*DataAPIPayloadDeliveredE func (s *DatabaseService) GetDeliveredPayloadsForSlots(slotStart, slotEnd uint64) (res []*DataAPIPayloadDeliveredEntry, err error) { query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number, extra_data - FROM ` + TableDataAPIPayloadDelivered + ` WHERE slot>=$1 AND slot<=$2 ORDER BY slot ASC;` + FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE slot>=$1 AND slot<=$2 ORDER BY slot ASC;` err = s.DB.Select(&res, query, slotStart, slotEnd) return res, err } @@ -220,7 +219,7 @@ func (s *DatabaseService) GetDeliveredPayloadsForSlots(slotStart, slotEnd uint64 func (s *DatabaseService) GetSignedBuilderBidsForSlot(slot uint64) (res []*SignedBuilderBidEntry, err error) { query := `SELECT id, relay, requested_at, received_at, duration_ms, slot, parent_hash, proposer_pubkey, pubkey, signature, value, fee_recipient, block_hash, block_number, gas_limit, gas_used, extra_data, epoch, timestamp, prev_randao - FROM ` + TableSignedBuilderBid + ` WHERE slot=$1;` + FROM ` + vars.TableSignedBuilderBid + ` WHERE slot=$1;` err = s.DB.Select(&res, query, slot) return res, err } @@ -229,7 +228,7 @@ func (s *DatabaseService) SaveBuilderStats(entries []*BuilderStatsEntry) error { if len(entries) == 0 { return nil } - query := `INSERT INTO ` + TableBlockBuilderInclusionStats + ` + query := `INSERT INTO ` + vars.TableBlockBuilderInclusionStats + ` (type, hours, time_start, time_end, builder_name, extra_data, builder_pubkeys, blocks_included) VALUES (:type, :hours, :time_start, :time_end, :builder_name, :extra_data, :builder_pubkeys, :blocks_included) ON CONFLICT (type, hours, time_start, time_end, builder_name) DO UPDATE SET @@ -241,7 +240,7 @@ func (s *DatabaseService) SaveBuilderStats(entries []*BuilderStatsEntry) error { } func (s *DatabaseService) GetLastDailyBuilderStatsEntry(filterType string) (*BuilderStatsEntry, error) { - query := `SELECT type, hours, time_start, time_end, builder_name, extra_data, builder_pubkeys, blocks_included FROM ` + TableBlockBuilderInclusionStats + ` WHERE hours=24 AND type=$1 ORDER BY time_end DESC LIMIT 1;` + query := `SELECT type, hours, time_start, time_end, builder_name, extra_data, builder_pubkeys, blocks_included FROM ` + vars.TableBlockBuilderInclusionStats + ` WHERE hours=24 AND type=$1 ORDER BY time_end DESC LIMIT 1;` entry := new(BuilderStatsEntry) err := s.DB.Get(entry, query, filterType) return entry, err diff --git a/database/migrations/001_init_database.go b/database/migrations/001_init_database.go new file mode 100644 index 0000000..596478c --- /dev/null +++ b/database/migrations/001_init_database.go @@ -0,0 +1,163 @@ +package migrations + +import ( + "github.com/flashbots/relayscan/database/vars" + migrate "github.com/rubenv/sql-migrate" +) + +var initialSchema = ` +CREATE TABLE IF NOT EXISTS ` + vars.TableSignedBuilderBid + ` ( + id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + inserted_at timestamp NOT NULL default current_timestamp, + + relay text NOT NULL, + requested_at timestamp NOT NULL, + received_at timestamp NOT NULL, + duration_ms bigint NOT NULL, + + slot bigint NOT NULL, + parent_hash varchar(66) NOT NULL, + proposer_pubkey varchar(98) NOT NULL, + + pubkey varchar(98) NOT NULL, + signature text NOT NULL, + + value NUMERIC(48, 0) NOT NULL, + fee_recipient varchar(42) NOT NULL, + block_hash varchar(66) NOT NULL, + block_number bigint NOT NULL, + gas_limit bigint NOT NULL, + gas_used bigint NOT NULL, + extra_data text NOT NULL, + timestamp bigint NOT NULL, + prev_randao text NOT NULL, + + epoch bigint NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS ` + vars.TableSignedBuilderBid + `_u_relay_slot_n_hashes_idx ON ` + vars.TableSignedBuilderBid + `("relay", "slot", "parent_hash", "block_hash"); +CREATE INDEX IF NOT EXISTS ` + vars.TableSignedBuilderBid + `_insertedat_idx ON ` + vars.TableSignedBuilderBid + `("inserted_at"); +CREATE INDEX IF NOT EXISTS ` + vars.TableSignedBuilderBid + `_slot_idx ON ` + vars.TableSignedBuilderBid + `("slot"); +CREATE INDEX IF NOT EXISTS ` + vars.TableSignedBuilderBid + `_block_number_idx ON ` + vars.TableSignedBuilderBid + `("block_number"); +CREATE INDEX IF NOT EXISTS ` + vars.TableSignedBuilderBid + `_value_idx ON ` + vars.TableSignedBuilderBid + `("value"); + + +CREATE TABLE IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + ` ( + id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + inserted_at timestamp NOT NULL default current_timestamp, + relay text NOT NULL, + + epoch bigint NOT NULL, + slot bigint NOT NULL, + + parent_hash varchar(66) NOT NULL, + block_hash varchar(66) NOT NULL, + builder_pubkey varchar(98) NOT NULL, + proposer_pubkey varchar(98) NOT NULL, + proposer_fee_recipient varchar(42) NOT NULL, + gas_limit bigint NOT NULL, + gas_used bigint NOT NULL, + value_claimed_wei NUMERIC(48, 0) NOT NULL, + value_claimed_eth NUMERIC(16, 8) NOT NULL, + num_tx int, + block_number bigint, + extra_data text NOT NULL, + + slot_missed boolean, -- null means not yet checked + value_check_ok boolean, -- null means not yet checked + value_check_method text, -- how value was checked (i.e. blockBalanceDiff) + value_delivered_wei NUMERIC(48, 0), -- actually delivered value + value_delivered_eth NUMERIC(16, 8), -- actually delivered value + value_delivered_diff_wei NUMERIC(48, 0), -- value_delivered - value_claimed + value_delivered_diff_eth NUMERIC(16, 8), -- value_delivered - value_claimed + block_coinbase_addr varchar(42), -- block coinbase address + block_coinbase_is_proposer boolean, -- true if coinbase == proposerFeeRecipient + coinbase_diff_wei NUMERIC(48, 0), -- builder value difference + coinbase_diff_eth NUMERIC(16, 8), -- builder value difference + found_onchain boolean, -- whether the payload blockhash can be found on chain (at all) + notes text +); + +CREATE UNIQUE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_u_relay_slot_blockhash_idx ON ` + vars.TableDataAPIPayloadDelivered + `("relay", "slot", "parent_hash", "block_hash"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_insertedat_idx ON ` + vars.TableDataAPIPayloadDelivered + `("inserted_at"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_slot_idx ON ` + vars.TableDataAPIPayloadDelivered + `("slot"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_builder_pubkey_idx ON ` + vars.TableDataAPIPayloadDelivered + `("builder_pubkey"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_block_number_idx ON ` + vars.TableDataAPIPayloadDelivered + `("block_number"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_value_wei_idx ON ` + vars.TableDataAPIPayloadDelivered + `("value_claimed_wei"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_valuecheck_ok_idx ON ` + vars.TableDataAPIPayloadDelivered + `("value_check_ok"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_slotmissed_idx ON ` + vars.TableDataAPIPayloadDelivered + `("slot_missed"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_cb_diff_eth_idx ON ` + vars.TableDataAPIPayloadDelivered + `("coinbase_diff_eth"); +-- CREATE INDEX CONCURRENTLY IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_insertedat_relay_idx ON ` + vars.TableDataAPIPayloadDelivered + `("inserted_at", "relay"); + + +CREATE TABLE IF NOT EXISTS ` + vars.TableDataAPIBuilderBid + ` ( + id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + inserted_at timestamp NOT NULL default current_timestamp, + relay text NOT NULL, + + epoch bigint NOT NULL, + slot bigint NOT NULL, + + parent_hash varchar(66) NOT NULL, + block_hash varchar(66) NOT NULL, + builder_pubkey varchar(98) NOT NULL, + proposer_pubkey varchar(98) NOT NULL, + proposer_fee_recipient varchar(42) NOT NULL, + gas_limit bigint NOT NULL, + gas_used bigint NOT NULL, + value NUMERIC(48, 0) NOT NULL, + num_tx int, + block_number bigint, + timestamp timestamp NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS ` + vars.TableDataAPIBuilderBid + `_unique_idx ON ` + vars.TableDataAPIBuilderBid + `("relay", "slot", "builder_pubkey", "parent_hash", "block_hash"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIBuilderBid + `_insertedat_idx ON ` + vars.TableDataAPIBuilderBid + `("inserted_at"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIBuilderBid + `_slot_idx ON ` + vars.TableDataAPIBuilderBid + `("slot"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIBuilderBid + `_builder_pubkey_idx ON ` + vars.TableDataAPIBuilderBid + `("builder_pubkey"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIBuilderBid + `_block_number_idx ON ` + vars.TableDataAPIBuilderBid + `("block_number"); +CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIBuilderBid + `_value_idx ON ` + vars.TableDataAPIBuilderBid + `("value"); + + +CREATE TABLE IF NOT EXISTS ` + vars.TableBlockBuilder + ` ( + id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + inserted_at timestamp NOT NULL default current_timestamp, + + builder_pubkey varchar(98) NOT NULL, + description text NOT NULL, + + UNIQUE (builder_pubkey) +); + +CREATE TABLE IF NOT EXISTS ` + vars.TableBlockBuilderInclusionStats + ` ( + id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + inserted_at timestamp NOT NULL default current_timestamp, + + type text NOT NULL, -- "extra_data" or "builder_pubkey" + hours int NOT NULL, -- the amount of hours aggregated over (i.e. 24 for daily) + + time_start timestamp NOT NULL, + time_end timestamp NOT NULL, + builder_name text NOT NULL, + + extra_data text NOT NULL, + builder_pubkeys text NOT NULL, + blocks_included int NOT NULL, + + UNIQUE (type, hours, time_start, time_end, builder_name) +); + +CREATE INDEX IF NOT EXISTS ` + vars.TableBlockBuilderInclusionStats + `_type_hours_idx ON ` + vars.TableBlockBuilderInclusionStats + `("type", "hours"); +CREATE INDEX IF NOT EXISTS ` + vars.TableBlockBuilderInclusionStats + `_time_start_idx ON ` + vars.TableBlockBuilderInclusionStats + `("time_start"); +CREATE INDEX IF NOT EXISTS ` + vars.TableBlockBuilderInclusionStats + `_time_end_idx ON ` + vars.TableBlockBuilderInclusionStats + `("time_end"); +CREATE INDEX IF NOT EXISTS ` + vars.TableBlockBuilderInclusionStats + `_builder_name_idx ON ` + vars.TableBlockBuilderInclusionStats + `("builder_name"); +CREATE INDEX IF NOT EXISTS ` + vars.TableBlockBuilderInclusionStats + `_extra_data_idx ON ` + vars.TableBlockBuilderInclusionStats + `("extra_data"); +` + +var Migration001InitDatabase = &migrate.Migration{ + Id: "001-init-database", + Up: []string{initialSchema}, + + DisableTransactionUp: false, + DisableTransactionDown: true, +} diff --git a/database/migrations/migration.go b/database/migrations/migration.go new file mode 100644 index 0000000..a990cca --- /dev/null +++ b/database/migrations/migration.go @@ -0,0 +1,12 @@ +// Package migrations contains all the migration files +package migrations + +import ( + migrate "github.com/rubenv/sql-migrate" +) + +var Migrations = migrate.MemoryMigrationSource{ + Migrations: []*migrate.Migration{ + Migration001InitDatabase, + }, +} diff --git a/database/schema.go b/database/schema.go deleted file mode 100644 index be0e6bd..0000000 --- a/database/schema.go +++ /dev/null @@ -1,165 +0,0 @@ -package database - -import ( - "github.com/flashbots/mev-boost-relay/common" -) - -var ( - tableBase = common.GetEnv("DB_TABLE_PREFIX", "rsdev") - - TableSignedBuilderBid = tableBase + "_signed_builder_bid" - TableDataAPIPayloadDelivered = tableBase + "_data_api_payload_delivered" - TableDataAPIBuilderBid = tableBase + "_data_api_builder_bid" - TableError = tableBase + "_error" - TableBlockBuilder = tableBase + "_blockbuilder" - TableBlockBuilderInclusionStats = tableBase + "_blockbuilder_stats_inclusion" -) - -var schema = ` -CREATE TABLE IF NOT EXISTS ` + TableSignedBuilderBid + ` ( - id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - inserted_at timestamp NOT NULL default current_timestamp, - - relay text NOT NULL, - requested_at timestamp NOT NULL, - received_at timestamp NOT NULL, - duration_ms bigint NOT NULL, - - slot bigint NOT NULL, - parent_hash varchar(66) NOT NULL, - proposer_pubkey varchar(98) NOT NULL, - - pubkey varchar(98) NOT NULL, - signature text NOT NULL, - - value NUMERIC(48, 0) NOT NULL, - fee_recipient varchar(42) NOT NULL, - block_hash varchar(66) NOT NULL, - block_number bigint NOT NULL, - gas_limit bigint NOT NULL, - gas_used bigint NOT NULL, - extra_data text NOT NULL, - timestamp bigint NOT NULL, - prev_randao text NOT NULL, - - epoch bigint NOT NULL -); - -CREATE UNIQUE INDEX IF NOT EXISTS ` + TableSignedBuilderBid + `_u_relay_slot_n_hashes_idx ON ` + TableSignedBuilderBid + `("relay", "slot", "parent_hash", "block_hash"); -CREATE INDEX IF NOT EXISTS ` + TableSignedBuilderBid + `_insertedat_idx ON ` + TableSignedBuilderBid + `("inserted_at"); -CREATE INDEX IF NOT EXISTS ` + TableSignedBuilderBid + `_slot_idx ON ` + TableSignedBuilderBid + `("slot"); -CREATE INDEX IF NOT EXISTS ` + TableSignedBuilderBid + `_block_number_idx ON ` + TableSignedBuilderBid + `("block_number"); -CREATE INDEX IF NOT EXISTS ` + TableSignedBuilderBid + `_value_idx ON ` + TableSignedBuilderBid + `("value"); - - -CREATE TABLE IF NOT EXISTS ` + TableDataAPIPayloadDelivered + ` ( - id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - inserted_at timestamp NOT NULL default current_timestamp, - relay text NOT NULL, - - epoch bigint NOT NULL, - slot bigint NOT NULL, - - parent_hash varchar(66) NOT NULL, - block_hash varchar(66) NOT NULL, - builder_pubkey varchar(98) NOT NULL, - proposer_pubkey varchar(98) NOT NULL, - proposer_fee_recipient varchar(42) NOT NULL, - gas_limit bigint NOT NULL, - gas_used bigint NOT NULL, - value_claimed_wei NUMERIC(48, 0) NOT NULL, - value_claimed_eth NUMERIC(16, 8) NOT NULL, - num_tx int, - block_number bigint, - extra_data text NOT NULL, - - slot_missed boolean, -- null means not yet checked - value_check_ok boolean, -- null means not yet checked - value_check_method text, -- how value was checked (i.e. blockBalanceDiff) - value_delivered_wei NUMERIC(48, 0), -- actually delivered value - value_delivered_eth NUMERIC(16, 8), -- actually delivered value - value_delivered_diff_wei NUMERIC(48, 0), -- value_delivered - value_claimed - value_delivered_diff_eth NUMERIC(16, 8), -- value_delivered - value_claimed - block_coinbase_addr varchar(42), -- block coinbase address - block_coinbase_is_proposer boolean, -- true if coinbase == proposerFeeRecipient - coinbase_diff_wei NUMERIC(48, 0), -- builder value difference - coinbase_diff_eth NUMERIC(16, 8), -- builder value difference - found_onchain boolean, -- whether the payload blockhash can be found on chain (at all) - notes text -); - -CREATE UNIQUE INDEX IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_u_relay_slot_blockhash_idx ON ` + TableDataAPIPayloadDelivered + `("relay", "slot", "parent_hash", "block_hash"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_insertedat_idx ON ` + TableDataAPIPayloadDelivered + `("inserted_at"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_slot_idx ON ` + TableDataAPIPayloadDelivered + `("slot"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_builder_pubkey_idx ON ` + TableDataAPIPayloadDelivered + `("builder_pubkey"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_block_number_idx ON ` + TableDataAPIPayloadDelivered + `("block_number"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_value_wei_idx ON ` + TableDataAPIPayloadDelivered + `("value_claimed_wei"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_valuecheck_ok_idx ON ` + TableDataAPIPayloadDelivered + `("value_check_ok"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_slotmissed_idx ON ` + TableDataAPIPayloadDelivered + `("slot_missed"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_cb_diff_eth_idx ON ` + TableDataAPIPayloadDelivered + `("coinbase_diff_eth"); --- CREATE INDEX CONCURRENTLY IF NOT EXISTS ` + TableDataAPIPayloadDelivered + `_insertedat_relay_idx ON ` + TableDataAPIPayloadDelivered + `("inserted_at", "relay"); - - -CREATE TABLE IF NOT EXISTS ` + TableDataAPIBuilderBid + ` ( - id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - inserted_at timestamp NOT NULL default current_timestamp, - relay text NOT NULL, - - epoch bigint NOT NULL, - slot bigint NOT NULL, - - parent_hash varchar(66) NOT NULL, - block_hash varchar(66) NOT NULL, - builder_pubkey varchar(98) NOT NULL, - proposer_pubkey varchar(98) NOT NULL, - proposer_fee_recipient varchar(42) NOT NULL, - gas_limit bigint NOT NULL, - gas_used bigint NOT NULL, - value NUMERIC(48, 0) NOT NULL, - num_tx int, - block_number bigint, - timestamp timestamp NOT NULL -); - -CREATE UNIQUE INDEX IF NOT EXISTS ` + TableDataAPIBuilderBid + `_unique_idx ON ` + TableDataAPIBuilderBid + `("relay", "slot", "builder_pubkey", "parent_hash", "block_hash"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIBuilderBid + `_insertedat_idx ON ` + TableDataAPIBuilderBid + `("inserted_at"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIBuilderBid + `_slot_idx ON ` + TableDataAPIBuilderBid + `("slot"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIBuilderBid + `_builder_pubkey_idx ON ` + TableDataAPIBuilderBid + `("builder_pubkey"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIBuilderBid + `_block_number_idx ON ` + TableDataAPIBuilderBid + `("block_number"); -CREATE INDEX IF NOT EXISTS ` + TableDataAPIBuilderBid + `_value_idx ON ` + TableDataAPIBuilderBid + `("value"); - - -CREATE TABLE IF NOT EXISTS ` + TableBlockBuilder + ` ( - id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - inserted_at timestamp NOT NULL default current_timestamp, - - builder_pubkey varchar(98) NOT NULL, - description text NOT NULL, - - UNIQUE (builder_pubkey) -); - -CREATE TABLE IF NOT EXISTS ` + TableBlockBuilderInclusionStats + ` ( - id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - inserted_at timestamp NOT NULL default current_timestamp, - - type text NOT NULL, -- "extra_data" or "builder_pubkey" - hours int NOT NULL, -- the amount of hours aggregated over (i.e. 24 for daily) - - time_start timestamp NOT NULL, - time_end timestamp NOT NULL, - builder_name text NOT NULL, - - extra_data text NOT NULL, - builder_pubkeys text NOT NULL, - blocks_included int NOT NULL, - - UNIQUE (type, hours, time_start, time_end, builder_name) -); - -CREATE INDEX IF NOT EXISTS ` + TableBlockBuilderInclusionStats + `_type_hours_idx ON ` + TableBlockBuilderInclusionStats + `("type", "hours"); -CREATE INDEX IF NOT EXISTS ` + TableBlockBuilderInclusionStats + `_time_start_idx ON ` + TableBlockBuilderInclusionStats + `("time_start"); -CREATE INDEX IF NOT EXISTS ` + TableBlockBuilderInclusionStats + `_time_end_idx ON ` + TableBlockBuilderInclusionStats + `("time_end"); -CREATE INDEX IF NOT EXISTS ` + TableBlockBuilderInclusionStats + `_builder_name_idx ON ` + TableBlockBuilderInclusionStats + `("builder_name"); -CREATE INDEX IF NOT EXISTS ` + TableBlockBuilderInclusionStats + `_extra_data_idx ON ` + TableBlockBuilderInclusionStats + `("extra_data"); -` diff --git a/database/vars/tables.go b/database/vars/tables.go new file mode 100644 index 0000000..f0b419c --- /dev/null +++ b/database/vars/tables.go @@ -0,0 +1,18 @@ +// Package vars contains the database variables such as dynamic table names +package vars + +import ( + relaycommon "github.com/flashbots/mev-boost-relay/common" +) + +var ( + tableBase = relaycommon.GetEnv("DB_TABLE_PREFIX", "rsdev") + + TableMigrations = tableBase + "_migrations" + TableSignedBuilderBid = tableBase + "_signed_builder_bid" + TableDataAPIPayloadDelivered = tableBase + "_data_api_payload_delivered" + TableDataAPIBuilderBid = tableBase + "_data_api_builder_bid" + TableError = tableBase + "_error" + TableBlockBuilder = tableBase + "_blockbuilder" + TableBlockBuilderInclusionStats = tableBase + "_blockbuilder_stats_inclusion" +) diff --git a/go.mod b/go.mod index e777d3f..7c1a07a 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/lithammer/shortuuid v3.0.0+incompatible github.com/metachris/flashbotsrpc v0.5.0 github.com/olekukonko/tablewriter v0.0.5 + github.com/rubenv/sql-migrate v1.7.0 github.com/sirupsen/logrus v1.9.2 github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.8.4 @@ -51,6 +52,7 @@ require ( github.com/fatih/color v1.15.0 // indirect github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46 // indirect github.com/getsentry/sentry-go v0.21.0 // indirect + github.com/go-gorp/gorp/v3 v3.1.0 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/goccy/go-yaml v1.11.0 // indirect github.com/gofrs/flock v0.8.1 // indirect @@ -68,7 +70,6 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.18 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect - github.com/mattn/go-sqlite3 v1.14.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -103,7 +104,7 @@ require ( golang.org/x/sys v0.20.0 // indirect golang.org/x/tools v0.15.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/protobuf v1.30.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 6217d60..7406bf6 100644 --- a/go.sum +++ b/go.sum @@ -124,6 +124,8 @@ github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclK github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-gorp/gorp/v3 v3.1.0 h1:ItKF/Vbuj31dmV4jxA1qblpSwkl9g1typ24xoe70IGs= +github.com/go-gorp/gorp/v3 v3.1.0/go.mod h1:dLEjIyyRNiXvNZ8PSmzpt1GsWAUK8kjVhEpjH8TixEw= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= @@ -281,8 +283,8 @@ github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= -github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI= +github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= @@ -334,6 +336,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/poy/onpar v1.1.2 h1:QaNrNiZx0+Nar5dLgTVp5mXkyoVFIbepjyEoGSnhbAY= +github.com/poy/onpar v1.1.2/go.mod h1:6X8FLNoxyr9kkmnlqpK6LSoiOtrO6MICtWwEuWkLjzg= github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -357,6 +361,8 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/rubenv/sql-migrate v1.7.0 h1:HtQq1xyTN2ISmQDggnh0c9U3JlP8apWh8YO2jzlXpTI= +github.com/rubenv/sql-migrate v1.7.0/go.mod h1:S4wtDEG1CKn+0ShpTtzWhFpHHI5PvCUtiGI+C+Z2THE= github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -604,8 +610,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From c41e7ef646b564fd78992f4544e3d294380d0646 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Thu, 25 Jul 2024 18:29:51 +0200 Subject: [PATCH 06/19] add blob count! --- README.md | 5 ++++- cmd/core/check-payload-value.go | 15 ++++++++++++++- database/migrations/002_add_blob_count.go | 19 +++++++++++++++++++ database/migrations/migration.go | 1 + database/types.go | 4 ++++ 5 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 database/migrations/002_add_blob_count.go diff --git a/README.md b/README.md index 2e532ee..a416ac8 100644 --- a/README.md +++ b/README.md @@ -94,13 +94,16 @@ Start by filling the DB with relay data (delivered payloads), and checking it: source .env.local # Start Postgres -docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres postgres +docker run -d --name relayscan-postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres postgres # Query only a single relay, and for the shortest time possible go run . core data-api-backfill --relay fb --min-slot -1 # Now the DB has data, check it (for only a single slot, the latest one, see logs for "latest received payload at slot N" in the backfill command) go run . core check-payload-value --slot _N_ + +# Reset DB? Remove and restart the Docker container +docker rm -f relayscan-postgres ``` For linting and testing: diff --git a/cmd/core/check-payload-value.go b/cmd/core/check-payload-value.go index 3f0a6ff..0aef80c 100644 --- a/cmd/core/check-payload-value.go +++ b/cmd/core/check-payload-value.go @@ -199,7 +199,8 @@ func startUpdateWorker(wg *sync.WaitGroup, db *database.DatabaseService, client, block_coinbase_is_proposer=:block_coinbase_is_proposer, coinbase_diff_wei=:coinbase_diff_wei, coinbase_diff_eth=:coinbase_diff_eth, - found_onchain=:found_onchain -- should rename field, because getBlockByHash might succeed even though this slot was missed + found_onchain=:found_onchain, -- should rename field, because getBlockByHash might succeed even though this slot was missed + num_blob_txs=:num_blob_txs WHERE slot=:slot` _, err := db.DB.NamedExec(query, entry) if err != nil { @@ -321,6 +322,16 @@ func startUpdateWorker(wg *sync.WaitGroup, db *database.DatabaseService, client, } } + // find number of blob transactions + numBlobTxs := 0 + numBlobs := 0 + for _, tx := range txs { + if tx.Type() == types.BlobTxType { + numBlobTxs++ + numBlobs += len(tx.BlobHashes()) + } + } + entry.NumBlobTxs = database.NewNullInt64(int64(numBlobTxs)) entry.ExtraData = database.ExtraDataToUtf8Str(block.Extra()) entry.ValueCheckOk = database.NewNullBool(proposerValueDiffFromClaim.String() == "0") entry.ValueCheckMethod = database.NewNullString(checkMethod) @@ -339,6 +350,8 @@ func startUpdateWorker(wg *sync.WaitGroup, db *database.DatabaseService, client, "valueDeliveredEth": entry.ValueDeliveredEth.String, // "valueDeliveredDiffWei": entry.ValueDeliveredDiffWei, "valueDeliveredDiffEth": entry.ValueDeliveredDiffEth.String, + "numBlobTxs": numBlobTxs, + "numBlobs": numBlobs, }).Info("value check done") if !coinbaseIsProposer { diff --git a/database/migrations/002_add_blob_count.go b/database/migrations/002_add_blob_count.go new file mode 100644 index 0000000..c68acab --- /dev/null +++ b/database/migrations/002_add_blob_count.go @@ -0,0 +1,19 @@ +package migrations + +import ( + "github.com/flashbots/relayscan/database/vars" + migrate "github.com/rubenv/sql-migrate" +) + +var migration002SQL = ` + ALTER TABLE ` + vars.TableDataAPIPayloadDelivered + ` ADD num_blob_txs int DEFAULT NULL; + ALTER TABLE ` + vars.TableDataAPIPayloadDelivered + ` ADD num_blobs int DEFAULT NULL; +` + +var Migration002AddBlobCount = &migrate.Migration{ + Id: "002-add-blob-count", + Up: []string{migration002SQL}, + + DisableTransactionUp: false, + DisableTransactionDown: true, +} diff --git a/database/migrations/migration.go b/database/migrations/migration.go index a990cca..973c7c4 100644 --- a/database/migrations/migration.go +++ b/database/migrations/migration.go @@ -8,5 +8,6 @@ import ( var Migrations = migrate.MemoryMigrationSource{ Migrations: []*migrate.Migration{ Migration001InitDatabase, + Migration002AddBlobCount, }, } diff --git a/database/types.go b/database/types.go index 60618c6..80d1b18 100644 --- a/database/types.go +++ b/database/types.go @@ -62,6 +62,10 @@ type DataAPIPayloadDeliveredEntry struct { CoinbaseDiffWei sql.NullString `db:"coinbase_diff_wei"` CoinbaseDiffEth sql.NullString `db:"coinbase_diff_eth"` Notes sql.NullString `db:"notes"` + + // Blob info added 2024-07-25 + NumBlobTxs sql.NullInt64 `db:"num_blob_txs"` + NumBlobs sql.NullInt64 `db:"num_blobs"` } type DataAPIBuilderBidEntry struct { From 20220ff4deb1193ce684e148d90e7861c19108d9 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Thu, 25 Jul 2024 18:50:58 +0200 Subject: [PATCH 07/19] fix stuff --- README.md | 5 ++++- cmd/core/check-payload-value.go | 27 +++++++++++++++++---------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index a416ac8..4a94cfc 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,10 @@ docker run -d --name relayscan-postgres -p 5432:5432 -e POSTGRES_USER=postgres - # Query only a single relay, and for the shortest time possible go run . core data-api-backfill --relay fb --min-slot -1 -# Now the DB has data, check it (for only a single slot, the latest one, see logs for "latest received payload at slot N" in the backfill command) +# Now the DB has data, check it (and update in DB) +go run . core check-payload-value + +# Can also check a single slot only: go run . core check-payload-value --slot _N_ # Reset DB? Remove and restart the Docker container diff --git a/cmd/core/check-payload-value.go b/cmd/core/check-payload-value.go index 0aef80c..c521dfb 100644 --- a/cmd/core/check-payload-value.go +++ b/cmd/core/check-payload-value.go @@ -2,7 +2,6 @@ package core import ( "context" - "database/sql" "fmt" "math/big" "strings" @@ -200,7 +199,8 @@ func startUpdateWorker(wg *sync.WaitGroup, db *database.DatabaseService, client, coinbase_diff_wei=:coinbase_diff_wei, coinbase_diff_eth=:coinbase_diff_eth, found_onchain=:found_onchain, -- should rename field, because getBlockByHash might succeed even though this slot was missed - num_blob_txs=:num_blob_txs + num_blob_txs=:num_blob_txs, + num_blobs=:num_blobs WHERE slot=:slot` _, err := db.DB.NamedExec(query, entry) if err != nil { @@ -217,7 +217,7 @@ func startUpdateWorker(wg *sync.WaitGroup, db *database.DatabaseService, client, "blockHash": entry.BlockHash, "relay": entry.Relay, }) - _log.Infof("checking slot...") + _log.Infof("checking slot %d ...", entry.Slot) claimedProposerValue, ok := new(big.Int).SetString(entry.ValueClaimedWei, 10) if !ok { _log.Fatalf("couldn't convert claimed value to big.Int: %s", entry.ValueClaimedWei) @@ -241,15 +241,20 @@ func startUpdateWorker(wg *sync.WaitGroup, db *database.DatabaseService, client, // query block by hash block, err = getBlockByHash(entry.BlockHash) if err != nil { - _log.WithError(err).Fatalf("couldn't get block %s", entry.BlockHash) - } else if block == nil { - _log.WithError(err).Warnf("block not found: %s", entry.BlockHash) - entry.FoundOnChain = database.NewNullBool(false) - saveEntry(_log, entry) - continue + if err.Error() == "not found" { + _log.WithError(err).Warnf("block by hash not found: %s", entry.BlockHash) + _log.WithError(err).Warnf("block not found: %s", entry.BlockHash) + entry.FoundOnChain = database.NewNullBool(false) + saveEntry(_log, entry) + continue + } else { + _log.WithError(err).Fatalf("error querying block by hash: %s", entry.BlockHash) + } } - entry.FoundOnChain = sql.NullBool{} //nolint:exhaustruct + // We found this block by hash, it's on chain + entry.FoundOnChain = database.NewNullBool(true) + if !entry.BlockNumber.Valid { entry.BlockNumber = database.NewNullInt64(block.Number().Int64()) } @@ -332,6 +337,8 @@ func startUpdateWorker(wg *sync.WaitGroup, db *database.DatabaseService, client, } } entry.NumBlobTxs = database.NewNullInt64(int64(numBlobTxs)) + entry.NumBlobs = database.NewNullInt64(int64(numBlobs)) + entry.ExtraData = database.ExtraDataToUtf8Str(block.Extra()) entry.ValueCheckOk = database.NewNullBool(proposerValueDiffFromClaim.String() == "0") entry.ValueCheckMethod = database.NewNullString(checkMethod) From f997bf228777c671d77d99a645cd5e33050fbd19 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 00:23:12 +0200 Subject: [PATCH 08/19] makefile plus --- Makefile | 73 +++++++++++++++++++++++++++++++++++-------------------- README.md | 17 ++++++++----- 2 files changed, 57 insertions(+), 33 deletions(-) diff --git a/Makefile b/Makefile index 0e4fd83..d8a6623 100644 --- a/Makefile +++ b/Makefile @@ -1,66 +1,85 @@ +# Heavily inspired by Lighthouse: https://github.com/sigp/lighthouse/blob/stable/Makefile +# and Reth: https://github.com/paradigmxyz/reth/blob/main/Makefile +.DEFAULT_GOAL := help + VERSION := $(shell git describe --tags --always --dirty="-dev") -all: build-portable +##@ Help + +help: ## Display this help + @awk 'BEGIN {FS = ":.*##"; printf "Usage:\n make \033[36m\033[0m\n"} /^[a-zA-Z_0-9-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST) -v: +v: ## Show the current version @echo "Version: ${VERSION}" -clean: +##@ Building + +clean: ## Remove build artifacts rm -rf relayscan build/ -build: +build: ## Build the relayscan binary go build -trimpath -ldflags "-s -X cmd.Version=${VERSION} -X main.Version=${VERSION}" -v -o relayscan . -build-portable: - go build -trimpath -ldflags "-s -X cmd.Version=${VERSION} -X main.Version=${VERSION}" -v -o relayscan . +docker-image: ## Build the relayscan docker image + DOCKER_BUILDKIT=1 docker build --platform linux/amd64 --build-arg VERSION=${VERSION} . -t relayscan -test: - go test ./... +generate-ssz: ## Generate SSZ serialization code + rm -f common/ultrasoundbid_encoding.go + sszgen --path common --objs UltrasoundStreamBid -test-race: - go test -race ./... +##@ Production tasks + +update-bids-website: ## Update the bid archive website + go run . service bidcollect --build-website --build-website-upload + +##@ Linting and Testing -lint: +lint: ## Lint the code gofmt -d -s . gofumpt -d -extra . go vet ./... staticcheck ./... golangci-lint run -lt: lint test +test: ## Run tests + go test ./... + +test-race: ## Run tests with -race fla + go test -race ./... -gofumpt: +lt: lint test ## Run lint and tests + +gofumpt: ## Run gofumpt on the code gofumpt -l -w -extra . -fmt: +fmt: ## Format the code with gofmt and gofumpt and gc gofmt -s -w . gofumpt -extra -w . gci write . go mod tidy -cover: +cover: ## Run tests with coverage go test -coverprofile=/tmp/go-sim-lb.cover.tmp ./... go tool cover -func /tmp/go-sim-lb.cover.tmp unlink /tmp/go-sim-lb.cover.tmp -cover-html: +cover-html: ## Run tests with coverage and output the HTML report go test -coverprofile=/tmp/go-sim-lb.cover.tmp ./... go tool cover -html=/tmp/go-sim-lb.cover.tmp unlink /tmp/go-sim-lb.cover.tmp -docker-image: - DOCKER_BUILDKIT=1 docker build --platform linux/amd64 --build-arg VERSION=${VERSION} . -t relayscan +##@ Development +dev-website: ## Run the relayscan website service in development mode + DB_DONT_APPLY_SCHEMA=1 go run . service website --dev -generate-ssz: - rm -f common/ultrasoundbid_encoding.go - sszgen --path common --objs UltrasoundStreamBid +dev-bids-website: ## Run the bidcollect website in development mode + go run . service bidcollect --devserver -update-bids-website: - go run . service bidcollect --build-website --build-website-upload +dev-postgres-start: ## Start a Postgres container for development + docker run -d --name relayscan-postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres postgres -dev-website: - DB_DONT_APPLY_SCHEMA=1 go run . service website --dev +dev-postgres-stop: ## Stop the Postgres container + docker rm -f relayscan-postgres -dev-bids-website: - go run . service bidcollect --devserver +dev-postgres-wipe: dev-postgres-stop dev-postgres-start ## Restart the Postgres container (wipes the database) diff --git a/README.md b/README.md index 4a94cfc..5c9889f 100644 --- a/README.md +++ b/README.md @@ -93,8 +93,8 @@ Start by filling the DB with relay data (delivered payloads), and checking it: # Copy .env.example to .env.local, update ETH_NODE_URI and source it source .env.local -# Start Postgres -docker run -d --name relayscan-postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres postgres +# Start Postgres Docker container +make dev-postgres-start # Query only a single relay, and for the shortest time possible go run . core data-api-backfill --relay fb --min-slot -1 @@ -105,8 +105,11 @@ go run . core check-payload-value # Can also check a single slot only: go run . core check-payload-value --slot _N_ -# Reset DB? Remove and restart the Docker container -docker rm -f relayscan-postgres +# Reset DB +dev-postgres-wipe + +# See the Makefile for more commands +make help ``` For linting and testing: @@ -117,11 +120,13 @@ go install mvdan.cc/gofumpt@latest go install honnef.co/go/tools/cmd/staticcheck@v0.4.3 go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2 -# Lint, test and build +# Lint and test make lint make test make test-race -make build + +# Format the code +make fmt ``` From 652349bd545b3d5e80b73696cf30d497d1912a45 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 00:30:31 +0200 Subject: [PATCH 09/19] remove deprecated collect-live-bids service --- cmd/service/collect-live-bids.go | 40 -------- cmd/service/service.go | 1 - services/collector/collector.go | 157 ------------------------------- 3 files changed, 198 deletions(-) delete mode 100644 cmd/service/collect-live-bids.go delete mode 100644 services/collector/collector.go diff --git a/cmd/service/collect-live-bids.go b/cmd/service/collect-live-bids.go deleted file mode 100644 index 8a376f1..0000000 --- a/cmd/service/collect-live-bids.go +++ /dev/null @@ -1,40 +0,0 @@ -package service - -import ( - "github.com/flashbots/relayscan/common" - "github.com/flashbots/relayscan/database" - "github.com/flashbots/relayscan/services/collector" - "github.com/flashbots/relayscan/vars" - "github.com/spf13/cobra" -) - -func init() { - liveBidsCmd.Flags().StringVar(&beaconNodeURI, "beacon-uri", vars.DefaultBeaconURI, "beacon endpoint") -} - -var liveBidsCmd = &cobra.Command{ - Use: "collect-live-bids", - Short: "On every slot, ask for live bids (using getHeader)", - Run: func(cmd *cobra.Command, args []string) { - // Connect to Postgres - db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) - - // _relay, err := common.RelayURLToEntry(common.RelayURLs[0]) - // if err != nil { - // log.WithError(err).Fatal("failed to get relays") - // } - // relays := []common.RelayEntry{_relay} - relays, err := common.GetRelays() - if err != nil { - log.WithError(err).Fatal("failed to get relays") - } - - log.Infof("Using %d relays", len(relays)) - for index, relay := range relays { - log.Infof("relay #%d: %s", index+1, relay.Hostname()) - } - - relayCollector := collector.NewRelayCollector(log, relays, beaconNodeURI, db) - relayCollector.Start() - }, -} diff --git a/cmd/service/service.go b/cmd/service/service.go index 8d75727..e1c6328 100644 --- a/cmd/service/service.go +++ b/cmd/service/service.go @@ -21,6 +21,5 @@ var ServiceCmd = &cobra.Command{ func init() { ServiceCmd.AddCommand(websiteCmd) - ServiceCmd.AddCommand(liveBidsCmd) ServiceCmd.AddCommand(bidCollectCmd) } diff --git a/services/collector/collector.go b/services/collector/collector.go deleted file mode 100644 index cb6fd31..0000000 --- a/services/collector/collector.go +++ /dev/null @@ -1,157 +0,0 @@ -// Package collector collects data from the relays -package collector - -import ( - "context" - "fmt" - "net/http" - "strings" - "time" - - "github.com/flashbots/go-boost-utils/types" - "github.com/flashbots/mev-boost-relay/beaconclient" - relaycommon "github.com/flashbots/mev-boost-relay/common" - "github.com/flashbots/relayscan/common" - "github.com/flashbots/relayscan/database" - "github.com/sirupsen/logrus" -) - -type RelayCollector struct { - log *logrus.Entry - relays []common.RelayEntry - bn *beaconclient.ProdBeaconInstance - db *database.DatabaseService -} - -func NewRelayCollector(log *logrus.Entry, relays []common.RelayEntry, beaconURL string, db *database.DatabaseService) *RelayCollector { - srv := &RelayCollector{ - log: log, - relays: relays, - db: db, - bn: beaconclient.NewProdBeaconInstance(log, beaconURL), - } - - return srv -} - -func (s *RelayCollector) Start() { - s.log.Info("Starting relay collector service") - - // Check beacon-node sync status, process current slot and start slot updates - syncStatus, err := s.bn.SyncStatus() - if err != nil { - s.log.WithError(err).Fatal("couldn't get BN sync status") - } else if syncStatus.IsSyncing { - s.log.Fatal("beacon node is syncing") - } - - var latestSlot uint64 - var latestEpoch uint64 - var duties map[uint64]string - - // subscribe to head events - c := make(chan beaconclient.HeadEventData) - go s.bn.SubscribeToHeadEvents(c) - for { - headEvent := <-c - if headEvent.Slot <= latestSlot { - continue - } - - latestSlot = headEvent.Slot - currentEpoch := latestSlot / relaycommon.SlotsPerEpoch - slotsUntilNextEpoch := relaycommon.SlotsPerEpoch - (latestSlot % relaycommon.SlotsPerEpoch) - s.log.Infof("headSlot: %d / currentEpoch: %d / slotsUntilNextEpoch: %d", latestSlot, currentEpoch, slotsUntilNextEpoch) - - // On every new epoch, get proposer duties for current and next epoch (to avoid boundary problems) - if len(duties) == 0 || currentEpoch > latestEpoch { - dutiesResp, err := s.bn.GetProposerDuties(currentEpoch) - if err != nil { - s.log.WithError(err).Error("couldn't get proposer duties") - continue - } - - duties = make(map[uint64]string) - for _, d := range dutiesResp.Data { - duties[d.Slot] = d.Pubkey - } - - dutiesResp, err = s.bn.GetProposerDuties(currentEpoch + 1) - if err != nil { - s.log.WithError(err).Error("failed get proposer duties") - } else { - for _, d := range dutiesResp.Data { - duties[d.Slot] = d.Pubkey - } - } - s.log.Infof("Got %d duties", len(duties)) - } - latestEpoch = currentEpoch - - // Now get the latest block, for the execution payload - block, err := s.bn.GetBlock("head") - if err != nil { - s.log.WithError(err).Error("failed get latest block from BN") - continue - } - - nextSlot := block.Data.Message.Slot + 1 - nextProposerPubkey := duties[nextSlot] - s.log.Infof("next slot: %d / block: %s / parent: %s / proposerPubkey: %s", nextSlot, block.Data.Message.Body.ExecutionPayload.BlockHash.String(), block.Data.Message.Body.ExecutionPayload.ParentHash, nextProposerPubkey) - - if nextProposerPubkey == "" { - s.log.WithField("duties", duties).Error("no proposerPubkey for next slot") - } else { - go s.CallGetHeader(10*time.Second, nextSlot, block.Data.Message.Body.ExecutionPayload.BlockHash.String(), duties[nextSlot]) - go s.CallGetHeader(12*time.Second, nextSlot, block.Data.Message.Body.ExecutionPayload.BlockHash.String(), duties[nextSlot]) - } - fmt.Println("") - } -} - -func (s *RelayCollector) CallGetHeader(timeout time.Duration, slot uint64, parentHash, proposerPubkey string) { - s.log.Infof("querying relays for bid in %.0f sec...", timeout.Seconds()) - - // Wait 12 seconds, allowing the builder to prepare bids - time.Sleep(timeout) - - for _, relay := range s.relays { - go s.CallGetHeaderOnRelay(relay, slot, parentHash, proposerPubkey) - } -} - -func (s *RelayCollector) CallGetHeaderOnRelay(relay common.RelayEntry, slot uint64, parentHash, proposerPubkey string) { - path := fmt.Sprintf("/eth/v1/builder/header/%d/%s/%s", slot, parentHash, proposerPubkey) - url := relay.GetURI(path) - log := s.log.WithField("relay", relay.Hostname()) - - log.Debugf("Querying %s", url) - var bid types.GetHeaderResponse - timeRequestStart := time.Now().UTC() - code, err := common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &bid) - timeRequestEnd := time.Now().UTC() - if err != nil { - if strings.Contains(err.Error(), "no builder bid") { - return - } - log.WithFields(logrus.Fields{ - "code": code, - "url": url, - }).WithError(err).Error("error on getHeader request") - return - } - if code != 200 { - // log.WithField("code", code).Info("no bid received") - return - } - entry := database.SignedBuilderBidToEntry(relay.Hostname(), slot, parentHash, proposerPubkey, timeRequestStart, timeRequestEnd, bid.Data) - log.Infof("bid received! slot: %d \t value: %s \t block_hash: %s \t timestamp: %d / %d", slot, bid.Data.Message.Value.String(), bid.Data.Message.Header.BlockHash.String(), bid.Data.Message.Header.Timestamp, entry.Timestamp) - err = s.db.SaveSignedBuilderBid(entry) - if err != nil { - log.WithFields(logrus.Fields{ - "extraData": bid.Data.Message.Header.ExtraData.String(), - "extraDataProcessed": entry.ExtraData, - }).WithError(err).Error("failed saving bid to database") - return - } -} From dc7818f751acf09c2eefbc9cca9225cc6de1db64 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 00:39:57 +0200 Subject: [PATCH 10/19] add comments to data-api-backfill --- cmd/core/data-api-backfill.go | 56 ++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index 7145df0..9460ecf 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -18,14 +18,12 @@ var ( cliRelay string minSlot int64 initCursor uint64 - // bidsOnly bool ) func init() { backfillDataAPICmd.Flags().StringVar(&cliRelay, "relay", "", "specific relay only") backfillDataAPICmd.Flags().Uint64Var(&initCursor, "cursor", 0, "initial cursor") backfillDataAPICmd.Flags().Int64Var(&minSlot, "min-slot", 0, "minimum slot (if unset, backfill until the merge, negative number for that number of slots before latest)") - // backfillDataAPICmd.Flags().BoolVar(&bidsOnly, "bids", false, "only bids") } var backfillDataAPICmd = &cobra.Command{ @@ -76,7 +74,6 @@ var backfillDataAPICmd = &cobra.Command{ for _, relay := range relays { backfiller := newBackfiller(db, relay, initCursor, uint64(minSlot)) - // backfiller.backfillDataAPIBids() err = backfiller.backfillPayloadsDelivered() if err != nil { log.WithError(err).WithField("relay", relay).Error("backfill failed") @@ -102,18 +99,19 @@ func newBackfiller(db *database.DatabaseService, relay common.RelayEntry, cursor } func (bf *backfiller) backfillPayloadsDelivered() error { - log.Infof("backfilling payloads data-api for relay %s ...", bf.relay.Hostname()) + _log := log.WithField("relay", bf.relay.Hostname()) + _log.Info("backfilling payloads from relay data-api ...") // 1. get latest entry from DB latestEntry, err := bf.db.GetDataAPILatestPayloadDelivered(bf.relay.Hostname()) latestSlotInDB := uint64(0) if err != nil && !errors.Is(err, sql.ErrNoRows) { - log.WithError(err).Fatal("failed to get latest entry") + _log.WithError(err).Fatal("failed to get latest entry") return err } else { latestSlotInDB = latestEntry.Slot } - log.Infof("last payload in db at slot: %d", latestSlotInDB) + _log.Infof("last payload in db for slot: %d", latestSlotInDB) // 2. backfill until latest DB entry is reached baseURL := bf.relay.GetURI("/relay/v1/data/bidtraces/proposer_payload_delivered") @@ -127,65 +125,69 @@ func (bf *backfiller) backfillPayloadsDelivered() error { if cursorSlot > 0 { url = fmt.Sprintf("%s?cursor=%d", baseURL, cursorSlot) } - log.Info("url: ", url) + _log.WithField("url: ", url).Info("fetching payloads...") var data []relaycommon.BidTraceV2JSON _, err = common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &data) if err != nil { return err } - log.Infof("got %d entries", len(data)) - entries := make([]*database.DataAPIPayloadDeliveredEntry, len(data)) + _log.Infof("- response contains %d delivered payloads", len(data)) - for index, dataEntry := range data { - log.Debugf("saving entry for slot %d", dataEntry.Slot) - dbEntry := database.BidTraceV2JSONToPayloadDeliveredEntry(bf.relay.Hostname(), dataEntry) + // build a list of entries for batch DB update + entries := make([]*database.DataAPIPayloadDeliveredEntry, len(data)) + for index, payload := range data { + _log.Debugf("saving entry for slot %d", payload.Slot) + dbEntry := database.BidTraceV2JSONToPayloadDeliveredEntry(bf.relay.Hostname(), payload) entries[index] = &dbEntry - if !slotsReceived[dataEntry.Slot] { - slotsReceived[dataEntry.Slot] = true + // Count number of slots with payloads + if !slotsReceived[payload.Slot] { + slotsReceived[payload.Slot] = true payloadsNew += 1 } - if cursorSlot == 0 { - log.Infof("latest received payload at slot %d", dataEntry.Slot) - cursorSlot = dataEntry.Slot - } else if cursorSlot > dataEntry.Slot { - cursorSlot = dataEntry.Slot + // Set cursor for next request + if cursorSlot == 0 || cursorSlot > payload.Slot { + cursorSlot = payload.Slot } - builders[dataEntry.BuilderPubkey] = true + // Remember the builder + builders[payload.BuilderPubkey] = true } + // Save entries err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries) if err != nil { - log.WithError(err).Fatal("failed to save entries") + _log.WithError(err).Fatal("failed to save entries") return err } - // save builders + // Save builders for builderPubkey := range builders { err = bf.db.SaveBuilder(&database.BlockBuilderEntry{BuilderPubkey: builderPubkey}) if err != nil { - log.WithError(err).Error("failed to save builder") + _log.WithError(err).Error("failed to save builder") } } + // Stop as soon as no new payloads are received if payloadsNew == 0 { - log.Infof("No new payloads, all done. Earliest payload for slot: %d", cursorSlot) + _log.Infof("No new payloads, all done. Earliest payload for slot: %d", cursorSlot) return nil } + // Stop if at the latest slot in DB if cursorSlot < latestSlotInDB { - log.Infof("Payloads backfilled until last in DB - at slot %d", latestSlotInDB) + _log.Infof("Payloads backfilled until last in DB - at slot %d", latestSlotInDB) return nil } + // Stop if at min slot if cursorSlot < bf.minSlot { - log.Infof("Payloads backfilled until min slot %d", bf.minSlot) + _log.Infof("Payloads backfilled until min slot %d", bf.minSlot) return nil } - // time.Sleep(1 * time.Second) } } From a851144cd9c032ab93a6846ff52f2992029e52cf Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 00:51:12 +0200 Subject: [PATCH 11/19] backfill: logs cleanup --- README.md | 2 +- cmd/core/data-api-backfill.go | 35 ++++++++++++++++++++++++++--------- database/database.go | 17 ++++++++++++----- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 5c9889f..48da90b 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ source .env.local make dev-postgres-start # Query only a single relay, and for the shortest time possible -go run . core data-api-backfill --relay fb --min-slot -1 +go run . core data-api-backfill --relay fb --min-slot -1000 # Now the DB has data, check it (and update in DB) go run . core check-payload-value diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index 9460ecf..aeb7720 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -11,6 +11,7 @@ import ( "github.com/flashbots/relayscan/common" "github.com/flashbots/relayscan/database" "github.com/flashbots/relayscan/vars" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -54,7 +55,7 @@ var backfillDataAPICmd = &cobra.Command{ log.Infof("Relayscan %s", vars.Version) log.Infof("Using %d relays", len(relays)) for index, relay := range relays { - log.Infof("relay #%d: %s", index+1, relay.Hostname()) + log.Infof("- relay #%d: %s", index+1, relay.Hostname()) } // Connect to Postgres @@ -62,9 +63,9 @@ var backfillDataAPICmd = &cobra.Command{ // If needed, get latest slot (i.e. if min-slot is negative) if minSlot < 0 { - log.Infof("Getting latest slot from beacon chain for offset %d", minSlot) + log.Infof("Getting latest slot from beaconcha.in for offset %d", minSlot) latestSlotOnBeaconChain := common.MustGetLatestSlot() - log.Infof("- Latest slot from beacon chain: %d", latestSlotOnBeaconChain) + log.Infof("Latest slot from beaconcha.in: %d", latestSlotOnBeaconChain) minSlot = int64(latestSlotOnBeaconChain) + minSlot } @@ -100,7 +101,7 @@ func newBackfiller(db *database.DatabaseService, relay common.RelayEntry, cursor func (bf *backfiller) backfillPayloadsDelivered() error { _log := log.WithField("relay", bf.relay.Hostname()) - _log.Info("backfilling payloads from relay data-api ...") + // _log.Info("backfilling payloads from relay data-api ...") // 1. get latest entry from DB latestEntry, err := bf.db.GetDataAPILatestPayloadDelivered(bf.relay.Hostname()) @@ -111,7 +112,7 @@ func (bf *backfiller) backfillPayloadsDelivered() error { } else { latestSlotInDB = latestEntry.Slot } - _log.Infof("last payload in db for slot: %d", latestSlotInDB) + _log.Infof("Latest payload in DB for slot: %d", latestSlotInDB) // 2. backfill until latest DB entry is reached baseURL := bf.relay.GetURI("/relay/v1/data/bidtraces/proposer_payload_delivered") @@ -125,22 +126,32 @@ func (bf *backfiller) backfillPayloadsDelivered() error { if cursorSlot > 0 { url = fmt.Sprintf("%s?cursor=%d", baseURL, cursorSlot) } - _log.WithField("url: ", url).Info("fetching payloads...") + _log.WithField("url: ", url).Info("Fetching payloads...") var data []relaycommon.BidTraceV2JSON _, err = common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &data) if err != nil { return err } - _log.Infof("- response contains %d delivered payloads", len(data)) + _log.Infof("Response contains %d delivered payloads", len(data)) // build a list of entries for batch DB update entries := make([]*database.DataAPIPayloadDeliveredEntry, len(data)) + slotFirst := uint64(0) + slotLast := uint64(0) for index, payload := range data { _log.Debugf("saving entry for slot %d", payload.Slot) dbEntry := database.BidTraceV2JSONToPayloadDeliveredEntry(bf.relay.Hostname(), payload) entries[index] = &dbEntry + // Set first and last slot + if slotFirst == 0 || payload.Slot < slotFirst { + slotFirst = payload.Slot + } + if slotLast == 0 || payload.Slot > slotLast { + slotLast = payload.Slot + } + // Count number of slots with payloads if !slotsReceived[payload.Slot] { slotsReceived[payload.Slot] = true @@ -157,12 +168,18 @@ func (bf *backfiller) backfillPayloadsDelivered() error { } // Save entries - err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries) + rowsAffected, err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries) if err != nil { _log.WithError(err).Fatal("failed to save entries") return err } + _log.WithFields(logrus.Fields{ + "rowsAffected": rowsAffected, + "slotFirst": slotFirst, + "slotLast": slotLast, + }).Info("Batch of payloads saved to database") + // Save builders for builderPubkey := range builders { err = bf.db.SaveBuilder(&database.BlockBuilderEntry{BuilderPubkey: builderPubkey}) @@ -179,7 +196,7 @@ func (bf *backfiller) backfillPayloadsDelivered() error { // Stop if at the latest slot in DB if cursorSlot < latestSlotInDB { - _log.Infof("Payloads backfilled until last in DB - at slot %d", latestSlotInDB) + _log.Infof("Payloads backfilled until last in DB at slot %d", latestSlotInDB) return nil } diff --git a/database/database.go b/database/database.go index 1948df9..50b35d2 100644 --- a/database/database.go +++ b/database/database.go @@ -67,9 +67,9 @@ func (s *DatabaseService) SaveDataAPIPayloadDelivered(entry *DataAPIPayloadDeliv return err } -func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPayloadDeliveredEntry) error { +func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPayloadDeliveredEntry) (rowsAffected int64, err error) { if len(entries) == 0 { - return nil + return 0, nil } query := `INSERT INTO ` + vars.TableDataAPIPayloadDelivered + ` @@ -84,12 +84,19 @@ func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPay end = len(entries) } - _, err := s.DB.NamedExec(query, entries[i:end]) + r, err := s.DB.NamedExec(query, entries[i:end]) if err != nil { - return err + return 0, err } + + _rowsAffected, err := r.RowsAffected() + if err != nil { + return 0, err + } + + rowsAffected += _rowsAffected } - return nil + return rowsAffected, nil } func (s *DatabaseService) GetDataAPILatestPayloadDelivered(relay string) (*DataAPIPayloadDeliveredEntry, error) { From 0c2c1d9be65cf8c35f9fab9e464a477cf79da9fb Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 00:53:23 +0200 Subject: [PATCH 12/19] cleanup --- Makefile | 5 +++ README.md | 2 +- cmd/core/data-api-backfill.go | 79 +++-------------------------------- 3 files changed, 12 insertions(+), 74 deletions(-) diff --git a/Makefile b/Makefile index d8a6623..11c6c34 100644 --- a/Makefile +++ b/Makefile @@ -14,21 +14,26 @@ v: ## Show the current version ##@ Building +.PHONY: clean clean: ## Remove build artifacts rm -rf relayscan build/ +.PHONY: build build: ## Build the relayscan binary go build -trimpath -ldflags "-s -X cmd.Version=${VERSION} -X main.Version=${VERSION}" -v -o relayscan . +.PHONY: docker-image docker-image: ## Build the relayscan docker image DOCKER_BUILDKIT=1 docker build --platform linux/amd64 --build-arg VERSION=${VERSION} . -t relayscan +.PHONY: generate-ssz generate-ssz: ## Generate SSZ serialization code rm -f common/ultrasoundbid_encoding.go sszgen --path common --objs UltrasoundStreamBid ##@ Production tasks +.PHONY: update-bids-website update-bids-website: ## Update the bid archive website go run . service bidcollect --build-website --build-website-upload diff --git a/README.md b/README.md index 48da90b..f0227ab 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ source .env.local make dev-postgres-start # Query only a single relay, and for the shortest time possible -go run . core data-api-backfill --relay fb --min-slot -1000 +go run . core data-api-backfill --relay fb --min-slot -2000 # Now the DB has data, check it (and update in DB) go run . core check-payload-value diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index aeb7720..8ce7e78 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -168,16 +168,16 @@ func (bf *backfiller) backfillPayloadsDelivered() error { } // Save entries - rowsAffected, err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries) + newEntries, err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries) if err != nil { _log.WithError(err).Fatal("failed to save entries") return err } _log.WithFields(logrus.Fields{ - "rowsAffected": rowsAffected, - "slotFirst": slotFirst, - "slotLast": slotLast, + "newEntries": newEntries, + "slotFirst": slotFirst, + "slotLast": slotLast, }).Info("Batch of payloads saved to database") // Save builders @@ -196,81 +196,14 @@ func (bf *backfiller) backfillPayloadsDelivered() error { // Stop if at the latest slot in DB if cursorSlot < latestSlotInDB { - _log.Infof("Payloads backfilled until last in DB at slot %d", latestSlotInDB) + _log.Infof("Payloads backfilled until latest slot in DB: %d", latestSlotInDB) return nil } // Stop if at min slot if cursorSlot < bf.minSlot { - _log.Infof("Payloads backfilled until min slot %d", bf.minSlot) + _log.Infof("Payloads backfilled until min slot: %d", bf.minSlot) return nil } } } - -// func (bf *backfiller) backfillDataAPIBids() error { -// log.Infof("backfilling bids from relay %s ...", bf.relay.Hostname()) - -// // 1. get latest entry from DB -// latestEntry, err := bf.db.GetDataAPILatestBid(bf.relay.Hostname()) -// latestSlotInDB := uint64(0) -// if err != nil && !errors.Is(err, sql.ErrNoRows) { -// log.WithError(err).Fatal("failed to get latest entry") -// return err -// } else { -// latestSlotInDB = latestEntry.Slot -// } -// log.Infof("last known slot: %d", latestSlotInDB) - -// // 2. backfill until latest DB entry is reached -// baseURL := bf.relay.GetURI("/relay/v1/data/bidtraces/builder_blocks_received") -// cursorSlot := bf.cursorSlot -// slotsReceived := make(map[uint64]bool) - -// for { -// entriesNew := 0 -// url := baseURL -// if cursorSlot > 0 { -// url = fmt.Sprintf("%s?slot=%d", baseURL, cursorSlot) -// } -// log.Info("url: ", url) -// var data []relaycommon.BidTraceV2WithTimestampJSON -// common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &data) - -// log.Infof("got %d entries", len(data)) -// entries := make([]*database.DataAPIBuilderBidEntry, len(data)) - -// for index, dataEntry := range data { -// log.Debugf("saving entry for slot %d", dataEntry.Slot) -// dbEntry := database.BidTraceV2WithTimestampJSONToBuilderBidEntry(bf.relay.Hostname(), dataEntry) -// entries[index] = &dbEntry - -// if !slotsReceived[dataEntry.Slot] { -// slotsReceived[dataEntry.Slot] = true -// entriesNew += 1 -// } - -// if cursorSlot == 0 { -// cursorSlot = dataEntry.Slot -// } -// } - -// err := bf.db.SaveDataAPIBids(entries) -// if err != nil { -// log.WithError(err).Fatal("failed to save bids") -// return err -// } - -// if entriesNew == 0 { -// log.Info("No new bids, all done") -// return nil -// } - -// if cursorSlot < latestSlotInDB { -// log.Infof("Bids backfilled until last in DB (%d)", latestSlotInDB) -// return nil -// } -// cursorSlot -= 1 -// // time.Sleep(1 * time.Second) -// } -// } From 566f02a0ba046dc5ea6aefa2697e1f23a666d027 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 10:05:54 +0200 Subject: [PATCH 13/19] payload check with min slot --- cmd/core/check-payload-value.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/core/check-payload-value.go b/cmd/core/check-payload-value.go index c521dfb..c9705c3 100644 --- a/cmd/core/check-payload-value.go +++ b/cmd/core/check-payload-value.go @@ -21,6 +21,7 @@ import ( var ( limit uint64 slotMax uint64 + slotMin uint64 ethNodeURI string ethNodeBackupURI string checkIncorrectOnly bool @@ -31,7 +32,8 @@ var ( func init() { checkPayloadValueCmd.Flags().Uint64Var(&slot, "slot", 0, "a specific slot") - checkPayloadValueCmd.Flags().Uint64Var(&slotMax, "slot-max", 0, "a specific max slot, only check slots below this") + checkPayloadValueCmd.Flags().Uint64Var(&slotMax, "slot-max", 0, "a specific max slot, only check slots before") + checkPayloadValueCmd.Flags().Uint64Var(&slotMin, "slot-min", 0, "only check slots after") checkPayloadValueCmd.Flags().Uint64Var(&limit, "limit", 1000, "how many payloads") checkPayloadValueCmd.Flags().Uint64Var(&numThreads, "threads", 10, "how many threads") checkPayloadValueCmd.Flags().StringVar(ðNodeURI, "eth-node", vars.DefaultEthNodeURI, "eth node URI (i.e. Infura)") @@ -88,6 +90,9 @@ var checkPayloadValueCmd = &cobra.Command{ if slotMax > 0 { query += fmt.Sprintf(" WHERE slot<=%d", slotMax) } + if slotMin > 0 { + query += fmt.Sprintf(" WHERE slot>=%d", slotMin) + } query += ` ORDER BY slot DESC` if limit > 0 { query += fmt.Sprintf(" limit %d", limit) From 66f06250ff4eb2b23eb6e3dccfe1e450d66e0a82 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 10:08:02 +0200 Subject: [PATCH 14/19] backfill: query with limit --- cmd/core/data-api-backfill.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index 8ce7e78..8867e52 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -19,6 +19,7 @@ var ( cliRelay string minSlot int64 initCursor uint64 + pageLimit = 1000 ) func init() { @@ -122,9 +123,9 @@ func (bf *backfiller) backfillPayloadsDelivered() error { for { payloadsNew := 0 - url := baseURL + url := fmt.Sprintf("%s?limit=%d", baseURL, pageLimit) if cursorSlot > 0 { - url = fmt.Sprintf("%s?cursor=%d", baseURL, cursorSlot) + url = fmt.Sprintf("%s&cursor=%d", baseURL, cursorSlot) } _log.WithField("url: ", url).Info("Fetching payloads...") var data []relaycommon.BidTraceV2JSON From f5f1382d39eb078c8e014dac6a7210661872b7f4 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 10:11:05 +0200 Subject: [PATCH 15/19] check-payload-value finish info --- cmd/core/check-payload-value.go | 153 ++++++++++++++++---------------- 1 file changed, 78 insertions(+), 75 deletions(-) diff --git a/cmd/core/check-payload-value.go b/cmd/core/check-payload-value.go index c9705c3..a7aa97b 100644 --- a/cmd/core/check-payload-value.go +++ b/cmd/core/check-payload-value.go @@ -6,6 +6,7 @@ import ( "math/big" "strings" "sync" + "time" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -47,92 +48,94 @@ func init() { var checkPayloadValueCmd = &cobra.Command{ Use: "check-payload-value", Short: "Check payload value for delivered payloads", - Run: func(cmd *cobra.Command, args []string) { - var err error + Run: checkPayloadValue, +} - client, err := ethclient.Dial(ethNodeURI) - if err != nil { - log.Fatalf("Failed to create RPC client for '%s'", ethNodeURI) - } - log.Infof("Using eth node: %s", ethNodeURI) +func checkPayloadValue(cmd *cobra.Command, args []string) { + var err error + startTime := time.Now().UTC() - client2 := client - if ethNodeBackupURI != "" { - client2, err = ethclient.Dial(ethNodeBackupURI) - if err != nil { - log.Fatalf("Failed to create backup RPC client for '%s'", ethNodeBackupURI) - } - log.Infof("Using eth backup node: %s", ethNodeBackupURI) - } + client, err := ethclient.Dial(ethNodeURI) + if err != nil { + log.Fatalf("Failed to create RPC client for '%s'", ethNodeURI) + } + log.Infof("Using eth node: %s", ethNodeURI) - // Connect to Postgres - db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) + client2 := client + if ethNodeBackupURI != "" { + client2, err = ethclient.Dial(ethNodeBackupURI) + if err != nil { + log.Fatalf("Failed to create backup RPC client for '%s'", ethNodeBackupURI) + } + log.Infof("Using eth backup node: %s", ethNodeBackupURI) + } - // Connect to BN - // bn, headSlot := common.MustConnectBeaconNode(log, beaconNodeURI, false) - // log.Infof("beacon node connected. headslot: %d", headSlot) + // Connect to Postgres + db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) - entries := []database.DataAPIPayloadDeliveredEntry{} - query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number FROM ` + dbvars.TableDataAPIPayloadDelivered - if checkIncorrectOnly { - query += ` WHERE value_check_ok=false ORDER BY slot DESC` - if limit > 0 { - query += fmt.Sprintf(" limit %d", limit) - } - err = db.DB.Select(&entries, query) - } else if checkMissedOnly { - query += ` WHERE slot_missed=true ORDER BY slot DESC` - if limit > 0 { - query += fmt.Sprintf(" limit %d", limit) - } - err = db.DB.Select(&entries, query) - } else if checkAll { - if slotMax > 0 { - query += fmt.Sprintf(" WHERE slot<=%d", slotMax) - } - if slotMin > 0 { - query += fmt.Sprintf(" WHERE slot>=%d", slotMin) - } - query += ` ORDER BY slot DESC` - if limit > 0 { - query += fmt.Sprintf(" limit %d", limit) - } - err = db.DB.Select(&entries, query) - } else if slot != 0 { - query += ` WHERE slot=$1` - err = db.DB.Select(&entries, query, slot) - } else { - // query += ` WHERE value_check_ok IS NULL AND slot_missed IS NULL ORDER BY slot DESC LIMIT $1` - query += ` WHERE value_check_ok IS NULL ORDER BY slot DESC LIMIT $1` - err = db.DB.Select(&entries, query, limit) + entries := []database.DataAPIPayloadDeliveredEntry{} + query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number FROM ` + dbvars.TableDataAPIPayloadDelivered + if checkIncorrectOnly { + query += ` WHERE value_check_ok=false ORDER BY slot DESC` + if limit > 0 { + query += fmt.Sprintf(" limit %d", limit) } - if err != nil { - log.WithError(err).Fatalf("couldn't get entries") + err = db.DB.Select(&entries, query) + } else if checkMissedOnly { + query += ` WHERE slot_missed=true ORDER BY slot DESC` + if limit > 0 { + query += fmt.Sprintf(" limit %d", limit) } - - log.Infof("query: %s", query) - log.Infof("got %d entries", len(entries)) - if len(entries) == 0 { - return + err = db.DB.Select(&entries, query) + } else if checkAll { + if slotMax > 0 { + query += fmt.Sprintf(" WHERE slot<=%d", slotMax) } - - wg := new(sync.WaitGroup) - entryC := make(chan database.DataAPIPayloadDeliveredEntry) - if slot != 0 { - numThreads = 1 + if slotMin > 0 { + query += fmt.Sprintf(" WHERE slot>=%d", slotMin) } - for i := 0; i < int(numThreads); i++ { - log.Infof("starting worker %d", i+1) - wg.Add(1) - go startUpdateWorker(wg, db, client, client2, entryC) + query += ` ORDER BY slot DESC` + if limit > 0 { + query += fmt.Sprintf(" limit %d", limit) } + err = db.DB.Select(&entries, query) + } else if slot != 0 { + query += ` WHERE slot=$1` + err = db.DB.Select(&entries, query, slot) + } else { + // query += ` WHERE value_check_ok IS NULL AND slot_missed IS NULL ORDER BY slot DESC LIMIT $1` + query += ` WHERE value_check_ok IS NULL ORDER BY slot DESC LIMIT $1` + err = db.DB.Select(&entries, query, limit) + } + if err != nil { + log.WithError(err).Fatalf("couldn't get entries") + } - for _, entry := range entries { - entryC <- entry - } - close(entryC) - wg.Wait() - }, + log.Infof("query: %s", query) + log.Infof("got %d entries", len(entries)) + if len(entries) == 0 { + return + } + + wg := new(sync.WaitGroup) + entryC := make(chan database.DataAPIPayloadDeliveredEntry) + if slot != 0 { + numThreads = 1 + } + for i := 0; i < int(numThreads); i++ { + log.Infof("starting worker %d", i+1) + wg.Add(1) + go startUpdateWorker(wg, db, client, client2, entryC) + } + + for _, entry := range entries { + entryC <- entry + } + close(entryC) + wg.Wait() + + timeNeeded := time.Since(startTime) + log.WithField("timeNeeded", timeNeeded).Info("All done!") } func _getBalanceDiff(ethClient *ethclient.Client, address ethcommon.Address, blockNumber *big.Int) (*big.Int, error) { From 22f68b62c1dd0abc927a95ba4ea446afda309e99 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 10:13:54 +0200 Subject: [PATCH 16/19] backfill: log done and time needed --- cmd/core/data-api-backfill.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index 8867e52..48fa1fb 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net/http" + "time" relaycommon "github.com/flashbots/mev-boost-relay/common" "github.com/flashbots/relayscan/common" @@ -19,7 +20,7 @@ var ( cliRelay string minSlot int64 initCursor uint64 - pageLimit = 1000 + pageLimit = 200 // 200 is max on some relays ) func init() { @@ -34,6 +35,7 @@ var backfillDataAPICmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { var err error var relays []common.RelayEntry + startTime := time.Now().UTC() if cliRelay != "" { var relayEntry common.RelayEntry @@ -75,12 +77,16 @@ var backfillDataAPICmd = &cobra.Command{ } for _, relay := range relays { + log.Infof("Starting backfilling for relay %s ...", relay.Hostname()) backfiller := newBackfiller(db, relay, initCursor, uint64(minSlot)) err = backfiller.backfillPayloadsDelivered() if err != nil { log.WithError(err).WithField("relay", relay).Error("backfill failed") } } + + timeNeeded := time.Since(startTime) + log.WithField("timeNeeded", timeNeeded).Info("All done!") }, } From 4392220df10dd46cb6463f9b2381506594103cd5 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 10:19:26 +0200 Subject: [PATCH 17/19] cleanup --- cmd/core/check-payload-value.go | 12 +++++++----- cmd/core/data-api-backfill.go | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cmd/core/check-payload-value.go b/cmd/core/check-payload-value.go index a7aa97b..d9bfe72 100644 --- a/cmd/core/check-payload-value.go +++ b/cmd/core/check-payload-value.go @@ -33,8 +33,8 @@ var ( func init() { checkPayloadValueCmd.Flags().Uint64Var(&slot, "slot", 0, "a specific slot") - checkPayloadValueCmd.Flags().Uint64Var(&slotMax, "slot-max", 0, "a specific max slot, only check slots before") - checkPayloadValueCmd.Flags().Uint64Var(&slotMin, "slot-min", 0, "only check slots after") + checkPayloadValueCmd.Flags().Uint64Var(&slotMax, "slot-max", 0, "a specific max slot, only check slots before (only works with --check-all)") + checkPayloadValueCmd.Flags().Uint64Var(&slotMin, "slot-min", 0, "only check slots after this one") checkPayloadValueCmd.Flags().Uint64Var(&limit, "limit", 1000, "how many payloads") checkPayloadValueCmd.Flags().Uint64Var(&numThreads, "threads", 10, "how many threads") checkPayloadValueCmd.Flags().StringVar(ðNodeURI, "eth-node", vars.DefaultEthNodeURI, "eth node URI (i.e. Infura)") @@ -91,9 +91,6 @@ func checkPayloadValue(cmd *cobra.Command, args []string) { if slotMax > 0 { query += fmt.Sprintf(" WHERE slot<=%d", slotMax) } - if slotMin > 0 { - query += fmt.Sprintf(" WHERE slot>=%d", slotMin) - } query += ` ORDER BY slot DESC` if limit > 0 { query += fmt.Sprintf(" limit %d", limit) @@ -129,6 +126,11 @@ func checkPayloadValue(cmd *cobra.Command, args []string) { } for _, entry := range entries { + // possibly skip + if slotMin != 0 && entry.Slot < slotMin { + continue + } + entryC <- entry } close(entryC) diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index 48fa1fb..e9c88cb 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -20,7 +20,7 @@ var ( cliRelay string minSlot int64 initCursor uint64 - pageLimit = 200 // 200 is max on some relays + pageLimit = 100 // 100 is max on bloxroute ) func init() { From d4a808a7056bab059ef8c5d123acd863c31da863 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 10:25:51 +0200 Subject: [PATCH 18/19] fix GetLatestDeliveredPayload --- database/database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/database.go b/database/database.go index 50b35d2..f889388 100644 --- a/database/database.go +++ b/database/database.go @@ -209,7 +209,7 @@ func (s *DatabaseService) GetDeliveredPayloadsForSlot(slot uint64) (res []*DataA func (s *DatabaseService) GetLatestDeliveredPayload() (*DataAPIPayloadDeliveredEntry, error) { query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number - FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL ORDER BY id DESC LIMIT 1;` + FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL ORDER BY slot DESC LIMIT 1;` entry := new(DataAPIPayloadDeliveredEntry) err := s.DB.Get(entry, query) return entry, err From 4898344e7dc412a443617f358d32fe59b41c3e72 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 10:31:10 +0200 Subject: [PATCH 19/19] add migration for blob indexes --- cmd/core/data-api-backfill.go | 2 +- database/migrations/003_add_blob_index.go | 19 +++++++++++++++++++ database/migrations/migration.go | 1 + 3 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 database/migrations/003_add_blob_index.go diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index e9c88cb..5beba13 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -131,7 +131,7 @@ func (bf *backfiller) backfillPayloadsDelivered() error { payloadsNew := 0 url := fmt.Sprintf("%s?limit=%d", baseURL, pageLimit) if cursorSlot > 0 { - url = fmt.Sprintf("%s&cursor=%d", baseURL, cursorSlot) + url = fmt.Sprintf("%s&cursor=%d", url, cursorSlot) } _log.WithField("url: ", url).Info("Fetching payloads...") var data []relaycommon.BidTraceV2JSON diff --git a/database/migrations/003_add_blob_index.go b/database/migrations/003_add_blob_index.go new file mode 100644 index 0000000..b7ab15c --- /dev/null +++ b/database/migrations/003_add_blob_index.go @@ -0,0 +1,19 @@ +package migrations + +import ( + "github.com/flashbots/relayscan/database/vars" + migrate "github.com/rubenv/sql-migrate" +) + +var migration003SQL = ` + CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_num_blob_txs_idx ON ` + vars.TableDataAPIPayloadDelivered + `("num_blob_txs"); + CREATE INDEX IF NOT EXISTS ` + vars.TableDataAPIPayloadDelivered + `_num_blobs_idx ON ` + vars.TableDataAPIPayloadDelivered + `("num_blobs"); +` + +var Migration003AddBlobIndexes = &migrate.Migration{ + Id: "003-add-blob-indexes", + Up: []string{migration003SQL}, + + DisableTransactionUp: false, + DisableTransactionDown: true, +} diff --git a/database/migrations/migration.go b/database/migrations/migration.go index 973c7c4..b6feb38 100644 --- a/database/migrations/migration.go +++ b/database/migrations/migration.go @@ -9,5 +9,6 @@ var Migrations = migrate.MemoryMigrationSource{ Migrations: []*migrate.Migration{ Migration001InitDatabase, Migration002AddBlobCount, + Migration003AddBlobIndexes, }, }