e3: support logs subscriptions (#11949)
- E3 doesn't store receipts in the DB, so keep a limited amount of them
in RAM (sketched below). Maybe a better option will be found later.
- need to support unwind of receipts (notify subscribers about them)
- need to send the notification after `rwtx.Commit` (otherwise the user
receives the notification but can't yet request the new data via RPC)
AskAlexSharov authored Sep 12, 2024
1 parent 3b7f6b9 commit b0814e1
Showing 10 changed files with 176 additions and 176 deletions.
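
The "limited amount in RAM" idea from the commit message: below is a minimal, self-contained sketch of such a cache (the real store is the shards.RecentLogs referenced in the exec3.go hunk further down; the type, field names, capacity and eviction policy here are illustrative assumptions, not the actual implementation).

```go
package main

import (
	"fmt"
	"sync"
)

// Receipt is a stand-in for types.Receipt, just enough for this sketch.
type Receipt struct {
	BlockNum uint64
	Logs     []string
}

// RecentReceipts keeps only the last `limit` blocks' receipts in RAM, evicting
// the oldest, so the RPC daemon can serve logs subscriptions even though E3
// does not persist receipts in the DB. Illustrative sketch only.
type RecentReceipts struct {
	mu    sync.Mutex
	limit int
	order []uint64              // block numbers in insertion order, oldest first
	byNum map[uint64][]*Receipt // blockNum -> that block's receipts
}

func NewRecentReceipts(limit int) *RecentReceipts {
	return &RecentReceipts{limit: limit, byNum: map[uint64][]*Receipt{}}
}

// Add stores one block's receipts and evicts the oldest block when over limit.
func (r *RecentReceipts) Add(receipts []*Receipt) {
	if len(receipts) == 0 {
		return
	}
	blockNum := receipts[0].BlockNum
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.byNum[blockNum]; !ok {
		r.order = append(r.order, blockNum)
	}
	r.byNum[blockNum] = receipts
	for len(r.order) > r.limit {
		oldest := r.order[0]
		r.order = r.order[1:]
		delete(r.byNum, oldest)
	}
}

// Get returns a recent block's receipts, if they are still cached.
func (r *RecentReceipts) Get(blockNum uint64) ([]*Receipt, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	receipts, ok := r.byNum[blockNum]
	return receipts, ok
}

func main() {
	cache := NewRecentReceipts(2)
	cache.Add([]*Receipt{{BlockNum: 1}})
	cache.Add([]*Receipt{{BlockNum: 2}})
	cache.Add([]*Receipt{{BlockNum: 3}}) // evicts block 1
	_, ok := cache.Get(1)
	fmt.Println(ok) // false
}
```

A bounded window is enough here because subscriptions only need logs for blocks executed near the chain tip; the exec3.go hunk below accordingly skips the cache during the initial sync cycle.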
4 changes: 2 additions & 2 deletions cmd/integration/commands/stages.go
@@ -1468,7 +1468,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
blockSnapBuildSema := semaphore.NewWeighted(int64(dbg.BuildSnapshotAllowance))
agg.SetSnapshotBuildSema(blockSnapBuildSema)

notifications := &shards.Notifications{}
notifications := shards.NewNotifications(nil)
blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, chainConfig, notifications.Events, blockSnapBuildSema, logger)

var (
@@ -1504,7 +1504,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
sentryControlServer.ChainConfig,
sentryControlServer.Engine,
&vm.Config{},
notifications.Accumulator,
notifications,
cfg.StateStream,
/*stateStream=*/ false,
dirs,
4 changes: 2 additions & 2 deletions cmd/integration/commands/state_stages.go
@@ -179,15 +179,15 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
must(batchSize.UnmarshalText([]byte(batchSizeStr)))

stateStages.DisableStages(stages.Snapshots, stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders)
changesAcc := shards.NewAccumulator()
notifications := shards.NewNotifications(nil)

genesis := core.GenesisBlockByChainName(chain)
syncCfg := ethconfig.Defaults.Sync
syncCfg.ExecWorkerCount = int(workers)
syncCfg.ReconWorkerCount = int(reconWorkers)

br, _ := blocksIO(db, logger1)
execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, changesAcc, false, true, dirs, br, nil, genesis, syncCfg, nil)
execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, genesis, syncCfg, nil)

execUntilFunc := func(execToBlock uint64) stagedsync.ExecFunc {
return func(badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, txc wrap.TxContainer, logger log.Logger) error {
15 changes: 6 additions & 9 deletions eth/backend.go
@@ -188,7 +188,8 @@ type Ethereum struct {

downloaderClient protodownloader.DownloaderClient

notifications *shards.Notifications
notifications *shards.Notifications

unsubscribeEthstat func()

waitForStageLoopStop chan struct{}
@@ -283,11 +284,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
etherbase: config.Miner.Etherbase,
waitForStageLoopStop: make(chan struct{}),
waitForMiningStop: make(chan struct{}),
notifications: &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
},
logger: logger,
logger: logger,
stopNode: func() error {
return stack.Close()
},
@@ -360,7 +357,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}

kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, agg, logger)
backend.notifications.StateChangesConsumer = kvRPC
backend.notifications = shards.NewNotifications(kvRPC)
backend.kvRPC = kvRPC

backend.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice)
@@ -689,7 +686,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
chainConfig,
backend.engine,
&vm.Config{},
backend.notifications.Accumulator,
backend.notifications,
config.StateStream,
/*stateStream=*/ false,
dirs,
@@ -731,7 +728,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
chainConfig,
backend.engine,
&vm.Config{},
backend.notifications.Accumulator,
backend.notifications,
config.StateStream,
/*stateStream=*/ false,
dirs,
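
For orientation, here is a minimal, self-contained sketch of what the new shards.NewNotifications constructor plausibly bundles, reconstructed only from usages visible in this diff (the old struct literal's Events and Accumulator fields, the StateChangesConsumer previously assigned from kvRPC, and the RecentLogs store used in exec3.go). The stub types, names and capacity are assumptions, not the actual turbo/shards code.

```go
// Illustrative sketch only — not the actual turbo/shards implementation.
package main

import "fmt"

// Stub stand-ins for the real shards types, just to make the sketch compile.
type Events struct{}
type Accumulator struct{}
type RecentLogs struct{ limit uint64 }
type StateChangeConsumer interface{ SendStateChanges() }

func NewEvents() *Events                     { return &Events{} }
func NewAccumulator() *Accumulator           { return &Accumulator{} }
func NewRecentLogs(limit uint64) *RecentLogs { return &RecentLogs{limit: limit} }

// Notifications bundles everything the sync loop may want to notify about.
type Notifications struct {
	Events               *Events
	Accumulator          *Accumulator
	StateChangesConsumer StateChangeConsumer // nil for tools that don't stream state changes
	RecentLogs           *RecentLogs
}

// NewNotifications replaces the old &shards.Notifications{...} literals seen in
// this diff: every caller gets Events, Accumulator and RecentLogs initialized,
// and only the state-changes consumer (e.g. the remote KV server) varies.
func NewNotifications(consumer StateChangeConsumer) *Notifications {
	return &Notifications{
		Events:               NewEvents(),
		Accumulator:          NewAccumulator(),
		StateChangesConsumer: consumer,
		RecentLogs:           NewRecentLogs(512), // capacity here is an assumption
	}
}

func main() {
	n := NewNotifications(nil) // as in cmd/integration and fork_validator.go
	fmt.Println(n.StateChangesConsumer == nil, n.RecentLogs.limit)
}
```

The design point visible in the diff: callers no longer build partial Notifications literals by hand, so the backend, the integration commands and the fork validator all get the same fully initialized set, and only the state-changes consumer differs per caller.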
7 changes: 5 additions & 2 deletions eth/stagedsync/exec3.go
@@ -364,7 +364,7 @@ func ExecV3(ctx context.Context,
shouldReportToTxPool := maxBlockNum-blockNum <= 64
var accumulator *shards.Accumulator
if shouldReportToTxPool {
accumulator = cfg.accumulator
accumulator = cfg.notifications.Accumulator
if accumulator == nil {
accumulator = shards.NewAccumulator()
}
@@ -872,7 +872,10 @@ Loop:
blobGasUsed += txTask.Tx.GetBlobGas()
}
if txTask.Final {
checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts
if !isMining && !inMemExec && !execStage.CurrentSyncCycle.IsInitialCycle {
cfg.notifications.RecentLogs.Add(receipts)
}
checkReceipts := !cfg.vmConfig.StatelessExec && chainConfig.IsByzantium(txTask.BlockNum) && !cfg.vmConfig.NoReceipts && !isMining
if txTask.BlockNum > 0 && !skipPostEvaluation { //Disable check for genesis. Maybe need somehow improve it in future - to satisfy TestExecutionSpec
if err := core.BlockPostValidation(usedGas, blobGasUsed, checkReceipts, receipts, txTask.Header, isMining); err != nil {
return fmt.Errorf("%w, txnIdx=%d, %v", consensus.ErrInvalidBlock, txTask.TxIndex, err) //same as in stage_exec.go
30 changes: 15 additions & 15 deletions eth/stagedsync/stage_execute.go
@@ -65,18 +65,18 @@ type headerDownloader interface {
}

type ExecuteBlockCfg struct {
db kv.RwDB
batchSize datasize.ByteSize
prune prune.Mode
chainConfig *chain.Config
engine consensus.Engine
vmConfig *vm.Config
badBlockHalt bool
stateStream bool
accumulator *shards.Accumulator
blockReader services.FullBlockReader
hd headerDownloader
author *common.Address
db kv.RwDB
batchSize datasize.ByteSize
prune prune.Mode
chainConfig *chain.Config
notifications *shards.Notifications
engine consensus.Engine
vmConfig *vm.Config
badBlockHalt bool
stateStream bool
blockReader services.FullBlockReader
hd headerDownloader
author *common.Address
// last valid number of the stage

dirs datadir.Dirs
@@ -97,7 +97,7 @@ func StageExecuteBlocksCfg(
chainConfig *chain.Config,
engine consensus.Engine,
vmConfig *vm.Config,
accumulator *shards.Accumulator,
notifications *shards.Notifications,
stateStream bool,
badBlockHalt bool,

@@ -120,7 +120,7 @@ func StageExecuteBlocksCfg(
engine: engine,
vmConfig: vmConfig,
dirs: dirs,
accumulator: accumulator,
notifications: notifications,
stateStream: stateStream,
badBlockHalt: badBlockHalt,
blockReader: blockReader,
@@ -357,7 +357,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, c
func unwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) error {
var accumulator *shards.Accumulator
if cfg.stateStream && s.BlockNumber-u.UnwindPoint < stateStreamLimit {
accumulator = cfg.accumulator
accumulator = cfg.notifications.Accumulator

hash, err := cfg.blockReader.CanonicalHash(ctx, txc.Tx, u.UnwindPoint)
if err != nil {
115 changes: 6 additions & 109 deletions eth/stagedsync/stage_finish.go
@@ -17,29 +17,17 @@
package stagedsync

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"time"

"github.com/erigontech/erigon-lib/kv/dbutils"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/hexutility"
"github.com/erigontech/erigon-lib/gointerfaces"
remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto"
types2 "github.com/erigontech/erigon-lib/gointerfaces/typesproto"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/log/v3"
bortypes "github.com/erigontech/erigon/polygon/bor/types"
"github.com/erigontech/erigon/turbo/engineapi/engine_helpers"
"github.com/erigontech/erigon/turbo/services"

"github.com/erigontech/erigon/core/rawdb"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/ethdb/cbor"
"github.com/erigontech/erigon/params"
"github.com/erigontech/erigon/turbo/engineapi/engine_helpers"
)

type FinishCfg struct {
@@ -142,41 +130,24 @@ func PruneFinish(u *PruneState, tx kv.RwTx, cfg FinishCfg, ctx context.Context)
return nil
}

func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishStageAfterSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx, logger log.Logger, blockReader services.FullBlockReader) error {
// [from,to)
func NotifyNewHeaders(ctx context.Context, notifyFrom, notifyTo uint64, notifier ChainEventNotifier, tx kv.Tx, logger log.Logger) error {
t := time.Now()
if notifier == nil {
logger.Trace("RPC Daemon notification channel not set. No headers notifications will be sent")
return nil
}
// Notify all headers we have (either canonical or not) in a maximum range span of 1024
var notifyFrom uint64
var isUnwind bool
if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync {
notifyFrom = *unwindTo
isUnwind = true
} else {
heightSpan := finishStageAfterSync - finishStageBeforeSync
if heightSpan > 1024 {
heightSpan = 1024
}
notifyFrom = finishStageAfterSync - heightSpan
}
notifyFrom++

var notifyTo = notifyFrom
var notifyToHash libcommon.Hash
var headersRlp [][]byte
if err := tx.ForEach(kv.HeaderCanonical, hexutility.EncodeTs(notifyFrom), func(k, hash []byte) (err error) {
if len(hash) == 0 {
return nil
}
blockNum := binary.BigEndian.Uint64(k)
if blockNum > finishStageAfterSync { //[from,to)
if blockNum >= notifyTo { //[from,to)
return nil
}
notifyTo = blockNum
notifyToHash = libcommon.BytesToHash(hash)
headerRLP := rawdb.ReadHeaderRLP(tx, notifyToHash, notifyTo)
headerRLP := rawdb.ReadHeaderRLP(tx, libcommon.BytesToHash(hash), blockNum)
if headerRLP != nil {
headersRlp = append(headersRlp, libcommon.CopyBytes(headerRLP))
}
@@ -189,81 +160,7 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS
if len(headersRlp) > 0 {
notifier.OnNewHeader(headersRlp)
headerTiming := time.Since(t)

t = time.Now()
if notifier.HasLogSubsriptions() {
logs, err := ReadLogs(tx, notifyFrom, isUnwind, blockReader)
if err != nil {
return err
}
notifier.OnLogs(logs)
}
logTiming := time.Since(t)
logger.Debug("RPC Daemon notified of new headers", "from", notifyFrom-1, "to", notifyTo, "amount", len(headersRlp), "hash", notifyToHash, "header sending", headerTiming, "log sending", logTiming)
logger.Debug("RPC Daemon notified of new headers", "from", notifyFrom-1, "to", notifyTo, "amount", len(headersRlp), "header sending", headerTiming)
}
return nil
}

func ReadLogs(tx kv.Tx, from uint64, isUnwind bool, blockReader services.FullBlockReader) ([]*remote.SubscribeLogsReply, error) {
logs, err := tx.Cursor(kv.Log)
if err != nil {
return nil, err
}
defer logs.Close()
reply := make([]*remote.SubscribeLogsReply, 0)
reader := bytes.NewReader(nil)

var prevBlockNum uint64
var block *types.Block
var logIndex uint64
for k, v, err := logs.Seek(dbutils.LogKey(from, 0)); k != nil; k, v, err = logs.Next() {
if err != nil {
return nil, err
}
blockNum := binary.BigEndian.Uint64(k[:8])
if block == nil || blockNum != prevBlockNum {
logIndex = 0
prevBlockNum = blockNum
if block, err = blockReader.BlockByNumber(context.Background(), tx, blockNum); err != nil {
return nil, err
}
}

txIndex := uint64(binary.BigEndian.Uint32(k[8:]))

var txHash libcommon.Hash

// bor transactions are at the end of the bodies transactions (added manually but not actually part of the block)
if txIndex == uint64(len(block.Transactions())) {
txHash = bortypes.ComputeBorTxHash(blockNum, block.Hash())
} else {
txHash = block.Transactions()[txIndex].Hash()
}

var ll types.Logs
reader.Reset(v)
if err := cbor.Unmarshal(&ll, reader); err != nil {
return nil, fmt.Errorf("receipt unmarshal failed: %w, blocl=%d", err, blockNum)
}
for _, l := range ll {
r := &remote.SubscribeLogsReply{
Address: gointerfaces.ConvertAddressToH160(l.Address),
BlockHash: gointerfaces.ConvertHashToH256(block.Hash()),
BlockNumber: blockNum,
Data: l.Data,
LogIndex: logIndex,
Topics: make([]*types2.H256, 0, len(l.Topics)),
TransactionHash: gointerfaces.ConvertHashToH256(txHash),
TransactionIndex: txIndex,
Removed: isUnwind,
}
logIndex++
for _, topic := range l.Topics {
r.Topics = append(r.Topics, gointerfaces.ConvertHashToH256(topic))
}
reply = append(reply, r)
}
}

return reply, nil
}
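
With the new half-open [from, to) signature, the range selection that NotifyNewHeaders used to do internally (unwind handling and the 1024-header cap, removed above) becomes the caller's job. Below is a small, self-contained sketch of what that caller-side computation could look like, reconstructed from the removed lines; the helper name and the `to = progressAfter + 1` convention are assumptions, not code from this commit.

```go
package main

import "fmt"

// computeNotifyRange reproduces the range logic the old NotifyNewHeaders
// computed internally and that a caller of the new [from, to) signature would
// now have to provide itself. Illustrative sketch only.
func computeNotifyRange(progressBefore, progressAfter uint64, unwindTo *uint64) (from, to uint64) {
	if unwindTo != nil && *unwindTo != 0 && *unwindTo < progressBefore {
		// After an unwind, re-notify everything from the unwind point.
		from = *unwindTo
	} else {
		// Otherwise notify at most the last 1024 headers of this sync cycle.
		span := progressAfter - progressBefore
		if span > 1024 {
			span = 1024
		}
		from = progressAfter - span
	}
	from++                             // the old code started one block above the computed point
	return from, progressAfter + 1     // half-open: progressAfter itself is still included
}

func main() {
	from, to := computeNotifyRange(100, 2100, nil)
	fmt.Println(from, to) // 1077 2101 -> headers 1077..2100, i.e. the last 1024
}
```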
6 changes: 2 additions & 4 deletions turbo/engineapi/engine_helpers/fork_validator.go
@@ -272,10 +272,8 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
var txc wrap.TxContainer
txc.Tx = tx
txc.Doms = fv.sharedDom
fv.extendingForkNotifications = &shards.Notifications{
Events: shards.NewEvents(),
Accumulator: shards.NewAccumulator(),
}

fv.extendingForkNotifications = shards.NewNotifications(nil)
return fv.validateAndStorePayload(txc, header, body, unwindPoint, headersChain, bodiesChain, fv.extendingForkNotifications)
}
