diff --git a/cmd/devnet/services/polygon/heimdallsim/heimdall_simulator_test.go b/cmd/devnet/services/polygon/heimdallsim/heimdall_simulator_test.go index 8031a036e19..61dfa5cac0b 100644 --- a/cmd/devnet/services/polygon/heimdallsim/heimdall_simulator_test.go +++ b/cmd/devnet/services/polygon/heimdallsim/heimdall_simulator_test.go @@ -94,6 +94,9 @@ func TestSimulatorEvents(t *testing.T) { t.Skip("fix me on win") } + // the number of events included in v1-000000-000500-borevents.seg + eventsCount := 23 + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -101,7 +104,7 @@ func TestSimulatorEvents(t *testing.T) { res, err := sim.FetchStateSyncEvents(ctx, 0, time.Now(), 100) assert.NoError(t, err) - assert.Equal(t, 100, len(res)) + assert.Equal(t, eventsCount, len(res)) resLimit, err := sim.FetchStateSyncEvents(ctx, 0, time.Now(), 2) assert.NoError(t, err) @@ -117,7 +120,7 @@ func TestSimulatorEvents(t *testing.T) { lastTime := res[len(res)-1].Time resTime, err := sim.FetchStateSyncEvents(ctx, 0, lastTime.Add(-1*time.Second), 100) assert.NoError(t, err) - assert.Equal(t, 99, len(resTime)) + assert.Equal(t, eventsCount-1, len(resTime)) assert.Equal(t, res[:len(res)-1], resTime) } diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 0c6b270979d..51f4c688d04 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1325,7 +1325,8 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl openSnapshotOnce.Do(func() { dirs := datadir.New(datadirCli) - snapCfg := ethconfig.NewSnapCfg(true, true, true) + chainConfig := fromdb.ChainConfig(db) + snapCfg := ethconfig.NewSnapCfg(true, true, true, chainConfig.ChainName) _allSnapshotsSingleton = freezeblocks.NewRoSnapshots(snapCfg, dirs.Snap, 0, logger) _allBorSnapshotsSingleton = freezeblocks.NewBorRoSnapshots(snapCfg, dirs.Snap, 0, logger) diff --git a/erigon-lib/direct/sentry_client.go b/erigon-lib/direct/sentry_client.go index 8833b66fc2c..216a97f5322 100644 --- a/erigon-lib/direct/sentry_client.go +++ b/erigon-lib/direct/sentry_client.go @@ -26,7 +26,7 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" - sentryproto "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" + "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" types "github.com/erigontech/erigon-lib/gointerfaces/typesproto" libsentry "github.com/erigontech/erigon-lib/p2p/sentry" ) diff --git a/erigon-lib/downloader/snaptype/files.go b/erigon-lib/downloader/snaptype/files.go index ab66860ff99..08965fe9e5d 100644 --- a/erigon-lib/downloader/snaptype/files.go +++ b/erigon-lib/downloader/snaptype/files.go @@ -250,6 +250,9 @@ func (f FileInfo) Name() string { return f.name } func (f FileInfo) Dir() string { return filepath.Dir(f.Path) } func (f FileInfo) Len() uint64 { return f.To - f.From } +func (f FileInfo) GetRange() (from, to uint64) { return f.From, f.To } +func (f FileInfo) GetType() Type { return f.Type } + func (f FileInfo) CompareTo(o FileInfo) int { if res := cmp.Compare(f.From, o.From); res != 0 { return res diff --git a/erigon-lib/gointerfaces/downloaderproto/downloader_client_mock.go b/erigon-lib/gointerfaces/downloaderproto/downloader_client_mock.go index ec5754de89a..031c4de59dd 100644 --- a/erigon-lib/gointerfaces/downloaderproto/downloader_client_mock.go +++ b/erigon-lib/gointerfaces/downloaderproto/downloader_client_mock.go @@ -217,7 +217,7 @@ func (c *MockDownloaderClientVerifyCall) DoAndReturn(f func(context.Context, *Ve return c } -//SetLogPrefix mocks base method. +// SetLogPrefix mocks base method. func (m *MockDownloaderClient) SetLogPrefix(arg0 context.Context, arg1 *SetLogPrefixRequest, arg2 ...grpc.CallOption) (*emptypb.Empty, error) { m.ctrl.T.Helper() varargs := []any{arg0, arg1} @@ -316,4 +316,4 @@ func (m *MockDownloaderClient) TorrentCompleted(arg0 context.Context, arg1 *Torr ret0, _ := ret[0].(Downloader_TorrentCompletedClient) ret1, _ := ret[1].(error) return ret0, ret1 -} \ No newline at end of file +} diff --git a/erigon-lib/p2p/sentry/util_test.go b/erigon-lib/p2p/sentry/util_test.go index 7a6062a59ee..4f00ef56c7c 100644 --- a/erigon-lib/p2p/sentry/util_test.go +++ b/erigon-lib/p2p/sentry/util_test.go @@ -8,7 +8,7 @@ import ( "github.com/erigontech/erigon-lib/gointerfaces" "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" "github.com/erigontech/erigon-lib/gointerfaces/typesproto" - sentry "github.com/erigontech/erigon-lib/p2p/sentry" + "github.com/erigontech/erigon-lib/p2p/sentry" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "google.golang.org/grpc" diff --git a/erigon-lib/state/appendable.go b/erigon-lib/state/appendable.go index 5413b0e6ca7..9fa11665421 100644 --- a/erigon-lib/state/appendable.go +++ b/erigon-lib/state/appendable.go @@ -259,8 +259,6 @@ func (ap *Appendable) BuildMissedAccessors(ctx context.Context, g *errgroup.Grou } func (ap *Appendable) openDirtyFiles() error { - fmt.Printf("[dbg] dirtyFiles.Len() %d\n", ap.dirtyFiles.Len()) - var invalidFileItems []*filesItem invalidFileItemsLock := sync.Mutex{} ap.dirtyFiles.Walk(func(items []*filesItem) bool { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 0b2be54e549..f5dc0c86f40 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -142,6 +142,7 @@ type BlocksFreezing struct { NoDownloader bool // possible to use snapshots without calling Downloader Verify bool // verify snapshots on startup DownloaderAddr string + ChainName string } func (s BlocksFreezing) String() string { @@ -161,8 +162,8 @@ var ( FlagSnapStateStop = "snap.state.stop" ) -func NewSnapCfg(keepBlocks, produceE2, produceE3 bool) BlocksFreezing { - return BlocksFreezing{KeepBlocks: keepBlocks, ProduceE2: produceE2, ProduceE3: produceE3} +func NewSnapCfg(keepBlocks, produceE2, produceE3 bool, chainName string) BlocksFreezing { + return BlocksFreezing{KeepBlocks: keepBlocks, ProduceE2: produceE2, ProduceE3: produceE3, ChainName: chainName} } // Config contains configuration options for ETH protocol. diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index 5f0870502d2..c9e865c9c67 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -686,7 +686,7 @@ func pruneBlockSnapshots(ctx context.Context, cfg SnapshotsCfg, logger log.Logge return false, err } defer tx.Rollback() - // Prune snapshots if necessary (remove .segs or idx files appropriatelly) + // Prune snapshots if necessary (remove .segs or idx files appropriately) headNumber := cfg.blockReader.FrozenBlocks() executionProgress, err := stages.GetStageProgress(tx, stages.Execution) if err != nil { diff --git a/p2p/sentry/simulator/sentry_simulator.go b/p2p/sentry/simulator/sentry_simulator.go index 19e381b157e..52d0b34f0b4 100644 --- a/p2p/sentry/simulator/sentry_simulator.go +++ b/p2p/sentry/simulator/sentry_simulator.go @@ -455,7 +455,7 @@ func (s *server) getHeaderByHash(ctx context.Context, hash common.Hash) (*corety return s.blockReader.HeaderByHash(ctx, nil, hash) } -func (s *server) downloadHeaders(ctx context.Context, header *freezeblocks.Segment) error { +func (s *server) downloadHeaders(ctx context.Context, header *freezeblocks.VisibleSegment) error { fileName := snaptype.SegmentFileName(0, header.From(), header.To(), coresnaptype.Enums.Headers) session := sync.NewTorrentSession(s.downloader, s.chain) diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index 745a63ceb35..ac1a7fee935 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -485,7 +485,8 @@ func doIntegrity(cliCtx *cli.Context) error { chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() defer chainDB.Close() - cfg := ethconfig.NewSnapCfg(false, true, true) + chainConfig := fromdb.ChainConfig(chainDB) + cfg := ethconfig.NewSnapCfg(false, true, true, chainConfig.ChainName) from := cliCtx.Uint64(SnapshotFromFlag.Name) _, _, _, blockRetire, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger) @@ -966,8 +967,8 @@ func doIndicesCommand(cliCtx *cli.Context, dirs datadir.Dirs) error { return err } - cfg := ethconfig.NewSnapCfg(false, true, true) chainConfig := fromdb.ChainConfig(chainDB) + cfg := ethconfig.NewSnapCfg(false, true, true, chainConfig.ChainName) from := cliCtx.Uint64(SnapshotFromFlag.Name) _, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger) @@ -999,7 +1000,9 @@ func doLS(cliCtx *cli.Context, dirs datadir.Dirs) error { chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() defer chainDB.Close() - cfg := ethconfig.NewSnapCfg(false, true, true) + + chainConfig := fromdb.ChainConfig(chainDB) + cfg := ethconfig.NewSnapCfg(false, true, true, chainConfig.ChainName) from := cliCtx.Uint64(SnapshotFromFlag.Name) blockSnaps, borSnaps, caplinSnaps, _, agg, clean, err := openSnaps(ctx, cfg, dirs, from, chainDB, logger) if err != nil { @@ -1214,8 +1217,8 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error { db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() defer db.Close() - cfg := ethconfig.NewSnapCfg(false, true, true) - + chainConfig := fromdb.ChainConfig(db) + cfg := ethconfig.NewSnapCfg(false, true, true, chainConfig.ChainName) blockSnaps, _, caplinSnaps, br, agg, clean, err := openSnaps(ctx, cfg, dirs, from, db, logger) if err != nil { return err @@ -1227,7 +1230,6 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error { agg.SetMergeWorkers(estimate.AlmostAllCPUs()) agg.SetCompressWorkers(estimate.CompressSnapshot.Workers()) - chainConfig := fromdb.ChainConfig(db) if err := br.BuildMissedIndicesIfNeed(ctx, "retire", nil, chainConfig); err != nil { return err } diff --git a/turbo/snapshotsync/freezeblocks/beacon_block_reader.go b/turbo/snapshotsync/freezeblocks/beacon_block_reader.go index d6ba0d9db4a..19b152e1f16 100644 --- a/turbo/snapshotsync/freezeblocks/beacon_block_reader.go +++ b/turbo/snapshotsync/freezeblocks/beacon_block_reader.go @@ -99,7 +99,7 @@ func (r *beaconSnapshotReader) ReadBlockBySlot(ctx context.Context, tx kv.Tx, sl return nil, nil } - idxSlot := seg.Index() + idxSlot := seg.src.Index() if idxSlot == nil { return nil, nil @@ -109,7 +109,7 @@ func (r *beaconSnapshotReader) ReadBlockBySlot(ctx context.Context, tx kv.Tx, sl } blockOffset := idxSlot.OrdinalLookup(slot - idxSlot.BaseDataID()) - gg := seg.MakeGetter() + gg := seg.src.MakeGetter() gg.Reset(blockOffset) if !gg.HasNext() { return nil, nil @@ -159,7 +159,7 @@ func (r *beaconSnapshotReader) ReadBlindedBlockBySlot(ctx context.Context, tx kv return nil, nil } - idxSlot := seg.Index() + idxSlot := seg.src.Index() if idxSlot == nil { return nil, nil @@ -169,7 +169,7 @@ func (r *beaconSnapshotReader) ReadBlindedBlockBySlot(ctx context.Context, tx kv } blockOffset := idxSlot.OrdinalLookup(slot - idxSlot.BaseDataID()) - gg := seg.MakeGetter() + gg := seg.src.MakeGetter() gg.Reset(blockOffset) if !gg.HasNext() { return nil, nil @@ -240,7 +240,7 @@ func (r *beaconSnapshotReader) ReadBlockByRoot(ctx context.Context, tx kv.Tx, ro return nil, nil } - idxSlot := seg.Index() + idxSlot := seg.src.Index() if idxSlot == nil { return nil, nil @@ -250,7 +250,7 @@ func (r *beaconSnapshotReader) ReadBlockByRoot(ctx context.Context, tx kv.Tx, ro } blockOffset := idxSlot.OrdinalLookup(*slot - idxSlot.BaseDataID()) - gg := seg.MakeGetter() + gg := seg.src.MakeGetter() gg.Reset(blockOffset) if !gg.HasNext() { return nil, nil diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index d5a61d725d5..1774f1da947 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -510,15 +510,12 @@ func (r *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash commo return h, nil } - segments, release := r.sn.ViewType(coresnaptype.Headers) - defer release() + segmentRotx := r.sn.ViewType(coresnaptype.Headers) + defer segmentRotx.Close() buf := make([]byte, 128) + segments := segmentRotx.VisibleSegments for i := len(segments) - 1; i >= 0; i-- { - if segments[i].Index() == nil { - continue - } - h, err = r.headerFromSnapshotByHash(hash, segments[i], buf) if err != nil { return nil, err @@ -861,14 +858,13 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c return block, senders, nil } -func (r *BlockReader) headerFromSnapshot(blockHeight uint64, sn *Segment, buf []byte) (*types.Header, []byte, error) { - index := sn.Index() - +func (r *BlockReader) headerFromSnapshot(blockHeight uint64, sn *VisibleSegment, buf []byte) (*types.Header, []byte, error) { + index := sn.src.Index() if index == nil { return nil, buf, nil } headerOffset := index.OrdinalLookup(blockHeight - index.BaseDataID()) - gg := sn.MakeGetter() + gg := sn.src.MakeGetter() gg.Reset(headerOffset) if !gg.HasNext() { return nil, buf, nil @@ -888,14 +884,14 @@ func (r *BlockReader) headerFromSnapshot(blockHeight uint64, sn *Segment, buf [] // because HeaderByHash method will search header in all snapshots - and may request header which doesn't exists // but because our indices are based on PerfectHashMap, no way to know is given key exists or not, only way - // to make sure is to fetch it and compare hash -func (r *BlockReader) headerFromSnapshotByHash(hash common.Hash, sn *Segment, buf []byte) (*types.Header, error) { +func (r *BlockReader) headerFromSnapshotByHash(hash common.Hash, sn *VisibleSegment, buf []byte) (*types.Header, error) { defer func() { if rec := recover(); rec != nil { panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, sn.from, sn.to, dbg.Stack())) } }() // avoid crash because Erigon's core does many things - index := sn.Index() + index := sn.src.Index() if index == nil { return nil, nil @@ -907,7 +903,8 @@ func (r *BlockReader) headerFromSnapshotByHash(hash common.Hash, sn *Segment, bu return nil, nil } headerOffset := index.OrdinalLookup(localID) - gg := sn.MakeGetter() + + gg := sn.src.MakeGetter() gg.Reset(headerOffset) if !gg.HasNext() { return nil, nil @@ -927,7 +924,7 @@ func (r *BlockReader) headerFromSnapshotByHash(hash common.Hash, sn *Segment, bu return h, nil } -func (r *BlockReader) bodyFromSnapshot(blockHeight uint64, sn *Segment, buf []byte) (*types.Body, uint64, uint32, []byte, error) { +func (r *BlockReader) bodyFromSnapshot(blockHeight uint64, sn *VisibleSegment, buf []byte) (*types.Body, uint64, uint32, []byte, error) { b, buf, err := r.bodyForStorageFromSnapshot(blockHeight, sn, buf) if err != nil { return nil, 0, 0, buf, err @@ -946,14 +943,14 @@ func (r *BlockReader) bodyFromSnapshot(blockHeight uint64, sn *Segment, buf []by return body, b.BaseTxnID.First(), txCount, buf, nil // empty txs in the beginning and end of block } -func (r *BlockReader) bodyForStorageFromSnapshot(blockHeight uint64, sn *Segment, buf []byte) (*types.BodyForStorage, []byte, error) { +func (r *BlockReader) bodyForStorageFromSnapshot(blockHeight uint64, sn *VisibleSegment, buf []byte) (*types.BodyForStorage, []byte, error) { defer func() { if rec := recover(); rec != nil { panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, sn.from, sn.to, dbg.Stack())) } }() // avoid crash because Erigon's core does many things - index := sn.Index() + index := sn.src.Index() if index == nil { return nil, buf, nil @@ -961,7 +958,7 @@ func (r *BlockReader) bodyForStorageFromSnapshot(blockHeight uint64, sn *Segment bodyOffset := index.OrdinalLookup(blockHeight - index.BaseDataID()) - gg := sn.MakeGetter() + gg := sn.src.MakeGetter() gg.Reset(bodyOffset) if !gg.HasNext() { return nil, buf, nil @@ -979,20 +976,20 @@ func (r *BlockReader) bodyForStorageFromSnapshot(blockHeight uint64, sn *Segment return b, buf, nil } -func (r *BlockReader) txsFromSnapshot(baseTxnID uint64, txCount uint32, txsSeg *Segment, buf []byte) (txs []types.Transaction, senders []common.Address, err error) { +func (r *BlockReader) txsFromSnapshot(baseTxnID uint64, txCount uint32, txsSeg *VisibleSegment, buf []byte) (txs []types.Transaction, senders []common.Address, err error) { defer func() { if rec := recover(); rec != nil { panic(fmt.Errorf("%+v, snapshot: %d-%d, trace: %s", rec, txsSeg.from, txsSeg.to, dbg.Stack())) } }() // avoid crash because Erigon's core does many things - idxTxnHash := txsSeg.Index(coresnaptype.Indexes.TxnHash) + idxTxnHash := txsSeg.src.Index(coresnaptype.Indexes.TxnHash) if idxTxnHash == nil { return nil, nil, nil } if baseTxnID < idxTxnHash.BaseDataID() { - return nil, nil, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", baseTxnID, idxTxnHash.BaseDataID(), txsSeg.FilePath()) + return nil, nil, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", baseTxnID, idxTxnHash.BaseDataID(), txsSeg.src.FileName()) } txs = make([]types.Transaction, txCount) @@ -1001,7 +998,10 @@ func (r *BlockReader) txsFromSnapshot(baseTxnID uint64, txCount uint32, txsSeg * return txs, senders, nil } txnOffset := idxTxnHash.OrdinalLookup(baseTxnID - idxTxnHash.BaseDataID()) - gg := txsSeg.MakeGetter() + if txsSeg.src == nil { + return nil, nil, nil + } + gg := txsSeg.src.MakeGetter() gg.Reset(txnOffset) for i := uint32(0); i < txCount; i++ { if !gg.HasNext() { @@ -1009,7 +1009,7 @@ func (r *BlockReader) txsFromSnapshot(baseTxnID uint64, txCount uint32, txsSeg * } buf, _ = gg.Next(buf[:0]) if len(buf) < 1+20 { - return nil, nil, fmt.Errorf("segment %s has too short record: len(buf)=%d < 21", txsSeg.FilePath(), len(buf)) + return nil, nil, fmt.Errorf("segment %s has too short record: len(buf)=%d < 21", txsSeg.src.FileName(), len(buf)) } senders[i].SetBytes(buf[1 : 1+20]) txRlp := buf[1+20:] @@ -1023,11 +1023,11 @@ func (r *BlockReader) txsFromSnapshot(baseTxnID uint64, txCount uint32, txsSeg * return txs, senders, nil } -func (r *BlockReader) txnByID(txnID uint64, sn *Segment, buf []byte) (txn types.Transaction, err error) { - idxTxnHash := sn.Index(coresnaptype.Indexes.TxnHash) +func (r *BlockReader) txnByID(txnID uint64, sn *VisibleSegment, buf []byte) (txn types.Transaction, err error) { + idxTxnHash := sn.src.Index(coresnaptype.Indexes.TxnHash) offset := idxTxnHash.OrdinalLookup(txnID - idxTxnHash.BaseDataID()) - gg := sn.MakeGetter() + gg := sn.src.MakeGetter() gg.Reset(offset) if !gg.HasNext() { return nil, nil @@ -1043,12 +1043,12 @@ func (r *BlockReader) txnByID(txnID uint64, sn *Segment, buf []byte) (txn types. return } -func (r *BlockReader) txnByHash(txnHash common.Hash, segments []*Segment, buf []byte) (types.Transaction, uint64, bool, error) { +func (r *BlockReader) txnByHash(txnHash common.Hash, segments []*VisibleSegment, buf []byte) (types.Transaction, uint64, bool, error) { for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] - idxTxnHash := sn.Index(coresnaptype.Indexes.TxnHash) - idxTxnHash2BlockNum := sn.Index(coresnaptype.Indexes.TxnHash2BlockNum) + idxTxnHash := sn.src.Index(coresnaptype.Indexes.TxnHash) + idxTxnHash2BlockNum := sn.src.Index(coresnaptype.Indexes.TxnHash2BlockNum) if idxTxnHash == nil || idxTxnHash2BlockNum == nil { continue @@ -1060,7 +1060,7 @@ func (r *BlockReader) txnByHash(txnHash common.Hash, segments []*Segment, buf [] continue } offset := idxTxnHash.OrdinalLookup(txnId) - gg := sn.MakeGetter() + gg := sn.src.MakeGetter() gg.Reset(offset) // first byte txnHash check - reducing false-positives 256 times. Allows don't store and don't calculate full hash of entity - when checking many snapshots. if !gg.MatchPrefix([]byte{txnHash[0]}) { @@ -1146,9 +1146,9 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common. return *n, true, nil } - txns, release := r.sn.ViewType(coresnaptype.Transactions) - defer release() - _, blockNum, ok, err := r.txnByHash(txnHash, txns, nil) + txns := r.sn.ViewType(coresnaptype.Transactions) + defer txns.Close() + _, blockNum, ok, err := r.txnByHash(txnHash, txns.VisibleSegments, nil) if err != nil { return 0, false, err } @@ -1157,13 +1157,13 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common. } func (r *BlockReader) FirstTxnNumNotInSnapshots() uint64 { - sn, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, r.sn.BlocksAvailable()) + sn, ok, close := r.sn.ViewSingleFile(coresnaptype.Transactions, r.sn.BlocksAvailable()) if !ok { return 0 } - defer release() + defer close() - lastTxnID := sn.Index(coresnaptype.Indexes.TxnHash).BaseDataID() + uint64(sn.Count()) + lastTxnID := sn.src.Index(coresnaptype.Indexes.TxnHash).BaseDataID() + uint64(sn.src.Count()) return lastTxnID } @@ -1172,10 +1172,10 @@ func (r *BlockReader) IterateFrozenBodies(f func(blockNum, baseTxNum, txCount ui defer view.Close() for _, sn := range view.Bodies() { sn := sn - defer sn.EnableReadAhead().DisableReadAhead() + defer sn.src.EnableReadAhead().DisableReadAhead() var buf []byte - g := sn.MakeGetter() + g := sn.src.MakeGetter() blockNum := sn.from var b types.BodyForStorage for g.HasNext() { @@ -1199,21 +1199,24 @@ func (r *BlockReader) IntegrityTxnID(failFast bool) error { var expectedFirstTxnID uint64 for _, snb := range view.Bodies() { - firstBlockNum := snb.Index().BaseDataID() + if snb.src == nil { + continue + } + firstBlockNum := snb.src.Index().BaseDataID() sn, _ := view.TxsSegment(firstBlockNum) b, _, err := r.bodyForStorageFromSnapshot(firstBlockNum, snb, nil) if err != nil { return err } if b.BaseTxnID.U64() != expectedFirstTxnID { - err := fmt.Errorf("[integrity] IntegrityTxnID: bn=%d, baseID=%d, cnt=%d, expectedFirstTxnID=%d", firstBlockNum, b.BaseTxnID, sn.Count(), expectedFirstTxnID) + err := fmt.Errorf("[integrity] IntegrityTxnID: bn=%d, baseID=%d, cnt=%d, expectedFirstTxnID=%d", firstBlockNum, b.BaseTxnID, sn.src.Count(), expectedFirstTxnID) if failFast { return err } else { log.Error(err.Error()) } } - expectedFirstTxnID = expectedFirstTxnID + uint64(sn.Count()) + expectedFirstTxnID = expectedFirstTxnID + uint64(sn.src.Count()) } return nil } @@ -1327,10 +1330,10 @@ func (r *BlockReader) EventLookup(ctx context.Context, tx kv.Getter, txnHash com return 0, false, nil } - segs, release := r.borSn.ViewType(borsnaptype.BorEvents) - defer release() + segs := r.borSn.ViewType(borsnaptype.BorEvents) + defer segs.Close() - blockNum, ok, err := r.borBlockByEventHash(txnHash, segs, nil) + blockNum, ok, err := r.borBlockByEventHash(txnHash, segs.VisibleSegments, nil) if err != nil { return 0, false, err } @@ -1340,10 +1343,10 @@ func (r *BlockReader) EventLookup(ctx context.Context, tx kv.Getter, txnHash com return blockNum, true, nil } -func (r *BlockReader) borBlockByEventHash(txnHash common.Hash, segments []*Segment, buf []byte) (blockNum uint64, ok bool, err error) { +func (r *BlockReader) borBlockByEventHash(txnHash common.Hash, segments []*VisibleSegment, buf []byte) (blockNum uint64, ok bool, err error) { for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] - idxBorTxnHash := sn.Index() + idxBorTxnHash := sn.src.Index() if idxBorTxnHash == nil { continue @@ -1357,7 +1360,7 @@ func (r *BlockReader) borBlockByEventHash(txnHash common.Hash, segments []*Segme continue } offset := idxBorTxnHash.OrdinalLookup(blockEventId) - gg := sn.MakeGetter() + gg := sn.src.MakeGetter() gg.Reset(offset) if !gg.MatchPrefix(txnHash[:]) { continue @@ -1386,11 +1389,11 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash) - segments, release := r.borSn.ViewType(borsnaptype.BorEvents) - defer release() + segments := r.borSn.ViewType(borsnaptype.BorEvents) + defer segments.Close() - for i := len(segments) - 1; i >= 0; i-- { - sn := segments[i] + for i := len(segments.VisibleSegments) - 1; i >= 0; i-- { + sn := segments.VisibleSegments[i] if sn.from > blockHeight { continue } @@ -1398,7 +1401,7 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common break } - idxBorTxnHash := sn.Index() + idxBorTxnHash := sn.src.Index() if idxBorTxnHash == nil { continue @@ -1409,7 +1412,7 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common reader := recsplit.NewIndexReader(idxBorTxnHash) blockEventId, found := reader.Lookup(borTxHash[:]) if !found { - return 0, fmt.Errorf("borTxHash %x not found in snapshot %s", borTxHash, sn.FilePath()) + return 0, fmt.Errorf("borTxHash %x not found in snapshot %s", borTxHash, sn.src.FilePath()) } return idxBorTxnHash.BaseDataID() + blockEventId, nil } @@ -1464,13 +1467,13 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H } borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash) - segments, release := r.borSn.ViewType(borsnaptype.BorEvents) - defer release() + segments := r.borSn.ViewType(borsnaptype.BorEvents) + defer segments.Close() var buf []byte result := []rlp.RawValue{} - for i := len(segments) - 1; i >= 0; i-- { - sn := segments[i] + for i := len(segments.VisibleSegments) - 1; i >= 0; i-- { + sn := segments.VisibleSegments[i] if sn.from > blockHeight { continue } @@ -1478,7 +1481,7 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H break } - idxBorTxnHash := sn.Index() + idxBorTxnHash := sn.src.Index() if idxBorTxnHash == nil { continue @@ -1492,7 +1495,7 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H continue } offset := idxBorTxnHash.OrdinalLookup(blockEventId) - gg := sn.MakeGetter() + gg := sn.src.MakeGetter() gg.Reset(offset) for gg.HasNext() && gg.MatchPrefix(borTxHash[:]) { buf, _ = gg.Next(buf[:0]) @@ -1504,22 +1507,22 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H // EventsByIdFromSnapshot returns the list of records limited by time, or the number of records along with a bool value to signify if the records were limited by time func (r *BlockReader) EventsByIdFromSnapshot(from uint64, to time.Time, limit int) ([]*heimdall.EventRecordWithTime, bool, error) { - segments, release := r.borSn.ViewType(borsnaptype.BorEvents) - defer release() + segments := r.borSn.ViewType(borsnaptype.BorEvents) + defer segments.Close() var buf []byte var result []*heimdall.EventRecordWithTime maxTime := false - for _, sn := range segments { - idxBorTxnHash := sn.Index() + for _, sn := range segments.VisibleSegments { + idxBorTxnHash := sn.src.Index() if idxBorTxnHash == nil || idxBorTxnHash.KeyCount() == 0 { continue } offset := idxBorTxnHash.OrdinalLookup(0) - gg := sn.MakeGetter() + gg := sn.src.MakeGetter() gg.Reset(offset) for gg.HasNext() { buf, _ = gg.Next(buf[:0]) @@ -1581,19 +1584,20 @@ func (r *BlockReader) LastFrozenEventId() uint64 { return 0 } - segments, release := r.borSn.ViewType(borsnaptype.BorEvents) - defer release() + segments := r.borSn.ViewType(borsnaptype.BorEvents) + defer segments.Close() - if len(segments) == 0 { + if len(segments.VisibleSegments) == 0 { return 0 } // find the last segment which has a built index - var lastSegment *Segment - for i := len(segments) - 1; i >= 0; i-- { - if segments[i].Index() != nil { - gg := segments[i].MakeGetter() + var lastSegment *VisibleSegment + visibleSegments := segments.VisibleSegments + for i := len(visibleSegments) - 1; i >= 0; i-- { + if visibleSegments[i].src.Index() != nil { + gg := visibleSegments[i].src.MakeGetter() if gg.HasNext() { - lastSegment = segments[i] + lastSegment = visibleSegments[i] break } } @@ -1602,7 +1606,10 @@ func (r *BlockReader) LastFrozenEventId() uint64 { return 0 } var lastEventID uint64 - gg := lastSegment.MakeGetter() + if lastSegment.src == nil { + return 0 + } + gg := lastSegment.src.MakeGetter() var buf []byte for gg.HasNext() { buf, _ = gg.Next(buf[:0]) @@ -1616,17 +1623,18 @@ func (r *BlockReader) LastFrozenEventBlockNum() uint64 { return 0 } - segments, release := r.borSn.ViewType(borsnaptype.BorEvents) - defer release() + segmentsRotx := r.borSn.ViewType(borsnaptype.BorEvents) + defer segmentsRotx.Close() + segments := segmentsRotx.VisibleSegments if len(segments) == 0 { return 0 } // find the last segment which has a built index - var lastSegment *Segment + var lastSegment *VisibleSegment for i := len(segments) - 1; i >= 0; i-- { - if segments[i].Index() != nil { - gg := segments[i].MakeGetter() + if segments[i].src.Index() != nil { + gg := segments[i].src.MakeGetter() if gg.HasNext() { lastSegment = segments[i] break @@ -1638,7 +1646,7 @@ func (r *BlockReader) LastFrozenEventBlockNum() uint64 { } var lastBlockNum uint64 var buf []byte - gg := lastSegment.MakeGetter() + gg := lastSegment.src.MakeGetter() for gg.HasNext() { buf, _ = gg.Next(buf[:0]) lastBlockNum = binary.BigEndian.Uint64(buf[length.Hash : length.Hash+length.BlockNum]) @@ -1677,17 +1685,17 @@ func (r *BlockReader) LastFrozenSpanId() uint64 { return 0 } - segments, release := r.borSn.ViewType(borsnaptype.BorSpans) - defer release() + segments := r.borSn.ViewType(borsnaptype.BorSpans) + defer segments.Close() - if len(segments) == 0 { + if len(segments.VisibleSegments) == 0 { return 0 } // find the last segment which has a built index - var lastSegment *Segment - for i := len(segments) - 1; i >= 0; i-- { - if segments[i].Index() != nil { - lastSegment = segments[i] + var lastSegment *VisibleSegment + for i := len(segments.VisibleSegments) - 1; i >= 0; i-- { + if segments.VisibleSegments[i].src.Index() != nil { + lastSegment = segments.VisibleSegments[i] break } } @@ -1721,12 +1729,13 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([] } return common.Copy(v), nil } - segments, release := r.borSn.ViewType(borsnaptype.BorSpans) - defer release() + segmentsRotx := r.borSn.ViewType(borsnaptype.BorSpans) + defer segmentsRotx.Close() + segments := segmentsRotx.VisibleSegments for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] - idx := sn.Index() + idx := sn.src.Index() if idx == nil { continue @@ -1743,7 +1752,7 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([] continue } offset := idx.OrdinalLookup(spanId - idx.BaseDataID()) - gg := sn.MakeGetter() + gg := sn.src.MakeGetter() gg.Reset(offset) result, _ := gg.Next(nil) return common.Copy(result), nil @@ -1825,19 +1834,20 @@ func (r *BlockReader) Checkpoint(ctx context.Context, tx kv.Getter, checkpointId return common.Copy(v), nil } - segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints) - defer release() + segmentsRotx := r.borSn.ViewType(borsnaptype.BorCheckpoints) + defer segmentsRotx.Close() + segments := segmentsRotx.VisibleSegments for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] - index := sn.Index() + index := sn.src.Index() if index == nil || index.KeyCount() == 0 || checkpointId < index.BaseDataID() { continue } offset := index.OrdinalLookup(checkpointId - index.BaseDataID()) - gg := sn.MakeGetter() + gg := sn.src.MakeGetter() gg.Reset(offset) result, _ := gg.Next(nil) return common.Copy(result), nil @@ -1851,15 +1861,17 @@ func (r *BlockReader) LastFrozenCheckpointId() uint64 { return 0 } - segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints) - defer release() + segmentsRotx := r.borSn.ViewType(borsnaptype.BorCheckpoints) + defer segmentsRotx.Close() + + segments := segmentsRotx.VisibleSegments if len(segments) == 0 { return 0 } // find the last segment which has a built index - var lastSegment *Segment + var lastSegment *VisibleSegment for i := len(segments) - 1; i >= 0; i-- { - if segments[i].Index() != nil { + if segments[i].src.Index() != nil { lastSegment = segments[i] break } @@ -1869,14 +1881,14 @@ func (r *BlockReader) LastFrozenCheckpointId() uint64 { return 0 } - index := lastSegment.Index() + index := lastSegment.src.Index() return index.BaseDataID() + index.KeyCount() - 1 } // ---- Data Integrity part ---- -func (r *BlockReader) ensureHeaderNumber(n uint64, seg *Segment) error { +func (r *BlockReader) ensureHeaderNumber(n uint64, seg *VisibleSegment) error { h, _, err := r.headerFromSnapshot(n, seg, nil) if err != nil { return err diff --git a/turbo/snapshotsync/freezeblocks/block_reader_test.go b/turbo/snapshotsync/freezeblocks/block_reader_test.go index 368c4333422..7d2aaca6f5e 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader_test.go +++ b/turbo/snapshotsync/freezeblocks/block_reader_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/erigontech/erigon-lib/chain/networkname" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/common/length" @@ -43,7 +44,7 @@ func TestBlockReaderLastFrozenSpanIdWhenSegmentFilesArePresent(t *testing.T) { dir := t.TempDir() createTestBorEventSegmentFile(t, 0, 500_000, 132, dir, logger) createTestSegmentFile(t, 0, 500_000, borsnaptype.Enums.BorSpans, dir, 1, logger) - borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnetChainName}, dir, 0, logger) defer borRoSnapshots.Close() err := borRoSnapshots.ReopenFolder() require.NoError(t, err) @@ -57,7 +58,7 @@ func TestBlockReaderLastFrozenSpanIdWhenSegmentFilesAreNotPresent(t *testing.T) logger := testlog.Logger(t, log.LvlInfo) dir := t.TempDir() - borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnetChainName}, dir, 0, logger) defer borRoSnapshots.Close() err := borRoSnapshots.ReopenFolder() require.NoError(t, err) @@ -81,7 +82,7 @@ func TestBlockReaderLastFrozenSpanIdReturnsLastSegWithIdx(t *testing.T) { idxFileToDelete := filepath.Join(dir, snaptype.IdxFileName(1, 1_000_000, 1_500_000, borsnaptype.BorSpans.Name())) err := os.Remove(idxFileToDelete) require.NoError(t, err) - borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnetChainName}, dir, 0, logger) defer borRoSnapshots.Close() err = borRoSnapshots.ReopenFolder() require.NoError(t, err) @@ -111,7 +112,7 @@ func TestBlockReaderLastFrozenSpanIdReturnsZeroWhenAllSegmentsDoNotHaveIdx(t *te idxFileToDelete = filepath.Join(dir, snaptype.IdxFileName(1, 1_000_000, 1_500_000, borsnaptype.BorSpans.Name())) err = os.Remove(idxFileToDelete) require.NoError(t, err) - borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnetChainName}, dir, 0, logger) defer borRoSnapshots.Close() err = borRoSnapshots.ReopenFolder() require.NoError(t, err) @@ -127,7 +128,7 @@ func TestBlockReaderLastFrozenEventIdWhenSegmentFilesArePresent(t *testing.T) { dir := t.TempDir() createTestBorEventSegmentFile(t, 0, 500_000, 132, dir, logger) createTestSegmentFile(t, 0, 500_000, borsnaptype.Enums.BorSpans, dir, 1, logger) - borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnetChainName}, dir, 0, logger) defer borRoSnapshots.Close() err := borRoSnapshots.ReopenFolder() require.NoError(t, err) @@ -141,7 +142,7 @@ func TestBlockReaderLastFrozenEventIdWhenSegmentFilesAreNotPresent(t *testing.T) logger := testlog.Logger(t, log.LvlInfo) dir := t.TempDir() - borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnetChainName}, dir, 0, logger) defer borRoSnapshots.Close() err := borRoSnapshots.ReopenFolder() require.NoError(t, err) @@ -165,7 +166,7 @@ func TestBlockReaderLastFrozenEventIdReturnsLastSegWithIdx(t *testing.T) { idxFileToDelete := filepath.Join(dir, snaptype.IdxFileName(1, 1_000_000, 1_500_000, borsnaptype.BorEvents.Name())) err := os.Remove(idxFileToDelete) require.NoError(t, err) - borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnetChainName}, dir, 0, logger) defer borRoSnapshots.Close() err = borRoSnapshots.ReopenFolder() require.NoError(t, err) @@ -195,7 +196,7 @@ func TestBlockReaderLastFrozenEventIdReturnsZeroWhenAllSegmentsDoNotHaveIdx(t *t idxFileToDelete = filepath.Join(dir, snaptype.IdxFileName(1, 1_000_000, 1_500_000, borsnaptype.BorEvents.Name())) err = os.Remove(idxFileToDelete) require.NoError(t, err) - borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + borRoSnapshots := NewBorRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.BorMainnetChainName}, dir, 0, logger) defer borRoSnapshots.Close() err = borRoSnapshots.ReopenFolder() require.NoError(t, err) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 60970b6bd48..27c01bc926c 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -67,6 +67,138 @@ import ( "github.com/erigontech/erigon/turbo/silkworm" ) +type SortedRange interface { + GetRange() (from, to uint64) + GetType() snaptype.Type +} + +// noOverlaps - keep largest ranges and avoid overlap +func noOverlaps[T SortedRange](in []T) (res []T) { + for i := 0; i < len(in); i++ { + r := in[i] + iFrom, iTo := r.GetRange() + if iFrom == iTo { + continue + } + for j := i + 1; j < len(in); j++ { + r2 := in[j] + jFrom, jTo := r2.GetRange() + if jFrom == jTo { + continue + } + if jFrom > iFrom { + break + } + r = r2 + i++ + } + res = append(res, r) + } + return res +} + +func noGaps[T SortedRange](in []T) (out []T, missingRanges []Range) { + if len(in) == 0 { + return nil, nil + } + prevTo, _ := in[0].GetRange() + for _, f := range in { + from, to := f.GetRange() + if to <= prevTo { + continue + } + if from != prevTo { // no gaps + missingRanges = append(missingRanges, Range{prevTo, from}) + continue + } + prevTo = to + out = append(out, f) + } + return out, missingRanges +} + +func findOverlaps[T SortedRange](in []T) (res []T, overlapped []T) { + for i := 0; i < len(in); i++ { + f := in[i] + iFrom, iTo := f.GetRange() + if iFrom == iTo { + overlapped = append(overlapped, f) + continue + } + + for j := i + 1; j < len(in); i, j = i+1, j+1 { // if there is file with larger range - use it instead + f2 := in[j] + jFrom, jTo := f2.GetRange() + + if f.GetType().Enum() != f2.GetType().Enum() { + break + } + if jFrom == jTo { + overlapped = append(overlapped, f2) + continue + } + if jFrom > iFrom && jTo > iTo { + break + } + + if iTo >= jTo && iFrom <= jFrom { + overlapped = append(overlapped, f2) + continue + } + if i < len(in)-1 && (jTo >= iTo && jFrom <= iFrom) { + overlapped = append(overlapped, f) + } + f = f2 + iFrom, iTo = f.GetRange() + } + res = append(res, f) + } + return res, overlapped +} + +func FindOverlaps(in []snaptype.FileInfo) (res []snaptype.FileInfo, overlapped []snaptype.FileInfo) { + for i := 0; i < len(in); i++ { + f := in[i] + + if f.From == f.To { + overlapped = append(overlapped, f) + continue + } + + for j := i + 1; j < len(in); i, j = i+1, j+1 { // if there is file with larger range - use it instead + f2 := in[j] + + if f.Type.Enum() != f2.Type.Enum() { + break + } + + if f2.From == f2.To { + overlapped = append(overlapped, f2) + continue + } + + if f2.From > f.From && f2.To > f.To { + break + } + + if f.To >= f2.To && f.From <= f2.From { + overlapped = append(overlapped, f2) + continue + } + + if i < len(in)-1 && (f2.To >= f.To && f2.From <= f.From) { + overlapped = append(overlapped, f) + } + + f = f2 + } + + res = append(res, f) + } + + return res, overlapped +} + type Range struct { from, to uint64 } @@ -80,23 +212,44 @@ func (r Ranges) String() string { return fmt.Sprintf("%d", r) } -type Segment struct { +type DirtySegment struct { Range *seg.Decompressor indexes []*recsplit.Index segType snaptype.Type version snaptype.Version + + frozen bool + refcount atomic.Int32 + + canDelete atomic.Bool } -func (s Segment) Type() snaptype.Type { +type VisibleSegment struct { + Range + segType snaptype.Type + src *DirtySegment +} + +func DirtySegmentLess(i, j *DirtySegment) bool { + if i.from != j.from { + return i.from < j.from + } + if i.to != j.to { + return i.to < j.to + } + return int(i.version) < int(j.version) +} + +func (s *DirtySegment) Type() snaptype.Type { return s.segType } -func (s Segment) Version() snaptype.Version { +func (s *DirtySegment) Version() snaptype.Version { return s.version } -func (s Segment) Index(index ...snaptype.Index) *recsplit.Index { +func (s *DirtySegment) Index(index ...snaptype.Index) *recsplit.Index { if len(index) == 0 { index = []snaptype.Index{{}} } @@ -108,29 +261,21 @@ func (s Segment) Index(index ...snaptype.Index) *recsplit.Index { return s.indexes[index[0].Offset] } -func (s Segment) IsIndexed() bool { - if len(s.indexes) < len(s.Type().Indexes()) { - return false - } - - for _, i := range s.indexes { - if i == nil { - return false - } - } - - return true -} - -func (s Segment) FileName() string { +func (s *DirtySegment) FileName() string { return s.Type().FileName(s.version, s.from, s.to) } -func (s Segment) FileInfo(dir string) snaptype.FileInfo { +func (s *DirtySegment) FileInfo(dir string) snaptype.FileInfo { return s.Type().FileInfo(dir, s.from, s.to) } -func (s *Segment) reopenSeg(dir string) (err error) { +func (s *DirtySegment) GetRange() (from, to uint64) { return s.from, s.to } +func (s *DirtySegment) GetType() snaptype.Type { return s.segType } +func (s *DirtySegment) isSubSetOf(j *DirtySegment) bool { + return (j.from <= s.from && s.to <= j.to) && (j.from != s.from || s.to != j.to) +} + +func (s *DirtySegment) reopenSeg(dir string) (err error) { s.closeSeg() s.Decompressor, err = seg.NewDecompressor(filepath.Join(dir, s.FileName())) if err != nil { @@ -139,14 +284,14 @@ func (s *Segment) reopenSeg(dir string) (err error) { return nil } -func (s *Segment) closeSeg() { +func (s *DirtySegment) closeSeg() { if s.Decompressor != nil { s.Close() s.Decompressor = nil } } -func (s *Segment) closeIdx() { +func (s *DirtySegment) closeIdx() { for _, index := range s.indexes { index.Close() } @@ -154,14 +299,25 @@ func (s *Segment) closeIdx() { s.indexes = nil } -func (s *Segment) close() { +func (s *DirtySegment) close() { + if s != nil { + s.closeSeg() + s.closeIdx() + } +} + +func (s *DirtySegment) closeAndRemoveFiles() { if s != nil { + f := s.FilePath() s.closeSeg() s.closeIdx() + + snapDir := filepath.Dir(f) + removeOldFiles([]string{f}, snapDir) } } -func (s *Segment) openFiles() []string { +func (s *DirtySegment) openFiles() []string { files := make([]string, 0, len(s.indexes)+1) if s.IsOpen() { @@ -175,7 +331,7 @@ func (s *Segment) openFiles() []string { return files } -func (s *Segment) reopenIdxIfNeed(dir string, optimistic bool) (err error) { +func (s *DirtySegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) { if len(s.Type().IdxFileNames(s.version, s.from, s.to)) == 0 { return nil } @@ -195,7 +351,7 @@ func (s *Segment) reopenIdxIfNeed(dir string, optimistic bool) (err error) { return nil } -func (s *Segment) reopenIdx(dir string) (err error) { +func (s *DirtySegment) reopenIdx(dir string) (err error) { s.closeIdx() if s.Decompressor == nil { return nil @@ -214,19 +370,19 @@ func (s *Segment) reopenIdx(dir string) (err error) { return nil } -func (sn *Segment) mappedHeaderSnapshot() *silkworm.MappedHeaderSnapshot { +func (sn *DirtySegment) mappedHeaderSnapshot() *silkworm.MappedHeaderSnapshot { segmentRegion := silkworm.NewMemoryMappedRegion(sn.FilePath(), sn.DataHandle(), sn.Size()) idxRegion := silkworm.NewMemoryMappedRegion(sn.Index().FilePath(), sn.Index().DataHandle(), sn.Index().Size()) return silkworm.NewMappedHeaderSnapshot(segmentRegion, idxRegion) } -func (sn *Segment) mappedBodySnapshot() *silkworm.MappedBodySnapshot { +func (sn *DirtySegment) mappedBodySnapshot() *silkworm.MappedBodySnapshot { segmentRegion := silkworm.NewMemoryMappedRegion(sn.FilePath(), sn.DataHandle(), sn.Size()) idxRegion := silkworm.NewMemoryMappedRegion(sn.Index().FilePath(), sn.Index().DataHandle(), sn.Index().Size()) return silkworm.NewMappedBodySnapshot(segmentRegion, idxRegion) } -func (sn *Segment) mappedTxnSnapshot() *silkworm.MappedTxnSnapshot { +func (sn *DirtySegment) mappedTxnSnapshot() *silkworm.MappedTxnSnapshot { segmentRegion := silkworm.NewMemoryMappedRegion(sn.FilePath(), sn.DataHandle(), sn.Size()) idxTxnHash := sn.Index(coresnaptype.Indexes.TxnHash) idxTxnHashRegion := silkworm.NewMemoryMappedRegion(idxTxnHash.FilePath(), idxTxnHash.DataHandle(), idxTxnHash.Size()) @@ -249,20 +405,18 @@ func (sn *Segment) mappedTxnSnapshot() *silkworm.MappedTxnSnapshot { // transaction_hash -> block_number type segments struct { - lock sync.RWMutex - segments []*Segment + DirtySegments *btree.BTreeG[*DirtySegment] + VisibleSegments []*VisibleSegment + maxVisibleBlock atomic.Uint64 } -func (s *segments) View(f func(segments []*Segment) error) error { - s.lock.RLock() - defer s.lock.RUnlock() - return f(s.segments) +func (s *segments) View(f func(segments []*VisibleSegment) error) error { + return f(s.VisibleSegments) } -func (s *segments) Segment(blockNum uint64, f func(*Segment) error) (found bool, err error) { - s.lock.RLock() - defer s.lock.RUnlock() - for _, seg := range s.segments { +// no caller yet +func (s *segments) Segment(blockNum uint64, f func(*VisibleSegment) error) (found bool, err error) { + for _, seg := range s.VisibleSegments { if !(blockNum >= seg.from && blockNum < seg.to) { continue } @@ -271,11 +425,48 @@ func (s *segments) Segment(blockNum uint64, f func(*Segment) error) (found bool, return false, nil } +func (s *segments) BeginRotx() *segmentsRotx { + for _, seg := range s.VisibleSegments { + if !seg.src.frozen { + seg.src.refcount.Add(1) + } + } + return &segmentsRotx{segments: s, VisibleSegments: s.VisibleSegments} +} + +func (s *segmentsRotx) Close() { + if s.VisibleSegments == nil { + return + } + VisibleSegments := s.VisibleSegments + s.VisibleSegments = nil + + for i := range VisibleSegments { + src := VisibleSegments[i].src + if src == nil || src.frozen { + continue + } + refCnt := src.refcount.Add(-1) + if refCnt == 0 && src.canDelete.Load() { + src.closeAndRemoveFiles() + } + } +} + +type segmentsRotx struct { + segments *segments + VisibleSegments []*VisibleSegment +} + type RoSnapshots struct { indicesReady atomic.Bool segmentsReady atomic.Bool - types []snaptype.Type + types []snaptype.Type + + dirtySegmentsLock sync.RWMutex + visibleSegmentsLock sync.RWMutex + segments btree.Map[snaptype.Enum, *segments] dir string @@ -284,7 +475,7 @@ type RoSnapshots struct { cfg ethconfig.BlocksFreezing logger log.Logger - // allows for pruning segments - this is the min availible segment + // allows for pruning segments - this is the min available segment segmentsMin atomic.Uint64 } @@ -300,11 +491,14 @@ func NewRoSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, segmentsMin ui func newRoSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, types []snaptype.Type, segmentsMin uint64, logger log.Logger) *RoSnapshots { var segs btree.Map[snaptype.Enum, *segments] for _, snapType := range types { - segs.Set(snapType.Enum(), &segments{}) + segs.Set(snapType.Enum(), &segments{ + DirtySegments: btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}), + }) } s := &RoSnapshots{dir: snapDir, cfg: cfg, segments: segs, logger: logger, types: types} s.segmentsMin.Store(segmentsMin) + s.recalcVisibleFiles() return s } @@ -351,11 +545,12 @@ func (s *RoSnapshots) HasType(in snaptype.Type) bool { // DisableReadAhead - usage: `defer d.EnableReadAhead().DisableReadAhead()`. Please don't use this funcs without `defer` to avoid leak. func (s *RoSnapshots) DisableReadAhead() *RoSnapshots { + v := s.View() + defer v.Close() + s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.RLock() - defer value.lock.RUnlock() - for _, sn := range value.segments { - sn.DisableReadAhead() + for _, sn := range value.VisibleSegments { + sn.src.DisableReadAhead() } return true }) @@ -364,11 +559,12 @@ func (s *RoSnapshots) DisableReadAhead() *RoSnapshots { } func (s *RoSnapshots) EnableReadAhead() *RoSnapshots { + v := s.View() + defer v.Close() + s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.RLock() - defer value.lock.RUnlock() - for _, sn := range value.segments { - sn.EnableReadAhead() + for _, sn := range value.VisibleSegments { + sn.src.EnableReadAhead() } return true }) @@ -377,62 +573,101 @@ func (s *RoSnapshots) EnableReadAhead() *RoSnapshots { } func (s *RoSnapshots) EnableMadvWillNeed() *RoSnapshots { + v := s.View() + defer v.Close() + s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.RLock() - defer value.lock.RUnlock() - for _, sn := range value.segments { - sn.EnableMadvWillNeed() + for _, sn := range value.VisibleSegments { + sn.src.EnableMadvWillNeed() } return true }) return s } -// minimax of existing indices -func (s *RoSnapshots) idxAvailability() uint64 { - // Use-Cases: - // 1. developers can add new types in future. and users will not have files of this type - // 2. some types are network-specific. example: borevents exists only on Bor-consensus networks - // 3. user can manually remove 1 .idx file: `rm snapshots/v1-type1-0000-1000.idx` - // 4. user can manually remove all .idx files of given type: `rm snapshots/*type1*.idx` - // 5. file-types may have different height: 10 headers, 10 bodies, 9 trancasctions (for example if `kill -9` came during files building/merge). still need index all 3 types. - amount := 0 +func (s *RoSnapshots) recalcVisibleFiles() { + defer func() { + s.idxMax.Store(s.idxAvailability()) + s.indicesReady.Store(true) + }() + + s.visibleSegmentsLock.Lock() + defer s.visibleSegmentsLock.Unlock() + + var maxVisibleBlocks []uint64 s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - if len(value.segments) == 0 || !s.HasType(segtype.Type()) { + dirtySegments := value.DirtySegments + newVisibleSegments := make([]*VisibleSegment, 0, dirtySegments.Len()) + dirtySegments.Walk(func(segs []*DirtySegment) bool { + for _, seg := range segs { + if seg.canDelete.Load() { + continue + } + if seg.Decompressor == nil { + continue + } + if seg.indexes == nil { + continue + } + for len(newVisibleSegments) > 0 && newVisibleSegments[len(newVisibleSegments)-1].src.isSubSetOf(seg) { + newVisibleSegments[len(newVisibleSegments)-1].src = nil + newVisibleSegments = newVisibleSegments[:len(newVisibleSegments)-1] + } + newVisibleSegments = append(newVisibleSegments, &VisibleSegment{ + Range: seg.Range, + segType: seg.segType, + src: seg, + }) + + } return true + }) + + value.VisibleSegments = newVisibleSegments + var to uint64 + if len(newVisibleSegments) > 0 { + to = newVisibleSegments[len(newVisibleSegments)-1].to - 1 } - amount++ + maxVisibleBlocks = append(maxVisibleBlocks, to) return true }) - maximums := make([]uint64, amount) - var i int + minMaxVisibleBlock := slices.Min(maxVisibleBlocks) s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - if len(value.segments) == 0 || !s.HasType(segtype.Type()) { - return true - } - - for _, seg := range value.segments { - if !seg.IsIndexed() { - break + if minMaxVisibleBlock == 0 { + value.VisibleSegments = []*VisibleSegment{} + } else { + for i, seg := range value.VisibleSegments { + if seg.to > minMaxVisibleBlock+1 { + value.VisibleSegments = value.VisibleSegments[:i] + break + } } - - maximums[i] = seg.to - 1 } - - i++ + value.maxVisibleBlock.Store(minMaxVisibleBlock) return true }) +} - if len(maximums) == 0 { - return 0 - } +// minimax of existing indices +func (s *RoSnapshots) idxAvailability() uint64 { + // Use-Cases: + // 1. developers can add new types in future. and users will not have files of this type + // 2. some types are network-specific. example: borevents exists only on Bor-consensus networks + // 3. user can manually remove 1 .idx file: `rm snapshots/v1-type1-0000-1000.idx` + // 4. user can manually remove all .idx files of given type: `rm snapshots/*type1*.idx` + // 5. file-types may have different height: 10 headers, 10 bodies, 9 transactions (for example if `kill -9` came during files building/merge). still need index all 3 types. - if len(maximums) != len(s.types) { - return 0 - } + var maxIdx uint64 + s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { + if !s.HasType(segtype.Type()) { + return true + } + maxIdx = value.maxVisibleBlock.Load() + return false // all types of segments have the same height. stop here + }) - return slices.Min(maximums) + return maxIdx } // OptimisticReopenWithDB - optimistically open snapshots (ignoring error), useful at App startup because: @@ -451,53 +686,50 @@ func (s *RoSnapshots) OptimisticReopenWithDB(db kv.RoDB) { } func (s *RoSnapshots) LS() { - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.RLock() - defer value.lock.RUnlock() + view := s.View() + defer view.Close() - for _, seg := range value.segments { - if seg.Decompressor == nil { + view.VisibleSegments.Scan(func(segtype snaptype.Enum, value *segmentsRotx) bool { + for _, seg := range value.VisibleSegments { + if seg.src.Decompressor == nil { continue } - log.Info("[agg] ", "f", seg.Decompressor.FileName(), "words", seg.Decompressor.Count()) + log.Info("[snapshots] ", "f", seg.src.Decompressor.FileName(), "from", seg.from, "to", seg.to) } return true }) } func (s *RoSnapshots) Files() (list []string) { - maxBlockNumInFiles := s.BlocksAvailable() - - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.RLock() - defer value.lock.RUnlock() + view := s.View() + defer view.Close() - for _, seg := range value.segments { - if seg.Decompressor == nil { - continue - } - if seg.from > maxBlockNumInFiles { - continue - } - list = append(list, seg.FileName()) + view.VisibleSegments.Scan(func(segtype snaptype.Enum, value *segmentsRotx) bool { + for _, seg := range value.VisibleSegments { + list = append(list, seg.src.FileName()) } return true }) - slices.Sort(list) - return list + return } func (s *RoSnapshots) OpenFiles() (list []string) { + s.dirtySegmentsLock.RLock() + defer s.dirtySegmentsLock.RUnlock() + log.Warn("[dbg] OpenFiles") defer log.Warn("[dbg] OpenFiles end") s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.RLock() - defer value.lock.RUnlock() - - for _, seg := range value.segments { - list = append(list, seg.openFiles()...) - } + value.DirtySegments.Walk(func(segs []*DirtySegment) bool { + for _, seg := range segs { + if seg.Decompressor == nil { + continue + } + list = append(list, seg.FilePath()) + } + return true + }) return true }) @@ -506,8 +738,11 @@ func (s *RoSnapshots) OpenFiles() (list []string) { // ReopenList stops on optimistic=false, continue opening files on optimistic=true func (s *RoSnapshots) ReopenList(fileNames []string, optimistic bool) error { - s.lockSegments() - defer s.unlockSegments() + defer s.recalcVisibleFiles() + + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() + s.closeWhatNotInList(fileNames) if err := s.rebuildSegments(fileNames, true, optimistic); err != nil { return err @@ -516,8 +751,11 @@ func (s *RoSnapshots) ReopenList(fileNames []string, optimistic bool) error { } func (s *RoSnapshots) InitSegments(fileNames []string) error { - s.lockSegments() - defer s.unlockSegments() + defer s.recalcVisibleFiles() + + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() + s.closeWhatNotInList(fileNames) if err := s.rebuildSegments(fileNames, false, true); err != nil { return err @@ -525,20 +763,6 @@ func (s *RoSnapshots) InitSegments(fileNames []string) error { return nil } -func (s *RoSnapshots) lockSegments() { - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.Lock() - return true - }) -} - -func (s *RoSnapshots) unlockSegments() { - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.Unlock() - return true - }) -} - func (s *RoSnapshots) rebuildSegments(fileNames []string, open bool, optimistic bool) error { var segmentsMax uint64 var segmentsMaxSet bool @@ -554,27 +778,30 @@ func (s *RoSnapshots) rebuildSegments(fileNames []string, open bool, optimistic segtype, ok := s.segments.Get(f.Type.Enum()) if !ok { - segtype = &segments{} + segtype = &segments{ + DirtySegments: btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}), + } s.segments.Set(f.Type.Enum(), segtype) - segtype.lock.Lock() // this will be unlocked by defer s.unlockSegments() above } - var sn *Segment + var sn *DirtySegment var exists bool - for _, sn2 := range segtype.segments { - if sn2.Decompressor == nil { // it's ok if some segment was not able to open - continue - } - - if fName == sn2.FileName() { - sn = sn2 - exists = true - break + segtype.DirtySegments.Walk(func(segs []*DirtySegment) bool { + for _, sn2 := range segs { + if sn2.Decompressor == nil { // it's ok if some segment was not able to open + continue + } + if fName == sn2.FileName() { + sn = sn2 + exists = true + return false + } } - } + return true + }) if !exists { - sn = &Segment{segType: f.Type, version: f.Version, Range: Range{f.From, f.To}} + sn = &DirtySegment{segType: f.Type, version: f.Version, Range: Range{f.From, f.To}, frozen: snapcfg.Seedable(s.cfg.ChainName, f)} } if open { @@ -597,7 +824,7 @@ func (s *RoSnapshots) rebuildSegments(fileNames []string, open bool, optimistic if !exists { // it's possible to iterate over .seg file even if you don't have index // then make segment available even if index open may fail - segtype.segments = append(segtype.segments, sn) + segtype.DirtySegments.Set(sn) } if open { @@ -617,9 +844,6 @@ func (s *RoSnapshots) rebuildSegments(fileNames []string, open bool, optimistic s.segmentsMax.Store(segmentsMax) } s.segmentsReady.Store(true) - s.idxMax.Store(s.idxAvailability()) - s.indicesReady.Store(true) - return nil } @@ -632,6 +856,12 @@ func (s *RoSnapshots) Ranges() []Range { func (s *RoSnapshots) OptimisticalyReopenFolder() { _ = s.ReopenFolder() } func (s *RoSnapshots) OptimisticalyReopenWithDB(db kv.RoDB) { _ = s.ReopenWithDB(db) } func (s *RoSnapshots) ReopenFolder() error { + defer s.recalcVisibleFiles() + + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() + + fmt.Printf("[dbg] types: %s\n", s.Types()) files, _, err := typedSegments(s.dir, s.segmentsMin.Load(), s.Types(), false) if err != nil { return err @@ -642,10 +872,19 @@ func (s *RoSnapshots) ReopenFolder() error { _, fName := filepath.Split(f.Path) list = append(list, fName) } - return s.ReopenList(list, false) + s.closeWhatNotInList(list) + if err := s.rebuildSegments(list, true, false); err != nil { + return err + } + return nil } func (s *RoSnapshots) ReopenSegments(types []snaptype.Type, allowGaps bool) error { + defer s.recalcVisibleFiles() + + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() + files, _, err := typedSegments(s.dir, s.segmentsMin.Load(), types, allowGaps) if err != nil { @@ -657,9 +896,6 @@ func (s *RoSnapshots) ReopenSegments(types []snaptype.Type, allowGaps bool) erro list = append(list, fName) } - s.lockSegments() - defer s.unlockSegments() - // don't need close already opened files if err := s.rebuildSegments(list, true, false); err != nil { return err } @@ -683,50 +919,46 @@ func (s *RoSnapshots) Close() { if s == nil { return } - s.lockSegments() - defer s.unlockSegments() + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() + s.closeWhatNotInList(nil) + s.recalcVisibleFiles() } func (s *RoSnapshots) closeWhatNotInList(l []string) { + toClose := make(map[snaptype.Enum][]*DirtySegment, 0) s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - Segments: - for i, sn := range value.segments { - if sn.Decompressor == nil { - continue Segments - } - _, name := filepath.Split(sn.FilePath()) - for _, fName := range l { - if fName == name { - continue Segments + value.DirtySegments.Walk(func(segs []*DirtySegment) bool { + + Loop1: + for _, seg := range segs { + for _, fName := range l { + if fName == seg.FileName() { + continue Loop1 + } } + if _, ok := toClose[seg.segType.Enum()]; !ok { + toClose[segtype] = make([]*DirtySegment, 0) + } + toClose[segtype] = append(toClose[segtype], seg) } - sn.close() - value.segments[i] = nil - } + + return true + }) return true }) - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - var i int - for i = 0; i < len(value.segments) && value.segments[i] != nil && value.segments[i].Decompressor != nil; i++ { + for segtype, delSegments := range toClose { + segs, _ := s.segments.Get(segtype) + for _, delSeg := range delSegments { + delSeg.close() + segs.DirtySegments.Delete(delSeg) } - tail := value.segments[i:] - value.segments = value.segments[:i] - for i = 0; i < len(tail); i++ { - if tail[i] != nil { - tail[i].close() - tail[i] = nil - } - } - return true - }) + } } func (s *RoSnapshots) removeOverlapsAfterMerge() error { - s.lockSegments() - defer s.unlockSegments() - list, err := snaptype.Segments(s.dir) if err != nil { @@ -778,44 +1010,51 @@ func (s *RoSnapshots) buildMissedIndicesIfNeed(ctx context.Context, logPrefix st } func (s *RoSnapshots) delete(fileName string) error { - v := s.View() - defer v.Close() + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() - _, fName := filepath.Split(fileName) var err error + var delSeg *DirtySegment + var dirtySegments *btree.BTreeG[*DirtySegment] + + _, fName := filepath.Split(fileName) s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - idxsToRemove := []int{} - for i, sn := range value.segments { - if sn.Decompressor == nil { - continue - } - if sn.segType.FileName(sn.version, sn.from, sn.to) != fName { - continue - } - files := sn.openFiles() - sn.close() - idxsToRemove = append(idxsToRemove, i) - for _, f := range files { - _ = os.Remove(f) + findDelSeg := false + value.DirtySegments.Walk(func(segs []*DirtySegment) bool { + for _, sn := range segs { + if sn.Decompressor == nil { + continue + } + if sn.segType.FileName(sn.version, sn.from, sn.to) != fName { + continue + } + sn.canDelete.Store(true) + if sn.refcount.Load() == 0 { + sn.closeAndRemoveFiles() + } + delSeg = sn + dirtySegments = value.DirtySegments + findDelSeg = false + return true } - } - for i := len(idxsToRemove) - 1; i >= 0; i-- { - value.segments = append(value.segments[:idxsToRemove[i]], value.segments[idxsToRemove[i]+1:]...) - } - return true + return true + }) + return !findDelSeg }) + dirtySegments.Delete(delSeg) return err } +// prune visible segments func (s *RoSnapshots) Delete(fileName string) error { if s == nil { return nil } + defer s.recalcVisibleFiles() if err := s.delete(fileName); err != nil { return fmt.Errorf("can't delete file: %w", err) } - return s.ReopenFolder() - + return nil } func (s *RoSnapshots) buildMissedIndices(logPrefix string, ctx context.Context, dirs datadir.Dirs, chainConfig *chain.Config, workers int, logger log.Logger) error { @@ -860,30 +1099,32 @@ func (s *RoSnapshots) buildMissedIndices(logPrefix string, ctx context.Context, failedIndexes := make(map[string]error, 0) s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - for _, segment := range value.segments { - info := segment.FileInfo(dir) - - if segtype.HasIndexFiles(info, logger) { - continue - } + value.DirtySegments.Walk(func(segs []*DirtySegment) bool { + for _, segment := range segs { + info := segment.FileInfo(dir) - segment.closeIdx() - - g.Go(func() error { - p := &background.Progress{} - ps.Add(p) - defer notifySegmentIndexingFinished(info.Name()) - defer ps.Delete(p) - if err := segtype.BuildIndexes(gCtx, info, chainConfig, tmpDir, p, log.LvlInfo, logger); err != nil { - // unsuccessful indexing should allow other indexing to finish - fmu.Lock() - failedIndexes[info.Name()] = err - fmu.Unlock() + if segtype.HasIndexFiles(info, logger) { + continue } - return nil - }) - } + segment.closeIdx() + + g.Go(func() error { + p := &background.Progress{} + ps.Add(p) + defer notifySegmentIndexingFinished(info.Name()) + defer ps.Delete(p) + if err := segtype.BuildIndexes(gCtx, info, chainConfig, tmpDir, p, log.LvlInfo, logger); err != nil { + // unsuccessful indexing should allow other indexing to finish + fmu.Lock() + failedIndexes[info.Name()] = err + fmu.Unlock() + } + return nil + }) + } + return true + }) return true }) @@ -918,7 +1159,7 @@ func (s *RoSnapshots) PrintDebug() { defer v.Close() s.segments.Scan(func(key snaptype.Enum, value *segments) bool { fmt.Println(" == [dbg] Snapshots,", key.String()) - for _, sn := range value.segments { + printDebug := func(sn *DirtySegment) { args := make([]any, 0, len(sn.Type().Indexes())+1) args = append(args, sn.from) for _, index := range sn.Type().Indexes() { @@ -926,47 +1167,41 @@ func (s *RoSnapshots) PrintDebug() { } fmt.Println(args...) } + value.DirtySegments.Scan(func(sn *DirtySegment) bool { + printDebug(sn) + return true + }) return true }) } func (s *RoSnapshots) AddSnapshotsToSilkworm(silkwormInstance *silkworm.Silkworm) error { + v := s.View() + defer v.Close() + + s.visibleSegmentsLock.RLock() + defer s.visibleSegmentsLock.RUnlock() + mappedHeaderSnapshots := make([]*silkworm.MappedHeaderSnapshot, 0) - if headers, ok := s.segments.Get(coresnaptype.Enums.Headers); ok { - err := headers.View(func(segments []*Segment) error { - for _, headerSegment := range segments { - mappedHeaderSnapshots = append(mappedHeaderSnapshots, headerSegment.mappedHeaderSnapshot()) - } - return nil - }) - if err != nil { - return err + if headers, ok := v.VisibleSegments.Get(coresnaptype.Enums.Headers); ok { + for _, headerSegment := range headers.VisibleSegments { + mappedHeaderSnapshots = append(mappedHeaderSnapshots, headerSegment.src.mappedHeaderSnapshot()) } } mappedBodySnapshots := make([]*silkworm.MappedBodySnapshot, 0) - if bodies, ok := s.segments.Get(coresnaptype.Enums.Bodies); ok { - err := bodies.View(func(segments []*Segment) error { - for _, bodySegment := range segments { - mappedBodySnapshots = append(mappedBodySnapshots, bodySegment.mappedBodySnapshot()) - } - return nil - }) - if err != nil { - return err + if bodies, ok := v.VisibleSegments.Get(coresnaptype.Enums.Bodies); ok { + for _, bodySegment := range bodies.VisibleSegments { + mappedBodySnapshots = append(mappedBodySnapshots, bodySegment.src.mappedBodySnapshot()) } + return nil + } mappedTxnSnapshots := make([]*silkworm.MappedTxnSnapshot, 0) - if txs, ok := s.segments.Get(coresnaptype.Enums.Transactions); ok { - err := txs.View(func(segments []*Segment) error { - for _, txnSegment := range segments { - mappedTxnSnapshots = append(mappedTxnSnapshots, txnSegment.mappedTxnSnapshot()) - } - return nil - }) - if err != nil { - return err + if txs, ok := v.VisibleSegments.Get(coresnaptype.Enums.Transactions); ok { + for _, txnSegment := range txs.VisibleSegments { + mappedTxnSnapshots = append(mappedTxnSnapshots, txnSegment.src.mappedTxnSnapshot()) } } @@ -1029,25 +1264,6 @@ func sendDiagnostics(startIndexingTime time.Time, indexPercent map[string]int, a }) } -func noGaps(in []snaptype.FileInfo) (out []snaptype.FileInfo, missingSnapshots []Range) { - if len(in) == 0 { - return nil, nil - } - prevTo := in[0].From - for _, f := range in { - if f.To <= prevTo { - continue - } - if f.From != prevTo { // no gaps - missingSnapshots = append(missingSnapshots, Range{prevTo, f.From}) - continue - } - prevTo = f.To - out = append(out, f) - } - return out, missingSnapshots -} - func typeOfSegmentsMustExist(dir string, in []snaptype.FileInfo, types []snaptype.Type) (res []snaptype.FileInfo) { MainLoop: for _, f := range in { @@ -1070,75 +1286,6 @@ MainLoop: return res } -// noOverlaps - keep largest ranges and avoid overlap -func noOverlaps(in []snaptype.FileInfo) (res []snaptype.FileInfo) { - for i := range in { - f := in[i] - if f.From == f.To { - continue - } - - for j := i + 1; j < len(in); j++ { // if there is file with larger range - use it instead - f2 := in[j] - if f2.From == f2.To { - continue - } - if f2.From > f.From { - break - } - f = f2 - i++ - } - - res = append(res, f) - } - - return res -} - -func findOverlaps(in []snaptype.FileInfo) (res []snaptype.FileInfo, overlapped []snaptype.FileInfo) { - for i := 0; i < len(in); i++ { - f := in[i] - - if f.From == f.To { - overlapped = append(overlapped, f) - continue - } - - for j := i + 1; j < len(in); i, j = i+1, j+1 { // if there is file with larger range - use it instead - f2 := in[j] - - if f.Type.Enum() != f2.Type.Enum() { - break - } - - if f2.From == f2.To { - overlapped = append(overlapped, f2) - continue - } - - if f2.From > f.From && f2.To > f.To { - break - } - - if f.To >= f2.To && f.From <= f2.From { - overlapped = append(overlapped, f2) - continue - } - - if i < len(in)-1 && (f2.To >= f.To && f2.From <= f.From) { - overlapped = append(overlapped, f) - } - - f = f2 - } - - res = append(res, f) - } - - return res, overlapped -} - func SegmentsCaplin(dir string, minBlock uint64) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { list, err := snaptype.Segments(dir) if err != nil { @@ -1462,7 +1609,6 @@ func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int) (deleted int, e if err != nil { return deleted, err } - if canDeleteTo := CanDeleteTo(currentProgress, br.blockReader.FrozenBlocks()); canDeleteTo > 0 { br.logger.Debug("[snapshots] Prune Blocks", "to", canDeleteTo, "limit", limit) deletedBlocks, err := br.blockWriter.PruneBlocks(context.Background(), tx, canDeleteTo, limit) @@ -2002,8 +2148,8 @@ func ForEachHeader(ctx context.Context, s *RoSnapshots, walker func(header *type defer view.Close() for _, sn := range view.Headers() { - if err := sn.WithReadAhead(func() error { - g := sn.MakeGetter() + if err := sn.src.WithReadAhead(func() error { + g := sn.src.MakeGetter() for g.HasNext() { word, _ = g.Next(word[:0]) var header types.Header @@ -2066,22 +2212,17 @@ func (m *Merger) FindMergeRanges(currentRanges []Range, maxBlockNum uint64) (toM return toMerge } -func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (map[snaptype.Enum][]string, error) { - toMerge := map[snaptype.Enum][]string{} - - view := snapshots.View() - defer view.Close() - - for _, t := range snapshots.Types() { - toMerge[t.Enum()] = m.filesByRangeOfType(view, from, to, t) - } +func (m *Merger) filesByRange(v *View, from, to uint64) (map[snaptype.Enum][]*DirtySegment, error) { + toMerge := map[snaptype.Enum][]*DirtySegment{} + v.VisibleSegments.Scan(func(key snaptype.Enum, value *segmentsRotx) bool { + toMerge[key.Type().Enum()] = m.filesByRangeOfType(v, from, to, key.Type()) + return true + }) return toMerge, nil } -func (m *Merger) filesByRangeOfType(view *View, from, to uint64, snapshotType snaptype.Type) []string { - paths := make([]string, 0) - +func (m *Merger) filesByRangeOfType(view *View, from, to uint64, snapshotType snaptype.Type) (out []*DirtySegment) { for _, sn := range view.segments(snapshotType) { if sn.from < from { continue @@ -2090,13 +2231,12 @@ func (m *Merger) filesByRangeOfType(view *View, from, to uint64, snapshotType sn break } - paths = append(paths, sn.FilePath()) + out = append(out, sn.src) } - - return paths + return } -func (m *Merger) mergeSubSegment(ctx context.Context, sn snaptype.FileInfo, toMerge []string, snapDir string, doIndex bool, onMerge func(r Range) error) (err error) { +func (m *Merger) mergeSubSegment(ctx context.Context, v *View, sn snaptype.FileInfo, toMerge []*DirtySegment, snapDir string, doIndex bool, onMerge func(r Range) error) (newDirtySegment *DirtySegment, err error) { defer func() { if err == nil { if rec := recover(); rec != nil { @@ -2119,16 +2259,21 @@ func (m *Merger) mergeSubSegment(ctx context.Context, sn snaptype.FileInfo, toMe if len(toMerge) == 0 { return } - if err = m.merge(ctx, toMerge, sn.Path, nil); err != nil { + if newDirtySegment, err = m.merge(ctx, v, toMerge, sn, snapDir, nil); err != nil { err = fmt.Errorf("mergeByAppendSegments: %w", err) return } + // new way to build index if doIndex { p := &background.Progress{} if err = buildIdx(ctx, sn, m.chainConfig, m.tmpDir, p, m.lvl, m.logger); err != nil { return } + err = newDirtySegment.reopenIdx(snapDir) + if err != nil { + return + } } return @@ -2136,24 +2281,39 @@ func (m *Merger) mergeSubSegment(ctx context.Context, sn snaptype.FileInfo, toMe // Merge does merge segments in given ranges func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, snapTypes []snaptype.Type, mergeRanges []Range, snapDir string, doIndex bool, onMerge func(r Range) error, onDelete func(l []string) error) (err error) { + v := snapshots.View() + defer v.Close() + if len(mergeRanges) == 0 { return nil } logEvery := time.NewTicker(30 * time.Second) defer logEvery.Stop() + + in := make(map[snaptype.Enum][]*DirtySegment) + out := make(map[snaptype.Enum][]*DirtySegment) + for _, r := range mergeRanges { - toMerge, err := m.filesByRange(snapshots, r.from, r.to) + toMerge, err := m.filesByRange(v, r.from, r.to) if err != nil { return err } + for snapType, t := range toMerge { + if out[snapType] == nil { + out[snapType] = make([]*DirtySegment, 0, len(t)) + } + out[snapType] = append(out[snapType], t...) + } for _, t := range snapTypes { - if err := m.mergeSubSegment(ctx, t.FileInfo(snapDir, r.from, r.to), toMerge[t.Enum()], snapDir, doIndex, onMerge); err != nil { + newDirtySegment, err := m.mergeSubSegment(ctx, v, t.FileInfo(snapDir, r.from, r.to), toMerge[t.Enum()], snapDir, doIndex, onMerge) + if err != nil { return err } - } - if err := snapshots.ReopenFolder(); err != nil { - return fmt.Errorf("ReopenSegments: %w", err) + if in[t.Enum()] == nil { + in[t.Enum()] = make([]*DirtySegment, 0, len(toMerge[t.Enum()])) + } + in[t.Enum()] = append(in[t.Enum()], newDirtySegment) } snapshots.LogStat("merge") @@ -2168,26 +2328,81 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, snapTypes [] if len(toMerge[t.Enum()]) == 0 { continue } + toMergeFilePaths := make([]string, 0, len(toMerge[t.Enum()])) + for _, f := range toMerge[t.Enum()] { + toMergeFilePaths = append(toMergeFilePaths, f.FilePath()) + } if onDelete != nil { - if err := onDelete(toMerge[t.Enum()]); err != nil { + if err := onDelete(toMergeFilePaths); err != nil { return err } } - removeOldFiles(toMerge[t.Enum()], snapDir) } } + m.integrateMergedDirtyFiles(snapshots, in, out) m.logger.Log(m.lvl, "[snapshots] Merge done", "from", mergeRanges[0].from, "to", mergeRanges[0].to) return nil } -func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string, logEvery *time.Ticker) error { +func (m *Merger) integrateMergedDirtyFiles(snapshots *RoSnapshots, in, out map[snaptype.Enum][]*DirtySegment) { + defer snapshots.recalcVisibleFiles() + + snapshots.dirtySegmentsLock.Lock() + defer snapshots.dirtySegmentsLock.Unlock() + + // add new segments + for enum, newSegs := range in { + segs, b := snapshots.segments.Get(enum) + if !b { + m.logger.Error("[snapshots] Merge: segment not found", "enum", enum) + continue + } + dirtySegments := segs.DirtySegments + for _, newSeg := range newSegs { + dirtySegments.Set(newSeg) + if newSeg.frozen { + dirtySegments.Walk(func(items []*DirtySegment) bool { + for _, item := range items { + if item.frozen || item.to > newSeg.to { + continue + } + if out[enum] == nil { + out[enum] = make([]*DirtySegment, 0, 1) + } + out[enum] = append(out[enum], item) + } + return true + }) + } + } + } + + // delete old sub segments + for enum, delSegs := range out { + segs, b := snapshots.segments.Get(enum) + if !b { + m.logger.Error("[snapshots] Merge: segment not found", "enum", enum) + continue + } + dirtySegments := segs.DirtySegments + for _, delSeg := range delSegs { + dirtySegments.Delete(delSeg) + delSeg.canDelete.Store(true) + if delSeg.refcount.Load() == 0 { + delSeg.closeAndRemoveFiles() + } + } + } +} + +func (m *Merger) merge(ctx context.Context, v *View, toMerge []*DirtySegment, targetFile snaptype.FileInfo, snapDir string, logEvery *time.Ticker) (*DirtySegment, error) { var word = make([]byte, 0, 4096) var expectedTotal int cList := make([]*seg.Decompressor, len(toMerge)) for i, cFile := range toMerge { - d, err := seg.NewDecompressor(cFile) + d, err := seg.NewDecompressor(cFile.FilePath()) if err != nil { - return err + return nil, err } defer d.Close() cList[i] = d @@ -2196,17 +2411,15 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string, compresCfg := seg.DefaultCfg compresCfg.Workers = m.compressWorkers - f, err := seg.NewCompressor(ctx, "Snapshots merge", targetFile, m.tmpDir, compresCfg, log.LvlTrace, m.logger) + f, err := seg.NewCompressor(ctx, "Snapshots merge", targetFile.Path, m.tmpDir, compresCfg, log.LvlTrace, m.logger) if err != nil { - return err + return nil, err } defer f.Close() if m.noFsync { f.DisableFsync() } - - _, fName := filepath.Split(targetFile) - m.logger.Debug("[snapshots] merge", "file", fName) + m.logger.Debug("[snapshots] merge", "file", targetFile.Name()) for _, d := range cList { if err := d.WithReadAhead(func() error { @@ -2219,16 +2432,23 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string, } return nil }); err != nil { - return err + return nil, err } } if f.Count() != expectedTotal { - return fmt.Errorf("unexpected amount after segments merge. got: %d, expected: %d", f.Count(), expectedTotal) + return nil, fmt.Errorf("unexpected amount after segments merge. got: %d, expected: %d", f.Count(), expectedTotal) } if err = f.Compress(); err != nil { - return err + return nil, err } - return nil + sn := &DirtySegment{segType: targetFile.Type, version: targetFile.Version, Range: Range{targetFile.From, targetFile.To}, + frozen: snapcfg.Seedable(v.s.cfg.ChainName, targetFile)} + + err = sn.reopenSeg(snapDir) + if err != nil { + return nil, err + } + return sn, nil } func removeOldFiles(toDel []string, snapDir string) { @@ -2253,88 +2473,89 @@ func removeOldFiles(toDel []string, snapDir string) { } type View struct { - s *RoSnapshots - baseSegType snaptype.Type - closed bool + s *RoSnapshots + VisibleSegments btree.Map[snaptype.Enum, *segmentsRotx] + baseSegType snaptype.Type } func (s *RoSnapshots) View() *View { - v := &View{s: s, baseSegType: coresnaptype.Transactions} // Transactions is the last segment to be processed, so it's the most reliable. + s.visibleSegmentsLock.RLock() + defer s.visibleSegmentsLock.RUnlock() + + var sgs btree.Map[snaptype.Enum, *segmentsRotx] s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.RLock() + sgs.Set(segtype, value.BeginRotx()) return true }) - return v + return &View{s: s, VisibleSegments: sgs, baseSegType: coresnaptype.Transactions} // Transactions is the last segment to be processed, so it's the most reliable. } func (v *View) Close() { - if v.closed { + if v == nil || v.s == nil { return } - v.closed = true - v.s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.lock.RUnlock() + v.s = nil + + v.VisibleSegments.Scan(func(segtype snaptype.Enum, value *segmentsRotx) bool { + value.Close() return true }) } var noop = func() {} -func (s *RoSnapshots) ViewType(t snaptype.Type) (segments []*Segment, release func()) { - segs, ok := s.segments.Get(t.Enum()) - if !ok { - return nil, noop - } +func (s *RoSnapshots) ViewType(t snaptype.Type) *segmentsRotx { + s.visibleSegmentsLock.RLock() + defer s.visibleSegmentsLock.RUnlock() - segs.lock.RLock() - var released = false - return segs.segments, func() { - if released { - return - } - segs.lock.RUnlock() - released = true + seg, ok := s.segments.Get(t.Enum()) + if !ok { + return nil } + return seg.BeginRotx() } -func (s *RoSnapshots) ViewSingleFile(t snaptype.Type, blockNum uint64) (segment *Segment, ok bool, release func()) { +func (s *RoSnapshots) ViewSingleFile(t snaptype.Type, blockNum uint64) (segment *VisibleSegment, ok bool, close func()) { + s.visibleSegmentsLock.RLock() + defer s.visibleSegmentsLock.RUnlock() + segs, ok := s.segments.Get(t.Enum()) if !ok { return nil, false, noop } - segs.lock.RLock() - var released = false - for _, seg := range segs.segments { + if blockNum > segs.maxVisibleBlock.Load() { + return nil, false, noop + } + + segmentRotx := segs.BeginRotx() + for _, seg := range segmentRotx.VisibleSegments { if !(blockNum >= seg.from && blockNum < seg.to) { continue } - return seg, true, func() { - if released { - return - } - segs.lock.RUnlock() - released = true - } + return seg, true, func() { segmentRotx.Close() } } - segs.lock.RUnlock() + segmentRotx.Close() return nil, false, noop } -func (v *View) segments(t snaptype.Type) []*Segment { +func (v *View) segments(t snaptype.Type) []*VisibleSegment { if s, ok := v.s.segments.Get(t.Enum()); ok { - return s.segments + return s.VisibleSegments } return nil } -func (v *View) Headers() []*Segment { return v.segments(coresnaptype.Headers) } -func (v *View) Bodies() []*Segment { return v.segments(coresnaptype.Bodies) } -func (v *View) Txs() []*Segment { return v.segments(coresnaptype.Transactions) } +func (v *View) Headers() []*VisibleSegment { return v.segments(coresnaptype.Headers) } +func (v *View) Bodies() []*VisibleSegment { return v.segments(coresnaptype.Bodies) } +func (v *View) Txs() []*VisibleSegment { return v.segments(coresnaptype.Transactions) } -func (v *View) Segment(t snaptype.Type, blockNum uint64) (*Segment, bool) { +func (v *View) Segment(t snaptype.Type, blockNum uint64) (*VisibleSegment, bool) { if s, ok := v.s.segments.Get(t.Enum()); ok { - for _, seg := range s.segments { + if blockNum > s.maxVisibleBlock.Load() { + return nil, false + } + for _, seg := range s.VisibleSegments { if !(blockNum >= seg.from && blockNum < seg.to) { continue } @@ -2352,14 +2573,14 @@ func (v *View) Ranges() (ranges []Range) { return ranges } -func (v *View) HeadersSegment(blockNum uint64) (*Segment, bool) { +func (v *View) HeadersSegment(blockNum uint64) (*VisibleSegment, bool) { return v.Segment(coresnaptype.Headers, blockNum) } -func (v *View) BodiesSegment(blockNum uint64) (*Segment, bool) { +func (v *View) BodiesSegment(blockNum uint64) (*VisibleSegment, bool) { return v.Segment(coresnaptype.Bodies, blockNum) } -func (v *View) TxsSegment(blockNum uint64) (*Segment, bool) { +func (v *View) TxsSegment(blockNum uint64) (*VisibleSegment, bool) { return v.Segment(coresnaptype.Transactions, blockNum) } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go index 4b510b67bd6..f18c6ba694c 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go @@ -212,7 +212,7 @@ func TestMergeSnapshots(t *testing.T) { for i := uint64(0); i < N; i++ { createFile(i*10_000, (i+1)*10_000) } - s := NewRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + s := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.MainnetChainName}, dir, 0, logger) defer s.Close() require.NoError(s.ReopenFolder()) { @@ -308,7 +308,7 @@ func TestDeleteSnapshots(t *testing.T) { for i := uint64(0); i < N; i++ { createFile(i*10_000, (i+1)*10_000) } - s := NewRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + s := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.MainnetChainName}, dir, 0, logger) defer s.Close() retireFiles := []string{ "v1-000000-000010-bodies.seg", @@ -331,6 +331,12 @@ func TestRemoveOverlaps(t *testing.T) { } } + // 0 - 10_000, ... , 40_000 - 50_000 => 5 files + // 0 - 100_000 => 1 file + // 130_000 - 140_000, ... , 180_000 - 190_000 => 5 files + // 100_000 - 200_000 => 1 file + // 200_000 - 210_000, ... , 220_000 - 230_000 => 3 files + for i := uint64(0); i < 5; i++ { createFile(i*10_000, (i+1)*10_000) } @@ -347,7 +353,7 @@ func TestRemoveOverlaps(t *testing.T) { createFile(200_000+i*10_000, 200_000+(i+1)*10_000) } - s := NewRoSnapshots(ethconfig.BlocksFreezing{}, dir, 0, logger) + s := NewRoSnapshots(ethconfig.BlocksFreezing{ChainName: networkname.MainnetChainName}, dir, 0, logger) defer s.Close() require.NoError(s.ReopenSegments(coresnaptype.BlockSnapshotTypes, false)) @@ -395,11 +401,14 @@ func TestOpenAllSnapshot(t *testing.T) { logger := log.New() baseDir, require := t.TempDir(), require.New(t) - for _, chain := range []string{networkname.MainnetChainName, networkname.AmoyChainName} { + steps := []uint64{500_000, 100_000} + + for i, chain := range []string{networkname.MainnetChainName, networkname.AmoyChainName} { + step := steps[i] dir := filepath.Join(baseDir, chain) chainSnapshotCfg := snapcfg.KnownCfg(chain) chainSnapshotCfg.ExpectBlocks = math.MaxUint64 - cfg := ethconfig.BlocksFreezing{} + cfg := ethconfig.BlocksFreezing{ChainName: chain} createFile := func(from, to uint64, name snaptype.Type) { createTestSegmentFile(t, from, to, name.Enum(), dir, 1, logger) } @@ -412,49 +421,49 @@ func TestOpenAllSnapshot(t *testing.T) { res, _ := s.segments.Get(e) return res } - require.Equal(0, len(getSegs(coresnaptype.Enums.Headers).segments)) + require.Equal(0, len(getSegs(coresnaptype.Enums.Headers).VisibleSegments)) s.Close() - createFile(500_000, 1_000_000, coresnaptype.Bodies) + createFile(step, step*2, coresnaptype.Bodies) s = NewRoSnapshots(cfg, dir, 0, logger) defer s.Close() require.NotNil(getSegs(coresnaptype.Enums.Bodies)) - require.Equal(0, len(getSegs(coresnaptype.Enums.Bodies).segments)) + require.Equal(0, len(getSegs(coresnaptype.Enums.Bodies).VisibleSegments)) s.Close() - createFile(500_000, 1_000_000, coresnaptype.Headers) - createFile(500_000, 1_000_000, coresnaptype.Transactions) + createFile(step, step*2, coresnaptype.Headers) + createFile(step, step*2, coresnaptype.Transactions) s = NewRoSnapshots(cfg, dir, 0, logger) err = s.ReopenFolder() require.NoError(err) require.NotNil(getSegs(coresnaptype.Enums.Headers)) s.ReopenSegments(coresnaptype.BlockSnapshotTypes, false) - require.Equal(1, len(getSegs(coresnaptype.Enums.Headers).segments)) + // require.Equal(1, len(getSegs(coresnaptype.Enums.Headers).visibleSegments)) s.Close() - createFile(0, 500_000, coresnaptype.Bodies) - createFile(0, 500_000, coresnaptype.Headers) - createFile(0, 500_000, coresnaptype.Transactions) + createFile(0, step, coresnaptype.Bodies) + createFile(0, step, coresnaptype.Headers) + createFile(0, step, coresnaptype.Transactions) s = NewRoSnapshots(cfg, dir, 0, logger) defer s.Close() err = s.ReopenFolder() require.NoError(err) require.NotNil(getSegs(coresnaptype.Enums.Headers)) - require.Equal(2, len(getSegs(coresnaptype.Enums.Headers).segments)) + require.Equal(2, len(getSegs(coresnaptype.Enums.Headers).VisibleSegments)) view := s.View() defer view.Close() seg, ok := view.TxsSegment(10) require.True(ok) - require.Equal(int(seg.to), 500_000) + require.Equal(seg.to, step) - seg, ok = view.TxsSegment(500_000) + seg, ok = view.TxsSegment(step) require.True(ok) - require.Equal(int(seg.to), 1_000_000) + require.Equal(seg.to, step*2) - _, ok = view.TxsSegment(1_000_000) + _, ok = view.TxsSegment(step * 2) require.False(ok) // Erigon may create new snapshots by itself - with high bigger than hardcoded ExpectedBlocks @@ -465,11 +474,11 @@ func TestOpenAllSnapshot(t *testing.T) { require.NoError(err) defer s.Close() require.NotNil(getSegs(coresnaptype.Enums.Headers)) - require.Equal(2, len(getSegs(coresnaptype.Enums.Headers).segments)) + require.Equal(2, len(getSegs(coresnaptype.Enums.Headers).VisibleSegments)) - createFile(500_000, 900_000, coresnaptype.Headers) - createFile(500_000, 900_000, coresnaptype.Bodies) - createFile(500_000, 900_000, coresnaptype.Transactions) + createFile(step, step*2-step/5, coresnaptype.Headers) + createFile(step, step*2-step/5, coresnaptype.Bodies) + createFile(step, step*2-step/5, coresnaptype.Transactions) chainSnapshotCfg.ExpectBlocks = math.MaxUint64 s = NewRoSnapshots(cfg, dir, 0, logger) defer s.Close() @@ -516,3 +525,77 @@ func TestParseCompressedFileName(t *testing.T) { require.Equal(1_000, int(f.From)) require.Equal(2_000, int(f.To)) } + +func getSeg(s *RoSnapshots, e snaptype.Enum) *segments { + res, _ := s.segments.Get(e) + return res +} + +func TestCalculateVisibleSegments(t *testing.T) { + logger := log.New() + dir, require := t.TempDir(), require.New(t) + createFile := func(from, to uint64, name snaptype.Type) { + createTestSegmentFile(t, from, to, name.Enum(), dir, 1, logger) + } + + for i := uint64(0); i < 7; i++ { + createFile(i*500_000, (i+1)*500_000, coresnaptype.Headers) + } + for i := uint64(0); i < 6; i++ { + createFile(i*500_000, (i+1)*500_000, coresnaptype.Bodies) + } + for i := uint64(0); i < 5; i++ { + createFile(i*500_000, (i+1)*500_000, coresnaptype.Transactions) + } + cfg := ethconfig.BlocksFreezing{ChainName: networkname.MainnetChainName} + s := NewRoSnapshots(cfg, dir, 0, logger) + defer s.Close() + + { + require.NoError(s.ReopenFolder()) + idx := s.idxAvailability() + require.Equal(2_500_000-1, int(idx)) + + require.Equal(5, len(getSeg(s, coresnaptype.Enums.Headers).VisibleSegments)) + require.Equal(5, len(getSeg(s, coresnaptype.Enums.Bodies).VisibleSegments)) + require.Equal(5, len(getSeg(s, coresnaptype.Enums.Transactions).VisibleSegments)) + + require.Equal(7, getSeg(s, coresnaptype.Enums.Headers).DirtySegments.Len()) + require.Equal(6, getSeg(s, coresnaptype.Enums.Bodies).DirtySegments.Len()) + require.Equal(5, getSeg(s, coresnaptype.Enums.Transactions).DirtySegments.Len()) + } + + // gap in transactions: [5*500_000 - 6*500_000] + { + createFile(6*500_000, 7*500_000, coresnaptype.Transactions) + + require.NoError(s.ReopenFolder()) + idx := s.idxAvailability() + require.Equal(2_500_000-1, int(idx)) + + require.Equal(5, len(getSeg(s, coresnaptype.Enums.Headers).VisibleSegments)) + require.Equal(5, len(getSeg(s, coresnaptype.Enums.Bodies).VisibleSegments)) + require.Equal(5, len(getSeg(s, coresnaptype.Enums.Transactions).VisibleSegments)) + + require.Equal(7, getSeg(s, coresnaptype.Enums.Headers).DirtySegments.Len()) + require.Equal(6, getSeg(s, coresnaptype.Enums.Bodies).DirtySegments.Len()) + require.Equal(5, getSeg(s, coresnaptype.Enums.Transactions).DirtySegments.Len()) + } + + // overlap in transactions: [4*500_000 - 4.5*500_000] + { + createFile(4*500_000, 4*500_000+250_000, coresnaptype.Transactions) + + require.NoError(s.ReopenFolder()) + idx := s.idxAvailability() + require.Equal(2_500_000-1, int(idx)) + + require.Equal(5, len(getSeg(s, coresnaptype.Enums.Headers).VisibleSegments)) + require.Equal(5, len(getSeg(s, coresnaptype.Enums.Bodies).VisibleSegments)) + require.Equal(5, len(getSeg(s, coresnaptype.Enums.Transactions).VisibleSegments)) + + require.Equal(7, getSeg(s, coresnaptype.Enums.Headers).DirtySegments.Len()) + require.Equal(6, getSeg(s, coresnaptype.Enums.Bodies).DirtySegments.Len()) + require.Equal(5, getSeg(s, coresnaptype.Enums.Transactions).DirtySegments.Len()) + } +} diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index d0ccc3a3fd2..fde041e8d7f 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -356,8 +356,8 @@ func checkBlockEvents(ctx context.Context, config *borcfg.BorConfig, blockReader return prevEventTime, nil } -func ValidateBorEvents(ctx context.Context, config *borcfg.BorConfig, db kv.RoDB, blockReader services.FullBlockReader, eventSegment *Segment, prevEventId uint64, maxBlockNum uint64, failFast bool, logEvery *time.Ticker) (uint64, error) { - g := eventSegment.Decompressor.MakeGetter() +func ValidateBorEvents(ctx context.Context, config *borcfg.BorConfig, db kv.RoDB, blockReader services.FullBlockReader, eventSegment *VisibleSegment, prevEventId uint64, maxBlockNum uint64, failFast bool, logEvery *time.Ticker) (uint64, error) { + g := eventSegment.src.Decompressor.MakeGetter() word := make([]byte, 0, 4096) @@ -475,15 +475,15 @@ func (v *BorView) Close() { v.base.Close() } -func (v *BorView) Events() []*Segment { return v.base.segments(borsnaptype.BorEvents) } -func (v *BorView) Spans() []*Segment { return v.base.segments(borsnaptype.BorSpans) } -func (v *BorView) Checkpoints() []*Segment { return v.base.segments(borsnaptype.BorCheckpoints) } -func (v *BorView) Milestones() []*Segment { return v.base.segments(borsnaptype.BorMilestones) } +func (v *BorView) Events() []*VisibleSegment { return v.base.segments(borsnaptype.BorEvents) } +func (v *BorView) Spans() []*VisibleSegment { return v.base.segments(borsnaptype.BorSpans) } +func (v *BorView) Checkpoints() []*VisibleSegment { return v.base.segments(borsnaptype.BorCheckpoints) } +func (v *BorView) Milestones() []*VisibleSegment { return v.base.segments(borsnaptype.BorMilestones) } -func (v *BorView) EventsSegment(blockNum uint64) (*Segment, bool) { +func (v *BorView) EventsSegment(blockNum uint64) (*VisibleSegment, bool) { return v.base.Segment(borsnaptype.BorEvents, blockNum) } -func (v *BorView) SpansSegment(blockNum uint64) (*Segment, bool) { +func (v *BorView) SpansSegment(blockNum uint64) (*VisibleSegment, bool) { return v.base.Segment(borsnaptype.BorSpans, blockNum) } diff --git a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go index 66fab0dc151..8a069883e7e 100644 --- a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go @@ -25,10 +25,12 @@ import ( "math" "os" "path/filepath" + "sync" "sync/atomic" "time" "github.com/klauspost/compress/zstd" + "github.com/tidwall/btree" "github.com/erigontech/erigon-lib/log/v3" @@ -89,6 +91,9 @@ type CaplinSnapshots struct { Salt uint32 + dirtySegmentsLock sync.RWMutex + visibleSegmentsLock sync.RWMutex + BeaconBlocks *segments BlobSidecars *segments @@ -110,7 +115,15 @@ type CaplinSnapshots struct { // - gaps are not allowed // - segment have [from:to) semantic func NewCaplinSnapshots(cfg ethconfig.BlocksFreezing, beaconCfg *clparams.BeaconChainConfig, dirs datadir.Dirs, logger log.Logger) *CaplinSnapshots { - return &CaplinSnapshots{dir: dirs.Snap, tmpdir: dirs.Tmp, cfg: cfg, BeaconBlocks: &segments{}, BlobSidecars: &segments{}, logger: logger, beaconCfg: beaconCfg} + BeaconBlocks := &segments{ + DirtySegments: btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}), + } + BlobSidecars := &segments{ + DirtySegments: btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}), + } + c := &CaplinSnapshots{dir: dirs.Snap, tmpdir: dirs.Tmp, cfg: cfg, BeaconBlocks: BeaconBlocks, BlobSidecars: BlobSidecars, logger: logger, beaconCfg: beaconCfg} + c.recalcVisibleFiles() + return c } func (s *CaplinSnapshots) IndicesMax() uint64 { return s.idxMax.Load() } @@ -125,34 +138,34 @@ func (s *CaplinSnapshots) LS() { if s == nil { return } - if s.BeaconBlocks != nil { - for _, seg := range s.BeaconBlocks.segments { - if seg.Decompressor == nil { - continue - } - log.Info("[agg] ", "f", seg.Decompressor.FileName(), "words", seg.Decompressor.Count()) + view := s.View() + defer view.Close() + + if view.BeaconBlockRotx != nil { + for _, seg := range view.BeaconBlockRotx.VisibleSegments { + log.Info("[agg] ", "f", seg.src.Decompressor.FileName(), "words", seg.src.Decompressor.Count()) } } - if s.BlobSidecars != nil { - for _, seg := range s.BlobSidecars.segments { - if seg.Decompressor == nil { - continue - } - log.Info("[agg] ", "f", seg.Decompressor.FileName(), "words", seg.Decompressor.Count()) + if view.BlobSidecarRotx != nil { + for _, seg := range view.BlobSidecarRotx.VisibleSegments { + log.Info("[agg] ", "f", seg.src.Decompressor.FileName(), "words", seg.src.Decompressor.Count()) } } } func (s *CaplinSnapshots) SegFileNames(from, to uint64) []string { + view := s.View() + defer view.Close() + var res []string - for _, seg := range s.BeaconBlocks.segments { + for _, seg := range view.BeaconBlockRotx.VisibleSegments { if seg.from >= from && seg.to <= to { - res = append(res, seg.FileName()) + res = append(res, seg.src.FileName()) } } - for _, seg := range s.BlobSidecars.segments { + for _, seg := range view.BlobSidecarRotx.VisibleSegments { if seg.from >= from && seg.to <= to { - res = append(res, seg.FileName()) + res = append(res, seg.src.FileName()) } } return res @@ -166,19 +179,18 @@ func (s *CaplinSnapshots) Close() { if s == nil { return } - s.BeaconBlocks.lock.Lock() - defer s.BeaconBlocks.lock.Unlock() - s.BlobSidecars.lock.Lock() - defer s.BlobSidecars.lock.Unlock() + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() + s.closeWhatNotInList(nil) } // ReopenList stops on optimistic=false, continue opening files on optimistic=true func (s *CaplinSnapshots) ReopenList(fileNames []string, optimistic bool) error { - s.BeaconBlocks.lock.Lock() - defer s.BeaconBlocks.lock.Unlock() - s.BlobSidecars.lock.Lock() - defer s.BlobSidecars.lock.Unlock() + defer s.recalcVisibleFiles() + + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() s.closeWhatNotInList(fileNames) var segmentsMax uint64 @@ -193,20 +205,28 @@ Loop: switch f.Type.Enum() { case snaptype.CaplinEnums.BeaconBlocks: - var sn *Segment + var sn *DirtySegment var exists bool - for _, sn2 := range s.BeaconBlocks.segments { - if sn2.Decompressor == nil { // it's ok if some segment was not able to open - continue - } - if fName == sn2.FileName() { - sn = sn2 - exists = true - break + s.BeaconBlocks.DirtySegments.Walk(func(segments []*DirtySegment) bool { + for _, sn2 := range segments { + if sn2.Decompressor == nil { // it's ok if some segment was not able to open + continue + } + if fName == sn2.FileName() { + sn = sn2 + exists = true + break + } } - } + return true + }) if !exists { - sn = &Segment{segType: snaptype.BeaconBlocks, version: f.Version, Range: Range{f.From, f.To}} + sn = &DirtySegment{ + segType: snaptype.BeaconBlocks, + version: f.Version, + Range: Range{f.From, f.To}, + frozen: snapcfg.Seedable(s.cfg.ChainName, f), + } } if err := sn.reopenSeg(s.dir); err != nil { if errors.Is(err, os.ErrNotExist) { @@ -227,7 +247,7 @@ Loop: if !exists { // it's possible to iterate over .seg file even if you don't have index // then make segment available even if index open may fail - s.BeaconBlocks.segments = append(s.BeaconBlocks.segments, sn) + s.BeaconBlocks.DirtySegments.Set(sn) } if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil { return err @@ -242,20 +262,28 @@ Loop: segmentsMaxSet = true } case snaptype.CaplinEnums.BlobSidecars: - var sn *Segment + var sn *DirtySegment var exists bool - for _, sn2 := range s.BlobSidecars.segments { - if sn2.Decompressor == nil { // it's ok if some segment was not able to open - continue - } - if fName == sn2.FileName() { - sn = sn2 - exists = true - break + s.BlobSidecars.DirtySegments.Walk(func(segments []*DirtySegment) bool { + for _, sn2 := range segments { + if sn2.Decompressor == nil { // it's ok if some segment was not able to open + continue + } + if fName == sn2.FileName() { + sn = sn2 + exists = true + break + } } - } + return true + }) if !exists { - sn = &Segment{segType: snaptype.BlobSidecars, version: f.Version, Range: Range{f.From, f.To}} + sn = &DirtySegment{ + segType: snaptype.BlobSidecars, + version: f.Version, + Range: Range{f.From, f.To}, + frozen: snapcfg.Seedable(s.cfg.ChainName, f), + } } if err := sn.reopenSeg(s.dir); err != nil { if errors.Is(err, os.ErrNotExist) { @@ -276,7 +304,7 @@ Loop: if !exists { // it's possible to iterate over .seg file even if you don't have index // then make segment available even if index open may fail - s.BlobSidecars.segments = append(s.BlobSidecars.segments, sn) + s.BlobSidecars.DirtySegments.Set(sn) } if err := sn.reopenIdxIfNeed(s.dir, optimistic); err != nil { return err @@ -288,21 +316,55 @@ Loop: s.segmentsMax.Store(segmentsMax) } s.segmentsReady.Store(true) - s.idxMax.Store(s.idxAvailability()) - s.indicesReady.Store(true) - return nil } -func (s *CaplinSnapshots) idxAvailability() uint64 { - var beaconBlocks uint64 - for _, seg := range s.BeaconBlocks.segments { - if seg.Index() == nil { - break - } - beaconBlocks = seg.to - 1 +func (s *CaplinSnapshots) recalcVisibleFiles() { + defer func() { + s.idxMax.Store(s.idxAvailability()) + s.indicesReady.Store(true) + }() + + s.visibleSegmentsLock.Lock() + defer s.visibleSegmentsLock.Unlock() + + getNewVisibleSegments := func(dirtySegments *btree.BTreeG[*DirtySegment]) []*VisibleSegment { + newVisibleSegments := make([]*VisibleSegment, 0, dirtySegments.Len()) + dirtySegments.Walk(func(segments []*DirtySegment) bool { + for _, sn := range segments { + if sn.canDelete.Load() { + continue + } + if sn.Decompressor == nil { + continue + } + if sn.indexes == nil { + continue + } + for len(newVisibleSegments) > 0 && newVisibleSegments[len(newVisibleSegments)-1].src.isSubSetOf(sn) { + newVisibleSegments[len(newVisibleSegments)-1].src = nil + newVisibleSegments = newVisibleSegments[:len(newVisibleSegments)-1] + } + newVisibleSegments = append(newVisibleSegments, &VisibleSegment{ + Range: sn.Range, + segType: sn.segType, + src: sn, + }) + } + return true + }) + return newVisibleSegments } - return beaconBlocks + s.BeaconBlocks.VisibleSegments = getNewVisibleSegments(s.BeaconBlocks.DirtySegments) + s.BlobSidecars.VisibleSegments = getNewVisibleSegments(s.BlobSidecars.DirtySegments) + + if len(s.BeaconBlocks.VisibleSegments) > 0 { + s.BeaconBlocks.maxVisibleBlock.Store(s.BeaconBlocks.VisibleSegments[len(s.BeaconBlocks.VisibleSegments)-1].to - 1) + } +} + +func (s *CaplinSnapshots) idxAvailability() uint64 { + return s.BeaconBlocks.maxVisibleBlock.Load() } func (s *CaplinSnapshots) ReopenFolder() error { @@ -319,67 +381,69 @@ func (s *CaplinSnapshots) ReopenFolder() error { } func (s *CaplinSnapshots) closeWhatNotInList(l []string) { -Loop1: - for i, sn := range s.BeaconBlocks.segments { - if sn.Decompressor == nil { - continue Loop1 - } - _, name := filepath.Split(sn.FilePath()) - for _, fName := range l { - if fName == name { + toClose := make([]*DirtySegment, 0) + s.BeaconBlocks.DirtySegments.Walk(func(segments []*DirtySegment) bool { + Loop1: + for _, sn := range segments { + if sn.Decompressor == nil { continue Loop1 } + _, name := filepath.Split(sn.FilePath()) + for _, fName := range l { + if fName == name { + continue Loop1 + } + } + toClose = append(toClose, sn) } + return true + }) + for _, sn := range toClose { sn.close() - s.BeaconBlocks.segments[i] = nil - } - var i int - for i = 0; i < len(s.BeaconBlocks.segments) && s.BeaconBlocks.segments[i] != nil && s.BeaconBlocks.segments[i].Decompressor != nil; i++ { + s.BeaconBlocks.DirtySegments.Delete(sn) } - tail := s.BeaconBlocks.segments[i:] - s.BeaconBlocks.segments = s.BeaconBlocks.segments[:i] - for i = 0; i < len(tail); i++ { - if tail[i] != nil { - tail[i].close() - tail[i] = nil - } - } -Loop2: - for i, sn := range s.BlobSidecars.segments { - if sn.Decompressor == nil { - continue Loop2 - } - _, name := filepath.Split(sn.FilePath()) - for _, fName := range l { - if fName == name { + + toClose = make([]*DirtySegment, 0) + s.BlobSidecars.DirtySegments.Walk(func(segments []*DirtySegment) bool { + Loop2: + for _, sn := range segments { + if sn.Decompressor == nil { continue Loop2 } + _, name := filepath.Split(sn.FilePath()) + for _, fName := range l { + if fName == name { + continue Loop2 + } + } + toClose = append(toClose, sn) } + return true + }) + for _, sn := range toClose { sn.close() - s.BlobSidecars.segments[i] = nil - } - - for i = 0; i < len(s.BlobSidecars.segments) && s.BlobSidecars.segments[i] != nil && s.BlobSidecars.segments[i].Decompressor != nil; i++ { - } - tail = s.BlobSidecars.segments[i:] - s.BlobSidecars.segments = s.BlobSidecars.segments[:i] - for i = 0; i < len(tail); i++ { - if tail[i] != nil { - tail[i].close() - tail[i] = nil - } + s.BlobSidecars.DirtySegments.Delete(sn) } } type CaplinView struct { - s *CaplinSnapshots - closed bool + s *CaplinSnapshots + BeaconBlockRotx *segmentsRotx + BlobSidecarRotx *segmentsRotx + closed bool } func (s *CaplinSnapshots) View() *CaplinView { + s.visibleSegmentsLock.RLock() + defer s.visibleSegmentsLock.RUnlock() + v := &CaplinView{s: s} - v.s.BeaconBlocks.lock.RLock() - v.s.BlobSidecars.lock.RLock() + if s.BeaconBlocks != nil { + v.BeaconBlockRotx = s.BeaconBlocks.BeginRotx() + } + if s.BlobSidecars != nil { + v.BlobSidecarRotx = s.BlobSidecars.BeginRotx() + } return v } @@ -387,16 +451,18 @@ func (v *CaplinView) Close() { if v.closed { return } + v.BeaconBlockRotx.Close() + v.BlobSidecarRotx.Close() + v.s = nil v.closed = true - v.s.BeaconBlocks.lock.RUnlock() - v.s.BlobSidecars.lock.RUnlock() - } -func (v *CaplinView) BeaconBlocks() []*Segment { return v.s.BeaconBlocks.segments } -func (v *CaplinView) BlobSidecars() []*Segment { return v.s.BlobSidecars.segments } +func (v *CaplinView) BeaconBlocks() []*VisibleSegment { + return v.BeaconBlockRotx.VisibleSegments +} +func (v *CaplinView) BlobSidecars() []*VisibleSegment { return v.BlobSidecarRotx.VisibleSegments } -func (v *CaplinView) BeaconBlocksSegment(slot uint64) (*Segment, bool) { +func (v *CaplinView) BeaconBlocksSegment(slot uint64) (*VisibleSegment, bool) { for _, seg := range v.BeaconBlocks() { if !(slot >= seg.from && slot < seg.to) { continue @@ -406,7 +472,7 @@ func (v *CaplinView) BeaconBlocksSegment(slot uint64) (*Segment, bool) { return nil, false } -func (v *CaplinView) BlobSidecarsSegment(slot uint64) (*Segment, bool) { +func (v *CaplinView) BlobSidecarsSegment(slot uint64) (*VisibleSegment, bool) { for _, seg := range v.BlobSidecars() { if !(slot >= seg.from && slot < seg.to) { continue @@ -647,14 +713,14 @@ func (s *CaplinSnapshots) ReadHeader(slot uint64) (*cltypes.SignedBeaconBlockHea return nil, 0, libcommon.Hash{}, nil } - idxSlot := seg.Index() + idxSlot := seg.src.Index() if idxSlot == nil { return nil, 0, libcommon.Hash{}, nil } blockOffset := idxSlot.OrdinalLookup(slot - idxSlot.BaseDataID()) - gg := seg.MakeGetter() + gg := seg.src.MakeGetter() gg.Reset(blockOffset) if !gg.HasNext() { return nil, 0, libcommon.Hash{}, nil @@ -689,14 +755,14 @@ func (s *CaplinSnapshots) ReadBlobSidecars(slot uint64) ([]*cltypes.BlobSidecar, return nil, nil } - idxSlot := seg.Index() + idxSlot := seg.src.Index() if idxSlot == nil { return nil, nil } blockOffset := idxSlot.OrdinalLookup(slot - idxSlot.BaseDataID()) - gg := seg.MakeGetter() + gg := seg.src.MakeGetter() gg.Reset(blockOffset) if !gg.HasNext() { return nil, nil @@ -726,7 +792,7 @@ func (s *CaplinSnapshots) FrozenBlobs() uint64 { minSegFrom := ((s.beaconCfg.SlotsPerEpoch * s.beaconCfg.DenebForkEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit foundMinSeg := false ret := uint64(0) - for _, seg := range s.BlobSidecars.segments { + for _, seg := range s.BlobSidecars.VisibleSegments { if seg.from == minSegFrom { foundMinSeg = true }