Skip to content

Commit

Permalink
Ante imprvs (berachain#1448)
Browse files Browse the repository at this point in the history
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

- **New Features**
- Enhanced transaction validation process for improved network
reliability.
- Introduced new telemetry metrics to monitor transaction pool
performance.

- **Improvements**
- Improved handling of pending block states to enhance client
compatibility.

- **Refactor**
- Streamlined transaction pool interaction with consensus algorithm
telemetry.

- **Documentation**
- Updated inline documentation to reflect new methods and functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Po Bera <[email protected]>
  • Loading branch information
2 people authored and hoank101 committed Jan 20, 2024
1 parent ce15df3 commit 67f21ee
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 793 deletions.
4 changes: 2 additions & 2 deletions cosmos/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func RecommendedCometBFTConfig() *cmtcfg.Config {
cfg.Mempool.Recheck = true
cfg.Mempool.Type = "flood"

cfg.P2P.MaxNumInboundPeers = 10
cfg.P2P.MaxNumOutboundPeers = 15
cfg.P2P.MaxNumInboundPeers = 40
cfg.P2P.MaxNumOutboundPeers = 20

cfg.TxIndex.Indexer = "null"

Expand Down
48 changes: 38 additions & 10 deletions cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
)

Expand All @@ -40,6 +39,9 @@ import (
func (m *Mempool) AnteHandle(
ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler,
) (sdk.Context, error) {
// The transaction put into this function by CheckTx
// is a transaction from CometBFT mempool.
telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs)
msgs := tx.GetMsgs()

// TODO: Record the time it takes to build a payload.
Expand All @@ -52,6 +54,7 @@ func (m *Mempool) AnteHandle(
ctx.BlockTime().Unix(), ethTx,
); shouldEject {
m.crc.DropRemoteTx(ethTx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
}
}
Expand All @@ -67,20 +70,45 @@ func (m *Mempool) shouldEjectFromCometMempool(
if tx == nil {
return false
}
txHash := tx.Hash()

// 1. If the transaction has been included in a block.
// 2. If the transaction has been in the mempool for longer than the configured timeout.
// 3. If the transaction's gas params are over the configured limit.
includedInBlock := m.includedCanonicalChain(txHash)
// First check things that are stateless.
if m.validateStateless(tx, currentTime) {
return true
}

// Then check for things that are stateful.
return m.validateStateful(tx)
}

// validateStateless returns whether the tx of the given hash is stateless.
func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool {
txHash := tx.Hash()
// 1. If the transaction has been in the mempool for longer than the configured timeout.
// 2. If the transaction's gas params are less than or equal to the configured limit.
expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime
priceOverLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0
priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0

return includedInBlock || expired || priceOverLimit
if expired {
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectExpiredTx)
}
if priceLeLimit {
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit)
}

return expired || priceLeLimit
}

// includedCanonicalChain returns whether the tx of the given hash is included in the canonical
// Eth chain.
func (m *Mempool) includedCanonicalChain(hash common.Hash) bool {
return m.chain.GetTransactionLookup(hash) != nil
func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool {
// // 1. If the transaction has been included in a block.
// signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID)
// if _, err := ethtypes.Sender(signer, tx); err != nil {
// return true
// }

// tx.Nonce() <
included := m.chain.GetTransactionLookup(tx.Hash()) != nil
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion)
return included
}
20 changes: 20 additions & 0 deletions cosmos/runtime/txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
txChanSize = 4096
maxRetries = 5
retryDelay = 50 * time.Millisecond
statPeriod = 60 * time.Second
)

// SdkTx is used to generate mocks.
Expand All @@ -52,6 +53,7 @@ type SdkTx interface {
// TxSubProvider.
type TxSubProvider interface {
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
Stats() (int, int)
}

// TxSerializer provides an interface to Serialize Geth Transactions to Bytes (via sdk.Tx).
Expand Down Expand Up @@ -121,6 +123,7 @@ func (h *handler) Start() error {
}
go h.mainLoop()
go h.failedLoop() // Start the retry policy
go h.statLoop()
return nil
}

Expand Down Expand Up @@ -152,6 +155,7 @@ func (h *handler) mainLoop() {
case err = <-h.txsSub.Err():
h.stopCh <- struct{}{}
case event := <-h.txsCh:
telemetry.IncrCounter(float32(len(event.Txs)), MetricKeyCometLocalTxs)
h.broadcastTransactions(event.Txs)
}
}
Expand All @@ -168,6 +172,7 @@ func (h *handler) failedLoop() {
h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries)
continue
}
telemetry.IncrCounter(float32(1), MetricKeyBroadcastRetry)
h.broadcastTransaction(failed.tx, failed.retries-1)
}

