Skip to content

Commit

Permalink
Added tx filtering in worker so an account can have multiple txs per …
Browse files Browse the repository at this point in the history
…block
  • Loading branch information
jdowning100 committed Feb 18, 2025
1 parent 05ce3fa commit 13dfca0
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 72 deletions.
4 changes: 2 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,8 +1342,8 @@ func (c *Core) AddRemotes(txs types.Transactions) {
}
}

func (c *Core) TxPoolPending(enforceTips bool) (map[common.AddressBytes]types.Transactions, error) {
return c.sl.txPool.TxPoolPending(enforceTips)
func (c *Core) TxPoolPending() (map[common.AddressBytes]types.Transactions, error) {
return c.sl.txPool.TxPoolPending()
}

func (c *Core) Get(hash common.Hash) *types.Transaction {
Expand Down
2 changes: 1 addition & 1 deletion core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ func (pool *TxPool) GetTxsFromBroadcastSet(hash common.Hash) (types.Transactions
// The enforceTips parameter can be used to do an extra filtering on the pending
// transactions and only return those whose **effective** tip is large enough in
// the next pending execution environment.
func (pool *TxPool) TxPoolPending(enforceTips bool) (map[common.AddressBytes]types.Transactions, error) {
func (pool *TxPool) TxPoolPending() (map[common.AddressBytes]types.Transactions, error) {
pool.mu.RLock()
defer pool.mu.RUnlock()

Expand Down
106 changes: 40 additions & 66 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package types

import (
"bytes"
"container/heap"
"errors"
"io"
"math/big"
Expand Down Expand Up @@ -982,9 +981,7 @@ func (s *TxByPriceAndTime) Pop() interface{} {
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type TransactionsByPriceAndNonce struct {
txs map[common.AddressBytes]Transactions // Per account nonce-sorted list of transactions
heads TxByPriceAndTime // Next transaction for each unique account (price heap)
signer Signer // Signer for the set of transactions
transactions TxByPriceAndTime // Next transaction sorted by gas price and nonce
}

// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve
Expand All @@ -993,100 +990,77 @@ type TransactionsByPriceAndNonce struct {
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
func NewTransactionsByPriceAndNonce(signer Signer, qiTxs []*TxWithMinerFee, txs map[common.AddressBytes]Transactions) *TransactionsByPriceAndNonce {
// Initialize a price and received time based heap with the head transactions
heads := make(TxByPriceAndTime, 0, len(txs))
// Initialize a price and received time based slice with all valid transactions
sortedTransactions := make(TxByPriceAndTime, 0, len(txs))

quaiTxCount := 0
quaiTxLoop:
for from, accTxs := range txs {
acc, err := Sender(signer, accTxs[0])
if err != nil {
continue
}
wrapped, err := NewTxWithMinerFee(accTxs[0], nil, time.Time{})
// Remove transaction if sender doesn't match from, or if wrapping fails.
if acc.Bytes20() != from || err != nil {
delete(txs, from)
continue
}
quaiTxCount++
heads = append(heads, wrapped)
txs[from] = accTxs[1:]
if quaiTxCount > c_MaxTxForSorting {
break
accIncluded := false
largestAcceptableGasPrice := common.Big0
for _, tx := range accTxs {
// Rule: all transactions in a block must be sorted by strictly ascending nonce and monotonically descending gas price
// Filter out any tx that has a bigger gas price than the previously included tx for a given account
if tx.GasPrice().Cmp(largestAcceptableGasPrice) <= 0 || !accIncluded {
acc, err := Sender(signer, tx)
if err != nil {
break
}
wrapped, err := NewTxWithMinerFee(tx, nil, time.Time{})
// Remove transaction if sender doesn't match from, or if wrapping fails.
if acc.Bytes20() != from || err != nil {
break
}
quaiTxCount++
sortedTransactions = append(sortedTransactions, wrapped)
accIncluded = true
largestAcceptableGasPrice = tx.GasPrice()
if quaiTxCount > c_MaxTxForSorting {
break quaiTxLoop
}
} else {
break // Skip the rest of the transactions for this account because they can't be included
}
}

}
qiTxCount := 0
for _, qiTx := range qiTxs {
qiTxCount++
heads = append(heads, qiTx)
sortedTransactions = append(sortedTransactions, qiTx)
if qiTxCount > c_MaxTxForSorting {
break
}
}

// Sort Eligible Transactions by Gas Used in Descending Order
sort.Slice(heads, func(i, j int) bool {
return heads[i].MinerFee().Cmp(heads[j].MinerFee()) > 0
// Sort Eligible Transactions by Gas Price in Descending Order
sort.Slice(sortedTransactions, func(i, j int) bool {
return sortedTransactions[i].MinerFee().Cmp(sortedTransactions[j].MinerFee()) > 0
})

// Assemble and return the transaction set
return &TransactionsByPriceAndNonce{
txs: txs,
heads: heads,
signer: signer,
transactions: sortedTransactions,
}
}

// Peek returns the next transaction by price.
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
if len(t.heads) == 0 {
return nil
}
return t.heads[0].tx
}

func (t *TransactionsByPriceAndNonce) PeekAndGetFee() *TxWithMinerFee {
if len(t.heads) == 0 {
return nil
}
return t.heads[0]
}

func (t *TransactionsByPriceAndNonce) GetFee() *big.Int {
if len(t.heads) == 0 {
if len(t.transactions) == 0 {
return nil
}
return t.heads[0].minerFee
}

// Shift replaces the current best head with the next one from the same account.
func (t *TransactionsByPriceAndNonce) Shift(acc common.AddressBytes) {
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
if wrapped, err := NewTxWithMinerFee(txs[0], nil, time.Time{}); err == nil {
t.heads[0], t.txs[acc] = wrapped, txs[1:]
heap.Fix(&t.heads, 0)
return
}
}
heap.Pop(&t.heads)
return t.transactions[0].tx
}

// Pop the first transaction without sorting
func (t *TransactionsByPriceAndNonce) PopNoSort() {
if len(t.heads) > 1 {
t.heads = t.heads[1:]
if len(t.transactions) > 1 {
t.transactions = t.transactions[1:]
} else {
t.heads = make(TxByPriceAndTime, 0)
t.transactions = make(TxByPriceAndTime, 0)
}
}

// Pop removes the best transaction, *not* replacing it with the next one from
// the same account. This should be used when a transaction cannot be executed
// and hence all subsequent ones should be discarded from the same account.
func (t *TransactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}

// Message is a fully derived transaction and implements core.Message
//
// NOTE: In a future PR this will be removed.
Expand Down
2 changes: 1 addition & 1 deletion core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2081,7 +2081,7 @@ func (w *worker) fillTransactions(env *environment, primeTerminus *types.WorkObj
return nil
}

pending, err := w.txPool.TxPoolPending(false)
pending, err := w.txPool.TxPoolPending()
if err != nil {
w.logger.WithField("err", err).Error("Failed to get pending transactions")
return fmt.Errorf("failed to get pending transactions: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/quaiapi/quai_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *PublicBlockChainQuaiAPI) GetBalance(ctx context.Context, address common
}
nodeCtx := s.b.NodeCtx()
if nodeCtx != common.ZONE_CTX {
return nil, errors.New("getBalance call can only be made in zone chain")
return nil, fmt.Errorf("getBalance call can only be made in zone chain, current context is %d", nodeCtx)
}
if !s.b.ProcessingState() {
return nil, errors.New("getBalance call can only be made on chain processing the state")
Expand Down
2 changes: 1 addition & 1 deletion quai/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (b *QuaiAPIBackend) GetPoolTransactions() (types.Transactions, error) {
if nodeCtx != common.ZONE_CTX {
return nil, errors.New("getPoolTransactions can only be called in zone chain")
}
pending, err := b.quai.core.TxPoolPending(false)
pending, err := b.quai.core.TxPoolPending()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 13dfca0

Please sign in to comment.