e3 snaps: remove feature of storing files list in db (#11943)
- it seems we don't use this feature much
- and I faced one bug on a production server, where an external RPCDaemon opened the
files list from the db: there were files up to `N`, but the smallest value in the db
was `N+1000`, so the RPCDaemon saw the gap. But if the RPCDaemon is opened on the folder,
it sees all files and works well.
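For context on why scanning the folder avoids the gap described above, here is a minimal, standalone sketch of folder-based segment discovery (hypothetical names and path, not Erigon's implementation; the actual change simply calls `OptimisticalyReopenFolder()` / `ReopenFolder()` instead of reading the db-stored list):

```go
// Sketch: discover snapshot segments by listing the directory instead of
// trusting a separately stored list. Whatever *.seg files actually exist on
// disk are what gets opened, so a stale list cannot introduce a gap.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
)

// listSegFiles returns the sorted names of all .seg files found in dir.
func listSegFiles(dir string) ([]string, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var files []string
	for _, e := range entries {
		if !e.IsDir() && filepath.Ext(e.Name()) == ".seg" {
			files = append(files, e.Name())
		}
	}
	sort.Strings(files)
	return files, nil
}

func main() {
	files, err := listSegFiles("snapshots") // hypothetical datadir subfolder
	if err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	fmt.Println("snapshot segments on disk:", files)
}
```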
AskAlexSharov authored Sep 11, 2024
1 parent 2e0e1d0 commit 2179881
Showing 9 changed files with 6 additions and 153 deletions.
12 changes: 0 additions & 12 deletions cmd/hack/hack.go
@@ -703,16 +703,6 @@ func keybytesToHex(str []byte) []byte {
return nibbles
}

func rmSnKey(chaindata string) error {
db := mdbx.MustOpen(chaindata)
defer db.Close()
return db.Update(context.Background(), func(tx kv.RwTx) error {
_ = tx.Delete(kv.DatabaseInfo, rawdb.SnapshotsKey)
_ = tx.Delete(kv.DatabaseInfo, rawdb.SnapshotsHistoryKey)
return nil
})
}

func findLogs(chaindata string, block uint64, blockTotal uint64) error {
db := mdbx.MustOpen(chaindata)
defer db.Close()
@@ -931,8 +921,6 @@ func main() {
err = findLogs(*chaindata, uint64(*block), uint64(*blockTotal))
case "iterate":
err = iterate(*chaindata, *account)
case "rmSnKey":
err = rmSnKey(*chaindata)
}

if err != nil {
4 changes: 2 additions & 2 deletions cmd/rpcdaemon/cli/config.go
@@ -395,8 +395,8 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
- allSnapshots.OptimisticReopenWithDB(db)
- allBorSnapshots.OptimisticalyReopenWithDB(db)
+ allSnapshots.OptimisticalyReopenFolder()
+ allBorSnapshots.OptimisticalyReopenFolder()
allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
38 changes: 0 additions & 38 deletions core/rawdb/accessors_chain.go
@@ -23,7 +23,6 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"math"
"math/big"
@@ -1207,43 +1206,6 @@ func IsPosBlock(db kv.Getter, blockHash common.Hash) (trans bool, err error) {
return header.Difficulty.Sign() == 0, nil
}

var SnapshotsKey = []byte("snapshots")
var SnapshotsHistoryKey = []byte("snapshots_history")

func ReadSnapshots(tx kv.Tx) ([]string, []string, error) {
v, err := tx.GetOne(kv.DatabaseInfo, SnapshotsKey)
if err != nil {
return nil, nil, err
}
var res, resHist []string
_ = json.Unmarshal(v, &res)

v, err = tx.GetOne(kv.DatabaseInfo, SnapshotsHistoryKey)
if err != nil {
return nil, nil, err
}
_ = json.Unmarshal(v, &resHist)
return res, resHist, nil
}

func WriteSnapshots(tx kv.RwTx, list, histList []string) error {
res, err := json.Marshal(list)
if err != nil {
return err
}
if err := tx.Put(kv.DatabaseInfo, SnapshotsKey, res); err != nil {
return err
}
res, err = json.Marshal(histList)
if err != nil {
return err
}
if err := tx.Put(kv.DatabaseInfo, SnapshotsHistoryKey, res); err != nil {
return err
}
return nil
}

// PruneTable has `limit` parameter to avoid too large data deletes per one sync cycle - better delete by small portions to reduce db.FreeList size
func PruneTable(tx kv.RwTx, table string, pruneTo uint64, ctx context.Context, limit int) error {
c, err := tx.RwCursor(table)
15 changes: 0 additions & 15 deletions erigon-lib/state/aggregator.go
@@ -80,8 +80,6 @@ type Aggregator struct {
ctx context.Context
ctxCancel context.CancelFunc

needSaveFilesListInDB atomic.Bool

wg sync.WaitGroup // goroutines spawned by Aggregator, to ensure all of them are finish at agg.Close

onFreeze OnFreezeFunc
@@ -808,8 +806,6 @@ func (a *Aggregator) mergeLoopStep(ctx context.Context) (somethingDone bool, err
a.recalcVisibleFiles(a.DirtyFilesEndTxNumMinimax())
a.cleanAfterMerge(in)

a.needSaveFilesListInDB.Store(true)

a.onFreeze(in.FrozenList())
closeAll = false
return true, nil
@@ -828,8 +824,6 @@ func (a *Aggregator) MergeLoop(ctx context.Context) error {
}

func (a *Aggregator) integrateDirtyFiles(sf AggV3StaticFiles, txNumFrom, txNumTo uint64) {
defer a.needSaveFilesListInDB.Store(true)

a.dirtyFilesLock.Lock()
defer a.dirtyFilesLock.Unlock()

@@ -841,13 +835,6 @@ func (a *Aggregator) integrateDirtyFiles(sf AggV3StaticFiles, txNumFrom, txNumTo
}
}

func (a *Aggregator) HasNewFrozenFiles() bool {
if a == nil {
return false
}
return a.needSaveFilesListInDB.CompareAndSwap(true, false)
}

type flusher interface {
Flush(ctx context.Context, tx kv.RwTx) error
}
@@ -1584,8 +1571,6 @@ func (ac *AggregatorRoTx) mergeFiles(ctx context.Context, files SelectedStaticFi
}

func (a *Aggregator) integrateMergedDirtyFiles(outs SelectedStaticFilesV3, in MergedFilesV3) {
defer a.needSaveFilesListInDB.Store(true)

a.dirtyFilesLock.Lock()
defer a.dirtyFilesLock.Unlock()

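An aside on the mechanism deleted above: `needSaveFilesListInDB` / `HasNewFrozenFiles` used a common "set on change, consume with compare-and-swap" pattern. A standalone sketch of that pattern with hypothetical names (not the Erigon types):

```go
// Sketch of the dirty-flag pattern behind the removed needSaveFilesListInDB /
// HasNewFrozenFiles code: producers set the flag whenever new frozen files
// appear; a consumer atomically checks-and-resets it, so each change is
// observed (and would have been persisted) at most once.
package main

import (
	"fmt"
	"sync/atomic"
)

type filesTracker struct {
	dirty atomic.Bool
}

// markChanged is called after new files are integrated or merged.
func (t *filesTracker) markChanged() { t.dirty.Store(true) }

// consumeChanged reports whether an unseen change exists and clears the flag
// in the same atomic step, so concurrent consumers never both see true.
func (t *filesTracker) consumeChanged() bool {
	return t.dirty.CompareAndSwap(true, false)
}

func main() {
	var t filesTracker
	t.markChanged()
	fmt.Println(t.consumeChanged()) // true: change observed and cleared
	fmt.Println(t.consumeChanged()) // false: already consumed
}
```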
17 changes: 0 additions & 17 deletions eth/stagedsync/stage_snapshots.go
@@ -509,12 +509,6 @@ func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs
return err
}
}
ac := agg.BeginFilesRo()
defer ac.Close()
if err := rawdb.WriteSnapshots(tx, blockReader.FrozenFiles(), ac.Files()); err != nil {
return err
}
ac.Close()

default:
diagnostics.Send(diagnostics.SnapshotFillDBStageUpdate{
@@ -581,17 +575,6 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv.
}

freezingCfg := cfg.blockReader.FreezingCfg()
if cfg.blockRetire.HasNewFrozenFiles() || cfg.agg.HasNewFrozenFiles() {
ac := cfg.agg.BeginFilesRo()
defer ac.Close()
aggFiles := ac.Files()
ac.Close()

if err := rawdb.WriteSnapshots(tx, cfg.blockReader.FrozenFiles(), aggFiles); err != nil {
return err
}
}

if freezingCfg.ProduceE2 {
//TODO: initialSync maybe save files progress here

27 changes: 1 addition & 26 deletions turbo/app/snapshots_cmd.go
@@ -1229,7 +1229,7 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {

chainConfig := fromdb.ChainConfig(db)
cfg := ethconfig.NewSnapCfg(false, true, true, chainConfig.ChainName)
- blockSnaps, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, db, logger)
+ _, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, db, logger)
if err != nil {
return err
}
@@ -1268,16 +1268,6 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
}

blockReader, _ := br.IO()
if err := db.Update(ctx, func(tx kv.RwTx) error {
ac := agg.BeginFilesRo()
defer ac.Close()
if err := rawdb.WriteSnapshots(tx, blockReader.FrozenFiles(), ac.Files()); err != nil {
return err
}
return nil
}); err != nil {
return err
}
deletedBlocks := math.MaxInt // To pass the first iteration
allDeletedBlocks := 0
for deletedBlocks > 0 { // prune happens by small steps, so need many runs
@@ -1378,21 +1368,6 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
if err = agg.BuildMissedIndices(ctx, indexWorkers); err != nil {
return err
}
if err := db.UpdateNosync(ctx, func(tx kv.RwTx) error {
blockReader, _ := br.IO()
ac := agg.BeginFilesRo()
defer ac.Close()
return rawdb.WriteSnapshots(tx, blockReader.FrozenFiles(), ac.Files())
}); err != nil {
return err
}
if err := db.Update(ctx, func(tx kv.RwTx) error {
ac := agg.BeginFilesRo()
defer ac.Close()
return rawdb.WriteSnapshots(tx, blockSnaps.Files(), ac.Files())
}); err != nil {
return err
}

return nil
}
1 change: 0 additions & 1 deletion turbo/services/interfaces.go
@@ -148,7 +148,6 @@ type BlockSnapshots interface {
type BlockRetire interface {
PruneAncientBlocks(tx kv.RwTx, limit int) (deleted int, err error)
RetireBlocksInBackground(ctx context.Context, miBlockNum uint64, maxBlockNum uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []DownloadRequest) error, onDelete func(l []string) error, onFinishRetire func() error)
HasNewFrozenFiles() bool
BuildMissedIndicesIfNeed(ctx context.Context, logPrefix string, notifier DBEventNotifier, cc *chain.Config) error
SetWorkers(workers int)
GetWorkers() int
40 changes: 3 additions & 37 deletions turbo/snapshotsync/freezeblocks/block_snapshots.go
@@ -670,21 +670,6 @@ func (s *RoSnapshots) idxAvailability() uint64 {
return maxIdx
}

// OptimisticReopenWithDB - optimistically open snapshots (ignoring error), useful at App startup because:
// - user must be able: delete any snapshot file and Erigon will self-heal by re-downloading
// - RPC return Nil for historical blocks if snapshots are not open
func (s *RoSnapshots) OptimisticReopenWithDB(db kv.RoDB) {
var snList []string
_ = db.View(context.Background(), func(tx kv.Tx) (err error) {
snList, _, err = rawdb.ReadSnapshots(tx)
if err != nil {
return err
}
return nil
})
_ = s.ReopenList(snList, true)
}

func (s *RoSnapshots) LS() {
view := s.View()
defer view.Close()
@@ -853,8 +838,7 @@ func (s *RoSnapshots) Ranges() []Range {
return view.Ranges()
}

- func (s *RoSnapshots) OptimisticalyReopenFolder() { _ = s.ReopenFolder() }
- func (s *RoSnapshots) OptimisticalyReopenWithDB(db kv.RoDB) { _ = s.ReopenWithDB(db) }
+ func (s *RoSnapshots) OptimisticalyReopenFolder() { _ = s.ReopenFolder() }
func (s *RoSnapshots) ReopenFolder() error {
defer s.recalcVisibleFiles()

@@ -902,19 +886,6 @@ func (s *RoSnapshots) ReopenSegments(types []snaptype.Type, allowGaps bool) erro
return nil
}

func (s *RoSnapshots) ReopenWithDB(db kv.RoDB) error {
if err := db.View(context.Background(), func(tx kv.Tx) error {
snList, _, err := rawdb.ReadSnapshots(tx)
if err != nil {
return err
}
return s.ReopenList(snList, true)
}); err != nil {
return fmt.Errorf("ReopenWithDB: %w", err)
}
return nil
}

func (s *RoSnapshots) Close() {
if s == nil {
return
@@ -1383,9 +1354,8 @@ func chooseSegmentEnd(from, to uint64, snapType snaptype.Enum, chainConfig *chai
}

type BlockRetire struct {
- maxScheduledBlock atomic.Uint64
- working atomic.Bool
- needSaveFilesListInDB atomic.Bool
+ maxScheduledBlock atomic.Uint64
+ working atomic.Bool

// shared semaphore with AggregatorV3 to allow only one type of snapshot building at a time
snBuildAllowed *semaphore.Weighted
@@ -1442,10 +1412,6 @@ func (br *BlockRetire) borSnapshots() *BorRoSnapshots {
return br.blockReader.BorSnapshots().(*BorRoSnapshots)
}

func (br *BlockRetire) HasNewFrozenFiles() bool {
return br.needSaveFilesListInDB.CompareAndSwap(true, false)
}

func CanRetire(curBlockNum uint64, blocksInSnapshots uint64, snapType snaptype.Enum, chainConfig *chain.Config) (blockFrom, blockTo uint64, can bool) {
var keep uint64 = 1024 //TODO: we will increase it to params.FullImmutabilityThreshold after some db optimizations
if curBlockNum <= keep {
5 changes: 0 additions & 5 deletions turbo/snapshotsync/snapshotsync.go
@@ -37,7 +37,6 @@ import (
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon-lib/state"

"github.com/erigontech/erigon/core/rawdb"
coresnaptype "github.com/erigontech/erigon/core/snaptype"
snaptype2 "github.com/erigontech/erigon/core/snaptype"
"github.com/erigontech/erigon/ethdb/prune"
@@ -451,10 +450,6 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,
}
}

if err := rawdb.WriteSnapshots(tx, blockReader.FrozenFiles(), agg.Files()); err != nil {
return err
}

firstNonGenesis, err := rawdbv3.SecondKey(tx, kv.Headers)
if err != nil {
return err