Expand All @@ -176,6 +181,21 @@ func (h *handler) failedLoop() {
}
}

func (h *handler) statLoop() {
ticker := time.NewTicker(statPeriod)
defer ticker.Stop()
for {
select {
case <-h.stopCh:
return
case <-ticker.C:
pending, queue := h.txPool.Stats()
telemetry.SetGauge(float32(pending), MetricKeyTxPoolPending)
telemetry.SetGauge(float32(queue), MetricKeyTxPoolQueue)
}
}
}

// Running returns true if the handler is running.
func (h *handler) Running() bool {
return h.running.Load()
Expand Down
2 changes: 2 additions & 0 deletions cosmos/runtime/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/berachain/polaris/eth/core"
"github.com/berachain/polaris/lib/utils"

"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"

Expand Down Expand Up @@ -130,6 +131,7 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
// Handle case where a node broadcasts to itself, we don't want it to fail CheckTx.
if errors.Is(errs[0], ethtxpool.ErrAlreadyKnown) &&
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
return nil
}
return errs[0]
Expand Down
4 changes: 4 additions & 0 deletions cosmos/runtime/txpool/mocks/tx_sub_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 21 additions & 2 deletions cosmos/runtime/txpool/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,27 @@

package txpool

// Mempool metrics.
const (
MetricKeyMempoolFull = "polaris_cometbft_mempool_full"
MetricKeyCometPrefix = "polaris_cometbft_"

MetricKeyCometPoolTxs = "polaris_cometbft_comet_pool_txs"
MetricKeyCometLocalTxs = "polaris_cometbft_local_txs"

MetricKeyTimeShouldEject = "polaris_cometbft_time_should_eject"
MetricKeyAnteEjectedTxs = "polaris_cometbft_ante_ejected_txs"
MetricKeyAnteShouldEjectInclusion = "polaris_cometbft_ante_should_eject_included"
MetricKeyAnteShouldEjectExpiredTx = "polaris_cometbft_ante_should_eject_expired"
MetricKeyAnteShouldEjectPriceLimit = "polaris_cometbft_ante_should_eject_price_limit"

MetricKeyTxPoolPending = "polaris_cometbft_txpool_pending"
MetricKeyTxPoolQueue = "polaris_cometbft_txpool_queue"

MetricKeyMempoolFull = "polaris_cometbft_mempool_full"
MetricKeyMempoolSize = "polaris_cometbft_mempool_size"
MetricKeyMempoolKnownTxs = "polaris_cometbft_mempool_known_txs"

MetricKeyBroadcastTxs = "polaris_cometbft_broadcast_txs"
MetricKeyBroadcastFailure = "polaris_cometbft_broadcast_failure"
MetricKeyTimeShouldEject = "polaris_cometbft_time_should_eject"
MetricKeyBroadcastRetry = "polaris_cometbft_broadcast_retry"
)
25 changes: 13 additions & 12 deletions eth/polar/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,12 @@ func (b *backend) HeaderByNumber(
case rpc.PendingBlockNumber:
// TODO: handle "miner" stuff, Pending block is only known by the miner
block := b.polar.miner.PendingBlock()
if block == nil {
return nil, nil //nolint:nilnil // it's ok.
if block != nil {
return block.Header(), nil
}
return block.Header(), nil
// To improve client compatibility we return the latest state if
// pending is not available.
return b.polar.blockchain.CurrentHeader(), nil
case rpc.LatestBlockNumber:
return b.polar.blockchain.CurrentHeader(), nil
case rpc.FinalizedBlockNumber:
Expand Down Expand Up @@ -257,6 +259,14 @@ func (b *backend) BlockByNumber(
switch number {
case rpc.PendingBlockNumber:
block := b.polar.miner.PendingBlock()
if block == nil {
// To improve client compatibility we return the latest state if
// pending is not available.
header := b.polar.blockchain.CurrentBlock()
return b.polar.blockchain.GetBlock(
header.Hash(), header.Number.Uint64(),
), nil
}
return block, nil
// Otherwise resolve and return the block
case rpc.LatestBlockNumber:
Expand Down Expand Up @@ -320,15 +330,6 @@ func (b *backend) StateAndHeaderByNumber(
ctx context.Context,
number rpc.BlockNumber,
) (state.StateDB, *ethtypes.Header, error) {
// Pending state is only known by the miner
if number == rpc.PendingBlockNumber {
block, state := b.polar.miner.Pending()
if block == nil || state == nil {
return nil, nil, errors.New("pending state is not available")
}
return state, block.Header(), nil
}

// Otherwise resolve the block number and return its state
header, err := b.HeaderByNumber(ctx, number)
if err != nil {
Expand Down
Loading

0 comments on commit 67f21ee

Please sign in to comment.