Skip to content

Commit

Permalink
perf: Use Caching in Priority Nonce Mempool for Tx Look ups (backport #…
Browse files Browse the repository at this point in the history
…520) (#524)

* perf: Use Caching in Priority Nonce Mempool for Tx Look ups (#520)

* benchmark contains

* use sender/nonce when caching

* nit

* nits

* nit

(cherry picked from commit 3376dd3)

# Conflicts:
#	block/base/mempool_test.go

* nit

---------

Co-authored-by: David Terpay <[email protected]>
Co-authored-by: David Terpay <[email protected]>
  • Loading branch information
3 people authored Jun 19, 2024
1 parent 72a11dc commit d076b5e
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 45 deletions.
1 change: 0 additions & 1 deletion block/base/lane.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func NewBaseLane(

lane.LaneMempool = NewMempool(
DefaultTxPriority(),
lane.cfg.TxEncoder,
lane.cfg.SignerExtractor,
lane.cfg.MaxTxs,
)
Expand Down
40 changes: 4 additions & 36 deletions block/base/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"

signer_extraction "github.com/skip-mev/block-sdk/v2/adapters/signer_extraction_adapter"
"github.com/skip-mev/block-sdk/v2/block/utils"
)

type (
Expand All @@ -20,7 +19,7 @@ type (
// transactions.
Mempool[C comparable] struct {
// index defines an index of transactions.
index sdkmempool.Mempool
index MempoolInterface

// signerExtractor defines the signer extraction adapter that allows us to
// extract the signer from a transaction.
Expand All @@ -31,19 +30,11 @@ type (
// of two transactions. The index utilizes this struct to order transactions
// in the mempool.
txPriority TxPriority[C]

// txEncoder defines the sdk.Tx encoder that allows us to encode transactions
// to bytes.
txEncoder sdk.TxEncoder

// txCache is a map of all transactions in the mempool. It is used
// to quickly check if a transaction is already in the mempool.
txCache map[string]struct{}
}
)

// NewMempool returns a new Mempool.
func NewMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder, extractor signer_extraction.Adapter, maxTx int) *Mempool[C] {
func NewMempool[C comparable](txPriority TxPriority[C], extractor signer_extraction.Adapter, maxTx int) *Mempool[C] {
return &Mempool[C]{
index: NewPriorityMempool(
PriorityNonceMempoolConfig[C]{
Expand All @@ -54,8 +45,6 @@ func NewMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder,
),
extractor: extractor,
txPriority: txPriority,
txEncoder: txEncoder,
txCache: make(map[string]struct{}),
}
}

Expand All @@ -67,17 +56,9 @@ func (cm *Mempool[C]) Priority(ctx sdk.Context, tx sdk.Tx) any {
// Insert inserts a transaction into the mempool.
func (cm *Mempool[C]) Insert(ctx context.Context, tx sdk.Tx) error {
if err := cm.index.Insert(ctx, tx); err != nil {
return fmt.Errorf("failed to insert tx into auction index: %w", err)
return fmt.Errorf("failed to insert tx into mempool: %w", err)
}

hash, err := utils.GetTxHash(cm.txEncoder, tx)
if err != nil {
cm.Remove(tx)
return err
}

cm.txCache[hash] = struct{}{}

return nil
}

Expand All @@ -87,13 +68,6 @@ func (cm *Mempool[C]) Remove(tx sdk.Tx) error {
return fmt.Errorf("failed to remove transaction from the mempool: %w", err)
}

hash, err := utils.GetTxHash(cm.txEncoder, tx)
if err != nil {
return fmt.Errorf("failed to get tx hash string: %w", err)
}

delete(cm.txCache, hash)

return nil
}

Expand All @@ -112,13 +86,7 @@ func (cm *Mempool[C]) CountTx() int {

// Contains returns true if the transaction is contained in the mempool.
func (cm *Mempool[C]) Contains(tx sdk.Tx) bool {
hash, err := utils.GetTxHash(cm.txEncoder, tx)
if err != nil {
return false
}

_, ok := cm.txCache[hash]
return ok
return cm.index.Contains(tx)
}

// Compare determines the relative priority of two transactions belonging in the same lane.
Expand Down
39 changes: 37 additions & 2 deletions block/base/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,49 @@ type txGen struct {
amount sdk.Coin
}

var (
numAccounts = 10
numTxsPerAcct = 100
)

func BenchmarkContains(b *testing.B) {
acct := testutils.RandomAccounts(rand.New(rand.NewSource(1)), numAccounts)
txc := testutils.CreateTestEncodingConfig().TxConfig

mp := base.NewMempool(
base.DefaultTxPriority(),
signerextraction.NewDefaultAdapter(),
1000,
)

txs := make([]sdk.Tx, numAccounts*numTxsPerAcct)
for i := 0; i < numAccounts; i++ {
for j := 0; j < numTxsPerAcct; j++ {
tx, err := testutils.CreateTx(txc, acct[i], uint64(j), 0, nil, sdk.NewCoin("stake", sdkmath.NewInt(1)))
require.NoError(b, err)
err = mp.Insert(sdk.Context{}, tx)
require.NoError(b, err)
txs[i*numTxsPerAcct+j] = tx
}
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, tx := range txs {
found := mp.Contains(tx)
if !found {
b.Fatalf("tx not found in mempool")
}
}
}
}

func TestMempoolComparison(t *testing.T) {
acct := testutils.RandomAccounts(rand.New(rand.NewSource(1)), 2)
txc := testutils.CreateTestEncodingConfig().TxConfig
ctx := testutils.CreateBaseSDKContext(t)
mp := base.NewMempool(
base.DefaultTxPriority(),
txc.TxEncoder(),
signerextraction.NewDefaultAdapter(),
1000,
)
Expand Down Expand Up @@ -102,7 +138,6 @@ func TestMempoolSelect(t *testing.T) {
se := signerextraction.NewDefaultAdapter()
mp := base.NewMempool(
base.DefaultTxPriority(),
txc.TxEncoder(),
se,
1000,
)
Expand Down
1 change: 0 additions & 1 deletion block/base/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func WithMempoolConfigs[C comparable](cfg LaneConfig, txPriority TxPriority[C])
return func(l *BaseLane) {
l.LaneMempool = NewMempool(
txPriority,
cfg.TxEncoder,
cfg.SignerExtractor,
cfg.MaxTxs,
)
Expand Down
28 changes: 27 additions & 1 deletion block/base/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,19 @@ import (
)

var (
_ sdkmempool.Mempool = (*PriorityNonceMempool[int64])(nil)
_ MempoolInterface = (*PriorityNonceMempool[int64])(nil)
_ sdkmempool.Iterator = (*PriorityNonceIterator[int64])(nil)
)

type (
// MempoolInterface defines the interface a mempool should implement.
MempoolInterface interface {
sdkmempool.Mempool

// Contains returns true if the transaction is in the mempool.
Contains(tx sdk.Tx) bool
}

// PriorityNonceMempoolConfig defines the configuration used to configure the
// PriorityNonceMempool.
PriorityNonceMempoolConfig[C comparable] struct {
Expand Down Expand Up @@ -462,6 +470,24 @@ func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error {
return nil
}

// Contains returns true if the transaction is in the mempool.
func (mp *PriorityNonceMempool[C]) Contains(tx sdk.Tx) bool {
signers, err := mp.signerExtractor.GetSigners(tx)
if err != nil {
return false
}
if len(signers) == 0 {
return false
}

sig := signers[0]
nonce := sig.Sequence
sender := sig.Signer.String()

_, ok := mp.scores[txMeta[C]{nonce: nonce, sender: sender}]
return ok
}

func IsEmpty[C comparable](mempool sdkmempool.Mempool) error {
mp := mempool.(*PriorityNonceMempool[C])
if mp.priorityIndex.Len() != 0 {
Expand Down
8 changes: 4 additions & 4 deletions lanes/base/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *BaseTestSuite) TestCompareTxPriority() {
}

func (s *BaseTestSuite) TestInsert() {
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)

s.Run("should be able to insert a transaction", func() {
tx, err := testutils.CreateRandomTx(
Expand Down Expand Up @@ -180,7 +180,7 @@ func (s *BaseTestSuite) TestInsert() {
}

func (s *BaseTestSuite) TestRemove() {
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)

s.Run("should be able to remove a transaction", func() {
tx, err := testutils.CreateRandomTx(
Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *BaseTestSuite) TestRemove() {

func (s *BaseTestSuite) TestSelect() {
s.Run("should be able to select transactions in the correct order", func() {
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)

tx1, err := testutils.CreateRandomTx(
s.encodingConfig.TxConfig,
Expand Down Expand Up @@ -261,7 +261,7 @@ func (s *BaseTestSuite) TestSelect() {
})

s.Run("should be able to select a single transaction", func() {
mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3)
mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3)

tx1, err := testutils.CreateRandomTx(
s.encodingConfig.TxConfig,
Expand Down

0 comments on commit d076b5e

Please sign in to comment.