Commit 20112cf: save

awskii committed Jan 16, 2025 (parent 2a2a549)
Showing 5 changed files with 103 additions and 24 deletions.
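The change running through all five files: allSnapshots now returns an error as its seventh value instead of only logging open failures, and every caller either propagates it, logs it and aborts, or (in printAllStages) deliberately ignores it. Below is a minimal sketch of that caller-side pattern; it uses placeholder types rather than the real erigon snapshot and aggregator handles, so only the shape matches the diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handle is a placeholder standing in for the objects returned by
// allSnapshots; the real code returns *freezeblocks.RoSnapshots,
// *heimdall.RoSnapshots, *libstate.Aggregator, and the stores.
type handle struct{ name string }

func (h *handle) Close() { fmt.Println("closed", h.name) }

// openAll mimics the new allSnapshots contract: all handles plus a trailing error.
func openAll(ctx context.Context) (*handle, *handle, *handle, error) {
	if ctx.Err() != nil {
		return nil, nil, nil, errors.New("context cancelled before opening snapshots")
	}
	return &handle{"blocks"}, &handle{"bor"}, &handle{"aggregator"}, nil
}

// stageLike follows the caller pattern used throughout the diff:
// check the error first, and only then defer the Close calls.
func stageLike(ctx context.Context) error {
	sn, borSn, agg, err := openAll(ctx)
	if err != nil {
		return err
	}
	defer sn.Close()
	defer borSn.Close()
	defer agg.Close()
	// ... stage body ...
	return nil
}

func main() {
	if err := stageLike(context.Background()); err != nil {
		fmt.Println("error:", err)
	}
}
```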
30 changes: 26 additions & 4 deletions cmd/integration/commands/reset_state.go
@@ -55,7 +55,12 @@ var cmdResetState = &cobra.Command{
}
ctx, _ := common.RootContext()
defer db.Close()
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
logger.Error("Opening snapshots", "error", err)
return
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -138,11 +143,28 @@ func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *heimdall.
}
fmt.Fprintf(w, "--\n")
fmt.Fprintf(w, "prune distance: %s\n\n", pm.String())
fmt.Fprintf(w, "blocks: segments=%d, indices=%d\n", snapshots.SegmentsMax(), snapshots.IndicesMax())
fmt.Fprintf(w, "blocks.bor: segments=%d, indices=%d\n\n", borSn.SegmentsMax(), borSn.IndicesMax())
if snapshots != nil {
fmt.Fprintf(w, "blocks: segments=%d, indices=%d\n", snapshots.SegmentsMax(), snapshots.IndicesMax())
} else {
fmt.Fprintf(w, "blocks: segments=0, indices=0; failed to open snapshots\n")
}
if borSn != nil {
fmt.Fprintf(w, "blocks.bor: segments=%d, indices=%d\n", borSn.SegmentsMax(), borSn.IndicesMax())
} else {
fmt.Fprintf(w, "blocks.bor: segments=0, indices=0; failed to open bor snapshots\n")
}

_lb, _lt, _ := rawdbv3.TxNums.Last(tx)
fmt.Fprintf(w, "state.history: idx steps: %.02f, TxNums_Index(%d,%d), filesAmount: %d\n\n", rawdbhelpers.IdxStepsCountV3(tx), _lb, _lt, agg.FilesAmount())

var filesAmount []int
var aggIsNotOkMessage string
if agg != nil {
filesAmount = agg.FilesAmount()
} else {
aggIsNotOkMessage = "failed to open aggregator snapshots"
}

fmt.Fprintf(w, "state.history: idx steps: %.02f, TxNums_Index(%d,%d), filesAmount: %d%s\n\n", rawdbhelpers.IdxStepsCountV3(tx), _lb, _lt, filesAmount, aggIsNotOkMessage)
ethTxSequence, err := tx.ReadSequence(kv.EthTx)
if err != nil {
return err
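The nil-guards added to printStages matter because the function can now be reached with a partially opened snapshot set. One detail that can look like a bug at first glance: the new code collects agg.FilesAmount() into a []int and still prints it with the %d verb. Go's fmt package applies the verb element-wise to slices, so a populated slice renders in brackets and the nil slice from the failed-open path renders as []. A small standalone sketch of that formatting behaviour follows; the variable names are taken from the diff, the values are made up.

```go
package main

import "fmt"

func main() {
	// Happy path in printStages: a slice printed with the %d verb.
	filesAmount := []int{12, 3, 7}
	fmt.Printf("filesAmount: %d\n", filesAmount) // filesAmount: [12 3 7]

	// Failed-open path: the slice stays nil and the extra message says why.
	var empty []int
	aggIsNotOkMessage := "failed to open aggregator snapshots"
	fmt.Printf("filesAmount: %d%s\n", empty, aggIsNotOkMessage)
	// filesAmount: []failed to open aggregator snapshots
}
```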
5 changes: 4 additions & 1 deletion cmd/integration/commands/root.go
@@ -109,6 +109,9 @@ func openDB(opts kv2.MdbxOpts, applyMigrations bool, logger log.Logger) (tdb kv.
}
}

_, _, agg, _, _, _ := allSnapshots(context.Background(), rawDB, logger)
_, _, agg, _, _, _, err := allSnapshots(context.Background(), rawDB, logger)
if err != nil {
return nil, err
}
return temporal.New(rawDB, agg)
}
77 changes: 61 additions & 16 deletions cmd/integration/commands/stages.go
@@ -639,7 +639,11 @@ func init() {
}

func stageSnapshots(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -694,7 +698,11 @@ func stageHeaders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) er
return err
}

sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -793,7 +801,10 @@ func stageBorHeimdall(db kv.TemporalRwDB, ctx context.Context, unwindTypes []str
return nil
}
if unwind > 0 {
sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, bridgeStore, heimdallStore, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -824,7 +835,10 @@ func stageBorHeimdall(db kv.TemporalRwDB, ctx context.Context, unwindTypes []str
return nil
}

sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, bridgeStore, heimdallStore, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -856,7 +870,10 @@ func stageBorHeimdall(db kv.TemporalRwDB, ctx context.Context, unwindTypes []str
}

func stageBodies(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -896,7 +913,11 @@ func stageBodies(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) err
func stagePolygonSync(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
engine, _, stageSync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
heimdallClient := engine.(*bor.Bor).HeimdallClient
sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, bridgeStore, heimdallStore, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -927,7 +948,11 @@ func stagePolygonSync(db kv.TemporalRwDB, ctx context.Context, logger log.Logger
func stageSenders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
tmpdir := datadir.New(datadirCli).Tmp
chainConfig := fromdb.ChainConfig(db)
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1019,7 +1044,11 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error

engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.Execution))
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1176,7 +1205,10 @@ func stageCustomTrace(db kv.TemporalRwDB, ctx context.Context, logger log.Logger

engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.Execution))
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1212,7 +1244,10 @@ func stageCustomTrace(db kv.TemporalRwDB, ctx context.Context, logger log.Logger
func stagePatriciaTrie(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
_ = pm
sn, _, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, _, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer agg.Close()
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
@@ -1236,7 +1271,10 @@ func stageTxLookup(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) e
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig := fromdb.ChainConfig(db)
must(sync.SetCurrentStage(stages.TxLookup))
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1283,7 +1321,7 @@ func stageTxLookup(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) e
}

func printAllStages(db kv.RoDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, _ := allSnapshots(ctx, db, logger) // ignore error here to get some stat.
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1322,7 +1360,7 @@ var _aggSingleton *libstate.Aggregator
var _bridgeStoreSingleton bridge.Store
var _heimdallStoreSingleton heimdall.Store

func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *heimdall.RoSnapshots, *libstate.Aggregator, *freezeblocks.CaplinSnapshots, bridge.Store, heimdall.Store) {
func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *heimdall.RoSnapshots, *libstate.Aggregator, *freezeblocks.CaplinSnapshots, bridge.Store, heimdall.Store, error) {
var err error

openSnapshotOnce.Do(func() {
@@ -1404,8 +1442,9 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl

if err != nil {
log.Error("[snapshots] failed to open", "err", err)
return nil, nil, nil, nil, nil, nil, err
}
return _allSnapshotsSingleton, _allBorSnapshotsSingleton, _aggSingleton, _allCaplinSnapshotsSingleton, _bridgeStoreSingleton, _heimdallStoreSingleton
return _allSnapshotsSingleton, _allBorSnapshotsSingleton, _aggSingleton, _allCaplinSnapshotsSingleton, _bridgeStoreSingleton, _heimdallStoreSingleton, nil
}

var openBlockReaderOnce sync.Once
@@ -1414,7 +1453,10 @@ var _blockWriterSingleton *blockio.BlockWriter

func blocksIO(db kv.RoDB, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter) {
openBlockReaderOnce.Do(func() {
sn, borSn, _, _, bridgeStore, heimdallStore := allSnapshots(context.Background(), db, logger)
sn, borSn, _, _, bridgeStore, heimdallStore, err := allSnapshots(context.Background(), db, logger)
if err != nil {
panic(err)
}
_blockReaderSingleton = freezeblocks.NewBlockReader(sn, borSn, heimdallStore, bridgeStore)
_blockWriterSingleton = blockio.NewBlockWriter()
})
@@ -1456,7 +1498,10 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *params.Minin
cfg.Miner = *miningConfig
}
cfg.Dirs = datadir.New(datadirCli)
allSn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
allSn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
panic(err) // we do already panic above on genesis error
}
cfg.Snapshot = allSn.Cfg()

blockReader, blockWriter := blocksIO(db, logger)
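The hunks in stages.go all lean on the same initialisation shape: allSnapshots opens everything exactly once behind a sync.Once, stores the results in package-level singletons, and now also surfaces the open error to its caller. A stripped-down sketch of that shape follows; open and the type names are placeholders. Unlike the visible hunks, where err is a per-call local and a failure inside the once-guarded closure is therefore only reported to the first caller, this sketch caches the error next to the singleton so every caller sees the same failure.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// snapshots stands in for the real snapshot/aggregator singletons
// (_allSnapshotsSingleton, _aggSingleton, ...) from stages.go.
type snapshots struct{ dir string }

var (
	openOnce   sync.Once
	_snapshots *snapshots
	_openErr   error // cached so later callers see the same failure
)

// open is a hypothetical stand-in for the real snapshot/aggregator setup.
func open(dir string) (*snapshots, error) {
	if dir == "" {
		return nil, errors.New("datadir not set")
	}
	return &snapshots{dir: dir}, nil
}

// allSnapshotsLike mirrors the once-guarded shape of allSnapshots:
// initialise exactly once, then hand back the singleton plus the error.
func allSnapshotsLike(dir string) (*snapshots, error) {
	openOnce.Do(func() {
		_snapshots, _openErr = open(dir)
	})
	return _snapshots, _openErr
}

func main() {
	if sn, err := allSnapshotsLike("/tmp/datadir"); err != nil {
		fmt.Println("open failed:", err)
	} else {
		fmt.Println("opened snapshots in", sn.dir)
	}
}
```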
5 changes: 4 additions & 1 deletion cmd/integration/commands/state_domains.go
@@ -121,7 +121,10 @@ var readDomains = &cobra.Command{
}

func requestDomains(chainDb, stateDb kv.RwDB, ctx context.Context, readDomain string, addrs [][]byte, logger log.Logger) error {
sn, bsn, agg, _, _, _ := allSnapshots(ctx, chainDb, logger)
sn, bsn, agg, _, _, _, err := allSnapshots(ctx, chainDb, logger)
if err != nil {
return err
}
defer sn.Close()
defer bsn.Close()
defer agg.Close()
10 changes: 8 additions & 2 deletions cmd/integration/commands/state_stages.go
@@ -156,7 +156,10 @@ func syncBySmallSteps(db kv.TemporalRwDB, miningConfig params.MiningConfig, ctx
return err
}

sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger1)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger1)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -387,7 +390,10 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *chain2.Config) {
func loopExec(db kv.TemporalRwDB, ctx context.Context, unwind uint64, logger log.Logger) error {
chainConfig := fromdb.ChainConfig(db)
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
