From b0814e1eee0d04f1ba94742c984071150da2788a Mon Sep 17 00:00:00 2001
From: Alex Sharov
Date: Thu, 12 Sep 2024 13:41:31 +0700
Subject: [PATCH] e3: support logs subscriptions (#11949)

- E3 doesn't store receipts in the db, so keep a limited amount of recent receipts in RAM; a better option may be found later.
- Need to support unwind of receipts (and notify subscribers about the unwound logs).
- Need to send notifications only after `rwtx.Commit` (otherwise the user receives a notification but can't yet request the new data over RPC).

A short usage sketch of the new `RecentLogs` flow is appended after the diff.
---
 cmd/integration/commands/stages.go       |   4 +-
 cmd/integration/commands/state_stages.go |   4 +-
 eth/backend.go                           |  15 +--
 eth/stagedsync/exec3.go                  |   7 +-
 eth/stagedsync/stage_execute.go          |  30 ++---
 eth/stagedsync/stage_finish.go           | 115 +-----------------
 .../engine_helpers/fork_validator.go     |   6 +-
 turbo/shards/events.go                   |  88 +++++++++++++-
 turbo/stages/mock/mock_sentry.go         |  26 ++--
 turbo/stages/stageloop.go                |  57 ++++++---
 10 files changed, 176 insertions(+), 176 deletions(-)

diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 74c5e246da0..c74bc75f210 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -1468,7 +1468,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
 blockSnapBuildSema := semaphore.NewWeighted(int64(dbg.BuildSnapshotAllowance))
 agg.SetSnapshotBuildSema(blockSnapBuildSema)
- notifications := &shards.Notifications{}
+ notifications := shards.NewNotifications(nil)
 blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, chainConfig, notifications.Events, blockSnapBuildSema, logger)
 var (
@@ -1504,7 +1504,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
 sentryControlServer.ChainConfig,
 sentryControlServer.Engine,
 &vm.Config{},
- notifications.Accumulator,
+ notifications,
 cfg.StateStream,
 /*stateStream=*/ false,
 dirs,
diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go
index e846839cd73..387158289ef 100644
--- a/cmd/integration/commands/state_stages.go
+++ b/cmd/integration/commands/state_stages.go
@@ -179,7 +179,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
 must(batchSize.UnmarshalText([]byte(batchSizeStr)))
 stateStages.DisableStages(stages.Snapshots, stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders)
- changesAcc := shards.NewAccumulator()
+ notifications := shards.NewNotifications(nil)
 genesis := core.GenesisBlockByChainName(chain)
 syncCfg := ethconfig.Defaults.Sync
@@ -187,7 +187,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
syncCfg.ReconWorkerCount = int(reconWorkers) br, _ := blocksIO(db, logger1) - execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, changesAcc, false, true, dirs, br, nil, genesis, syncCfg, nil) + execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, genesis, syncCfg, nil) execUntilFunc := func(execToBlock uint64) stagedsync.ExecFunc { return func(badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, txc wrap.TxContainer, logger log.Logger) error { diff --git a/eth/backend.go b/eth/backend.go index 1d03a6c46e2..e528161f585 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -188,7 +188,8 @@ type Ethereum struct { downloaderClient protodownloader.DownloaderClient - notifications *shards.Notifications + notifications *shards.Notifications + unsubscribeEthstat func() waitForStageLoopStop chan struct{} @@ -283,11 +284,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger etherbase: config.Miner.Etherbase, waitForStageLoopStop: make(chan struct{}), waitForMiningStop: make(chan struct{}), - notifications: &shards.Notifications{ - Events: shards.NewEvents(), - Accumulator: shards.NewAccumulator(), - }, - logger: logger, + logger: logger, stopNode: func() error { return stack.Close() }, @@ -360,7 +357,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, agg, logger) - backend.notifications.StateChangesConsumer = kvRPC + backend.notifications = shards.NewNotifications(kvRPC) backend.kvRPC = kvRPC backend.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice) @@ -689,7 +686,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger chainConfig, backend.engine, &vm.Config{}, - backend.notifications.Accumulator, + backend.notifications, config.StateStream, /*stateStream=*/ false, dirs, @@ -731,7 +728,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger chainConfig, backend.engine, &vm.Config{}, - backend.notifications.Accumulator, + backend.notifications, config.StateStream, /*stateStream=*/ false, dirs, diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 79b81f3c26d..065d4a8bf64 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -364,7 +364,7 @@ func ExecV3(ctx context.Context, shouldReportToTxPool := maxBlockNum-blockNum <= 64 var accumulator *shards.Accumulator if shouldReportToTxPool { - accumulator = cfg.accumulator + accumulator = cfg.notifications.Accumulator if accumulator == nil { accumulator = shards.NewAccumulator() } @@ -872,7 +872,10 @@ Loop: blobGasUsed += txTask.Tx.GetBlobGas() } if txTask.Final { - checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts + if !isMining && !inMemExec && !execStage.CurrentSyncCycle.IsInitialCycle { + cfg.notifications.RecentLogs.Add(receipts) + } + checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && !isMining if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. 
Maybe need somehow improve it in future - to satisfy TestExecutionSpec if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, isMining); err != nil { return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index b6837e5d83f..4aac17e2422 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -65,18 +65,18 @@ type headerDownloader interface { } type ExecuteBlockCfg struct { - db kv.RwDB - batchSize datasize.ByteSize - prune prune.Mode - chainConfig *chain.Config - engine consensus.Engine - vmConfig *vm.Config - badBlockHalt bool - stateStream bool - accumulator *shards.Accumulator - blockReader services.FullBlockReader - hd headerDownloader - author *common.Address + db kv.RwDB + batchSize datasize.ByteSize + prune prune.Mode + chainConfig *chain.Config + notifications *shards.Notifications + engine consensus.Engine + vmConfig *vm.Config + badBlockHalt bool + stateStream bool + blockReader services.FullBlockReader + hd headerDownloader + author *common.Address // last valid number of the stage dirs datadir.Dirs @@ -97,7 +97,7 @@ func StageExecuteBlocksCfg( chainConfig *chain.Config, engine consensus.Engine, vmConfig *vm.Config, - accumulator *shards.Accumulator, + notifications *shards.Notifications, stateStream bool, badBlockHalt bool, @@ -120,7 +120,7 @@ func StageExecuteBlocksCfg( engine: engine, vmConfig: vmConfig, dirs: dirs, - accumulator: accumulator, + notifications: notifications, stateStream: stateStream, badBlockHalt: badBlockHalt, blockReader: blockReader, @@ -357,7 +357,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, c func unwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) error { var accumulator *shards.Accumulator if cfg.stateStream && s.BlockNumber-u.UnwindPoint < stateStreamLimit { - accumulator = cfg.accumulator + accumulator = cfg.notifications.Accumulator hash, err := cfg.blockReader.CanonicalHash(ctx, txc.Tx, u.UnwindPoint) if err != nil { diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index c0f203e4b53..21be6a5c422 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -17,29 +17,17 @@ package stagedsync import ( - "bytes" "context" "encoding/binary" - "fmt" "time" - "github.com/erigontech/erigon-lib/kv/dbutils" - libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/hexutility" - "github.com/erigontech/erigon-lib/gointerfaces" - remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto" - types2 "github.com/erigontech/erigon-lib/gointerfaces/typesproto" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/log/v3" - bortypes "github.com/erigontech/erigon/polygon/bor/types" - "github.com/erigontech/erigon/turbo/engineapi/engine_helpers" - "github.com/erigontech/erigon/turbo/services" - "github.com/erigontech/erigon/core/rawdb" - "github.com/erigontech/erigon/core/types" - "github.com/erigontech/erigon/ethdb/cbor" "github.com/erigontech/erigon/params" + "github.com/erigontech/erigon/turbo/engineapi/engine_helpers" ) type FinishCfg struct { @@ -142,41 +130,24 @@ func PruneFinish(u *PruneState, tx kv.RwTx, cfg FinishCfg, ctx context.Context) return nil } -func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, 
finishStageAfterSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx, logger log.Logger, blockReader services.FullBlockReader) error { +// [from,to) +func NotifyNewHeaders(ctx context.Context, notifyFrom, notifyTo uint64, notifier ChainEventNotifier, tx kv.Tx, logger log.Logger) error { t := time.Now() if notifier == nil { logger.Trace("RPC Daemon notification channel not set. No headers notifications will be sent") return nil } // Notify all headers we have (either canonical or not) in a maximum range span of 1024 - var notifyFrom uint64 - var isUnwind bool - if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync { - notifyFrom = *unwindTo - isUnwind = true - } else { - heightSpan := finishStageAfterSync - finishStageBeforeSync - if heightSpan > 1024 { - heightSpan = 1024 - } - notifyFrom = finishStageAfterSync - heightSpan - } - notifyFrom++ - - var notifyTo = notifyFrom - var notifyToHash libcommon.Hash var headersRlp [][]byte if err := tx.ForEach(kv.HeaderCanonical, hexutility.EncodeTs(notifyFrom), func(k, hash []byte) (err error) { if len(hash) == 0 { return nil } blockNum := binary.BigEndian.Uint64(k) - if blockNum > finishStageAfterSync { //[from,to) + if blockNum >= notifyTo { //[from,to) return nil } - notifyTo = blockNum - notifyToHash = libcommon.BytesToHash(hash) - headerRLP := rawdb.ReadHeaderRLP(tx, notifyToHash, notifyTo) + headerRLP := rawdb.ReadHeaderRLP(tx, libcommon.BytesToHash(hash), blockNum) if headerRLP != nil { headersRlp = append(headersRlp, libcommon.CopyBytes(headerRLP)) } @@ -189,81 +160,7 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS if len(headersRlp) > 0 { notifier.OnNewHeader(headersRlp) headerTiming := time.Since(t) - - t = time.Now() - if notifier.HasLogSubsriptions() { - logs, err := ReadLogs(tx, notifyFrom, isUnwind, blockReader) - if err != nil { - return err - } - notifier.OnLogs(logs) - } - logTiming := time.Since(t) - logger.Debug("RPC Daemon notified of new headers", "from", notifyFrom-1, "to", notifyTo, "amount", len(headersRlp), "hash", notifyToHash, "header sending", headerTiming, "log sending", logTiming) + logger.Debug("RPC Daemon notified of new headers", "from", notifyFrom-1, "to", notifyTo, "amount", len(headersRlp), "header sending", headerTiming) } return nil } - -func ReadLogs(tx kv.Tx, from uint64, isUnwind bool, blockReader services.FullBlockReader) ([]*remote.SubscribeLogsReply, error) { - logs, err := tx.Cursor(kv.Log) - if err != nil { - return nil, err - } - defer logs.Close() - reply := make([]*remote.SubscribeLogsReply, 0) - reader := bytes.NewReader(nil) - - var prevBlockNum uint64 - var block *types.Block - var logIndex uint64 - for k, v, err := logs.Seek(dbutils.LogKey(from, 0)); k != nil; k, v, err = logs.Next() { - if err != nil { - return nil, err - } - blockNum := binary.BigEndian.Uint64(k[:8]) - if block == nil || blockNum != prevBlockNum { - logIndex = 0 - prevBlockNum = blockNum - if block, err = blockReader.BlockByNumber(context.Background(), tx, blockNum); err != nil { - return nil, err - } - } - - txIndex := uint64(binary.BigEndian.Uint32(k[8:])) - - var txHash libcommon.Hash - - // bor transactions are at the end of the bodies transactions (added manually but not actually part of the block) - if txIndex == uint64(len(block.Transactions())) { - txHash = bortypes.ComputeBorTxHash(blockNum, block.Hash()) - } else { - txHash = block.Transactions()[txIndex].Hash() - } - - var ll types.Logs - reader.Reset(v) - if err := cbor.Unmarshal(&ll, reader); 
err != nil { - return nil, fmt.Errorf("receipt unmarshal failed: %w, blocl=%d", err, blockNum) - } - for _, l := range ll { - r := &remote.SubscribeLogsReply{ - Address: gointerfaces.ConvertAddressToH160(l.Address), - BlockHash: gointerfaces.ConvertHashToH256(block.Hash()), - BlockNumber: blockNum, - Data: l.Data, - LogIndex: logIndex, - Topics: make([]*types2.H256, 0, len(l.Topics)), - TransactionHash: gointerfaces.ConvertHashToH256(txHash), - TransactionIndex: txIndex, - Removed: isUnwind, - } - logIndex++ - for _, topic := range l.Topics { - r.Topics = append(r.Topics, gointerfaces.ConvertHashToH256(topic)) - } - reply = append(reply, r) - } - } - - return reply, nil -} diff --git a/turbo/engineapi/engine_helpers/fork_validator.go b/turbo/engineapi/engine_helpers/fork_validator.go index 24f5ce9c411..935b07b6e61 100644 --- a/turbo/engineapi/engine_helpers/fork_validator.go +++ b/turbo/engineapi/engine_helpers/fork_validator.go @@ -272,10 +272,8 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body var txc wrap.TxContainer txc.Tx = tx txc.Doms = fv.sharedDom - fv.extendingForkNotifications = &shards.Notifications{ - Events: shards.NewEvents(), - Accumulator: shards.NewAccumulator(), - } + + fv.extendingForkNotifications = shards.NewNotifications(nil) return fv.validateAndStorePayload(txc, header, body, unwindPoint, headersChain, bodiesChain, fv.extendingForkNotifications) } diff --git a/turbo/shards/events.go b/turbo/shards/events.go index 69ac068b401..9d571211ce4 100644 --- a/turbo/shards/events.go +++ b/turbo/shards/events.go @@ -20,7 +20,9 @@ import ( "sync" "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/gointerfaces" remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto" + types2 "github.com/erigontech/erigon-lib/gointerfaces/typesproto" "github.com/erigontech/erigon/core/types" ) @@ -156,6 +158,90 @@ func (e *Events) OnLogs(logs []*remote.SubscribeLogsReply) { type Notifications struct { Events *Events - Accumulator *Accumulator + Accumulator *Accumulator // StateAccumulator StateChangesConsumer StateChangeConsumer + RecentLogs *RecentLogs +} + +func NewNotifications(StateChangesConsumer StateChangeConsumer) *Notifications { + return &Notifications{ + Events: NewEvents(), + Accumulator: NewAccumulator(), + RecentLogs: NewRecentLogs(512), + StateChangesConsumer: StateChangesConsumer, + } +} + +// Requirements: +// - Erigon3 doesn't store logs in db (yet) +// - need support unwind of receipts +// - need send notification after `rwtx.Commit` (or user will recv notification, but can't request new data by RPC) +type RecentLogs struct { + receipts map[uint64]types.Receipts + limit uint64 + mu sync.Mutex +} + +func NewRecentLogs(limit uint64) *RecentLogs { + return &RecentLogs{receipts: make(map[uint64]types.Receipts, limit), limit: limit} +} + +// [from,to) +func (r *RecentLogs) Notify(n *Events, from, to uint64, isUnwind bool) { + if !n.HasLogSubsriptions() { + return + } + r.mu.Lock() + defer r.mu.Unlock() + for bn, receipts := range r.receipts { + if bn+r.limit < from { //evict old + delete(r.receipts, bn) + continue + } + if bn < from || bn >= to { + continue + } + + var blockNum uint64 + reply := make([]*remote.SubscribeLogsReply, 0, len(receipts)) + for _, receipt := range receipts { + blockNum = receipt.BlockNumber.Uint64() + //txIndex++ + //// bor transactions are at the end of the bodies transactions (added manually but not actually part of the block) + //if txIndex == uint64(len(block.Transactions())) { + // 
txHash = bortypes.ComputeBorTxHash(blockNum, block.Hash()) + //} else { + // txHash = block.Transactions()[txIndex].Hash() + //} + + for _, l := range receipt.Logs { + res := &remote.SubscribeLogsReply{ + Address: gointerfaces.ConvertAddressToH160(receipt.ContractAddress), + BlockHash: gointerfaces.ConvertHashToH256(receipt.BlockHash), + BlockNumber: blockNum, + Data: l.Data, + LogIndex: uint64(l.Index), + Topics: make([]*types2.H256, 0, len(l.Topics)), + TransactionHash: gointerfaces.ConvertHashToH256(receipt.TxHash), + TransactionIndex: uint64(l.TxIndex), + Removed: isUnwind, + } + for _, topic := range l.Topics { + res.Topics = append(res.Topics, gointerfaces.ConvertHashToH256(topic)) + } + reply = append(reply, res) + } + } + + n.OnLogs(reply) + } +} + +func (r *RecentLogs) Add(receipts types.Receipts) { + if len(receipts) == 0 { + return + } + r.mu.Lock() + defer r.mu.Unlock() + r.receipts[receipts[0].BlockNumber.Uint64()] = receipts } diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 4fa9a03f437..867442ef1c8 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -295,18 +295,14 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK mock := &MockSentry{ Ctx: ctx, cancel: ctxCancel, DB: db, agg: agg, - tb: tb, - Log: logger, - Dirs: dirs, - Engine: engine, - gspec: gspec, - ChainConfig: gspec.Config, - Key: key, - Notifications: &shards.Notifications{ - Events: shards.NewEvents(), - Accumulator: shards.NewAccumulator(), - StateChangesConsumer: erigonGrpcServeer, - }, + tb: tb, + Log: logger, + Dirs: dirs, + Engine: engine, + gspec: gspec, + ChainConfig: gspec.Config, + Key: key, + Notifications: shards.NewNotifications(erigonGrpcServeer), PeerId: gointerfaces.ConvertHashToH512([64]byte{0x12, 0x34, 0x50}), // "12345" BlockSnapshots: allSnapshots, BlockReader: br, @@ -493,7 +489,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK mock.ChainConfig, mock.Engine, &vm.Config{}, - mock.Notifications.Accumulator, + mock.Notifications, cfg.StateStream, /*stateStream=*/ false, dirs, @@ -530,7 +526,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK mock.ChainConfig, mock.Engine, &vm.Config{}, - mock.Notifications.Accumulator, + mock.Notifications, cfg.StateStream, /*stateStream=*/ false, dirs, @@ -566,7 +562,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK mock.ChainConfig, mock.Engine, &vm.Config{}, - mock.Notifications.Accumulator, + mock.Notifications, cfg.StateStream, /*stateStream=*/ false, dirs, diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 58d85999d5b..a3b2212fe36 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -381,37 +381,58 @@ func (h *Hook) afterRun(tx kv.Tx, finishProgressBefore uint64) error { if h.updateHead != nil { h.updateHead(h.ctx) } - if h.notifications != nil { - return h.sendNotifications(h.notifications, tx, finishProgressBefore) - } - return nil + return h.sendNotifications(tx, finishProgressBefore) + } -func (h *Hook) sendNotifications(notifications *shards.Notifications, tx kv.Tx, finishProgressBefore uint64) error { +func (h *Hook) sendNotifications(tx kv.Tx, finishStageBeforeSync uint64) error { + if h.notifications == nil { + return nil + } + // update the accumulator with a new plain state version so the cache can be notified that // state has moved on - if notifications.Accumulator != nil { + if 
h.notifications.Accumulator != nil { plainStateVersion, err := rawdb.GetStateVersion(tx) if err != nil { return err } - notifications.Accumulator.SetStateID(plainStateVersion) + h.notifications.Accumulator.SetStateID(plainStateVersion) } - if notifications.Events != nil { + if h.notifications.Events != nil { finishStageAfterSync, err := stages.GetStageProgress(tx, stages.Finish) if err != nil { return err } - if err = stagedsync.NotifyNewHeaders(h.ctx, finishProgressBefore, finishStageAfterSync, h.sync.PrevUnwindPoint(), notifications.Events, tx, h.logger, h.blockReader); err != nil { + + unwindTo := h.sync.PrevUnwindPoint() + + var notifyFrom uint64 + var isUnwind bool + if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync { + notifyFrom = *unwindTo + isUnwind = true + } else { + heightSpan := finishStageAfterSync - finishStageBeforeSync + if heightSpan > 1024 { + heightSpan = 1024 + } + notifyFrom = finishStageAfterSync - heightSpan + } + notifyFrom++ + notifyTo := finishStageAfterSync + 1 //[from, to) + + if err = stagedsync.NotifyNewHeaders(h.ctx, notifyFrom, notifyTo, h.notifications.Events, tx, h.logger); err != nil { return nil } + h.notifications.RecentLogs.Notify(h.notifications.Events, notifyFrom, notifyTo, isUnwind) } currentHeader := rawdb.ReadCurrentHeader(tx) - if (notifications.Accumulator != nil) && (currentHeader != nil) { + if (h.notifications.Accumulator != nil) && (currentHeader != nil) { if currentHeader.Number.Uint64() == 0 { - notifications.Accumulator.StartChange(0, currentHeader.Hash(), nil, false) + h.notifications.Accumulator.StartChange(0, currentHeader.Hash(), nil, false) } pendingBaseFee := misc.CalcBaseFee(h.chainConfig, currentHeader) @@ -431,7 +452,7 @@ func (h *Hook) sendNotifications(notifications *shards.Notifications, tx kv.Tx, } //h.logger.Debug("[hook] Sending state changes", "currentBlock", currentHeader.Number.Uint64(), "finalizedBlock", finalizedBlock) - notifications.Accumulator.SendAndReset(h.ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64(), pendingBlobFee, currentHeader.GasLimit, finalizedBlock) + h.notifications.Accumulator.SendAndReset(h.ctx, h.notifications.StateChangesConsumer, pendingBaseFee.Uint64(), pendingBlobFee, currentHeader.GasLimit, finalizedBlock) } return nil } @@ -629,7 +650,7 @@ func NewDefaultStages(ctx context.Context, stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, blockWriter), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd), - stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications.Accumulator, cfg.StateStream, false, dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg)), + stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications, cfg.StateStream, false, dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg)), stagedsync.StageTxLookupCfg(db, cfg.Prune, dirs.Tmp, controlServer.ChainConfig.Bor, blockReader), stagedsync.StageFinishCfg(db, dirs.Tmp, forkValidator), runInTestMode) } @@ -667,7 +688,7 @@ func 
NewPipelineStages(ctx context.Context, stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd), - stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications.Accumulator, cfg.StateStream, false, dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg)), + stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications, cfg.StateStream, false, dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg)), stagedsync.StageTxLookupCfg(db, cfg.Prune, dirs.Tmp, controlServer.ChainConfig.Bor, blockReader), stagedsync.StageFinishCfg(db, dirs.Tmp, forkValidator), runInTestMode) } @@ -678,7 +699,7 @@ func NewPipelineStages(ctx context.Context, stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd), stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, blockWriter), - stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications.Accumulator, cfg.StateStream, false, dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg)), stagedsync.StageTxLookupCfg(db, cfg.Prune, dirs.Tmp, controlServer.ChainConfig.Bor, blockReader), stagedsync.StageFinishCfg(db, dirs.Tmp, forkValidator), runInTestMode) + stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications, cfg.StateStream, false, dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg)), stagedsync.StageTxLookupCfg(db, cfg.Prune, dirs.Tmp, controlServer.ChainConfig.Bor, blockReader), stagedsync.StageFinishCfg(db, dirs.Tmp, forkValidator), runInTestMode) } @@ -687,7 +708,9 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config silkworm *silkworm.Silkworm, logger log.Logger) *stagedsync.Sync { return stagedsync.New( cfg.Sync, - stagedsync.StateStages(ctx, stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, false, blockReader, blockWriter, dirs.Tmp, nil), stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, blockWriter), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, true, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd), 
stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications.Accumulator, cfg.StateStream, true, cfg.Dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg))), + stagedsync.StateStages(ctx, stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, false, blockReader, blockWriter, dirs.Tmp, nil), + stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, blockWriter), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, true, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd), + stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications, cfg.StateStream, true, cfg.Dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg))), stagedsync.StateUnwindOrder, nil, /* pruneOrder */ logger, @@ -744,7 +767,7 @@ func NewPolygonSyncStages( config.LoopBlockLimit, ), stagedsync.StageSendersCfg(db, chainConfig, config.Sync, false, config.Dirs.Tmp, config.Prune, blockReader, nil), - stagedsync.StageExecuteBlocksCfg(db, config.Prune, config.BatchSize, chainConfig, consensusEngine, &vm.Config{}, notifications.Accumulator, config.StateStream, false, config.Dirs, blockReader, nil, config.Genesis, config.Sync, SilkwormForExecutionStage(silkworm, config)), + stagedsync.StageExecuteBlocksCfg(db, config.Prune, config.BatchSize, chainConfig, consensusEngine, &vm.Config{}, notifications, config.StateStream, false, config.Dirs, blockReader, nil, config.Genesis, config.Sync, SilkwormForExecutionStage(silkworm, config)), stagedsync.StageTxLookupCfg( db, config.Prune,
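
Below is a minimal, hypothetical usage sketch (not part of the patch) of how the new pieces are meant to fit together: receipts are buffered in RAM during execution, and their logs are replayed to subscribers only after `rwtx.Commit`. It assumes only the APIs added above in turbo/shards/events.go (`NewNotifications`, `RecentLogs.Add`, `RecentLogs.Notify`); the placement of the two calls mirrors eth/stagedsync/exec3.go and turbo/stages/stageloop.go, and the block numbers are placeholders.

package main

import (
	"github.com/erigontech/erigon/core/types"
	"github.com/erigontech/erigon/turbo/shards"
)

func main() {
	// nil: no remote state-change consumer in this sketch.
	n := shards.NewNotifications(nil)

	// 1) During block execution (exec3.go above): E3 does not persist receipts
	//    to the DB, so each executed block's receipts are kept in RAM.
	var receipts types.Receipts // would come from executing a block
	n.RecentLogs.Add(receipts)

	// 2) After rwtx.Commit, in the stage-loop hook (stageloop.go above): replay
	//    the buffered logs to subscribers for the [notifyFrom, notifyTo) range.
	//    Passing isUnwind=true marks the logs as Removed, covering unwinds.
	var notifyFrom, notifyTo uint64 = 100, 101
	n.RecentLogs.Notify(n.Events, notifyFrom, notifyTo, false /*isUnwind*/)
}

The buffer stays bounded: NewNotifications creates RecentLogs with a limit of 512 blocks, and Notify evicts entries more than `limit` blocks older than the notified range, so RAM use stays small even on long runs.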