Skip to content

Commit

Permalink
Directly sending the transaction to the client instead of the local node
Browse files Browse the repository at this point in the history
  • Loading branch information
gameofpointers committed Feb 18, 2025
1 parent 5b6b2bf commit ac37839
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 31 deletions.
10 changes: 10 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ var NodeFlags = []Flag{
var TXPoolFlags = []Flag{
TxPoolLocalsFlag,
TxPoolNoLocalsFlag,
TxPoolSyncTxWithReturnFlag,
TxPoolJournalFlag,
TxPoolRejournalFlag,
TxPoolPriceLimitFlag,
Expand Down Expand Up @@ -358,6 +359,12 @@ var (
Usage: "Disables price exemptions for locally submitted transactions" + generateEnvDoc(c_TXPoolPrefix+"nolocals"),
}

TxPoolSyncTxWithReturnFlag = Flag{
Name: c_TXPoolPrefix + "sync-tx-with-return",
Value: true,
Usage: "Shares the tx with the sharing client with syncronous return" + generateEnvDoc(c_TXPoolPrefix+"sync-tx-with-return"),
}

TxPoolJournalFlag = Flag{
Name: c_TXPoolPrefix + "journal",
Value: core.DefaultTxPoolConfig.Journal,
Expand Down Expand Up @@ -1141,6 +1148,9 @@ func setTxPool(cfg *core.TxPoolConfig, nodeLocation common.Location) {
if viper.IsSet(TxPoolNoLocalsFlag.Name) {
cfg.NoLocals = viper.GetBool(TxPoolNoLocalsFlag.Name)
}
if viper.IsSet(TxPoolSyncTxWithReturnFlag.Name) {
cfg.SyncTxWithReturn = viper.GetBool(TxPoolSyncTxWithReturnFlag.Name)
}
if viper.IsSet(TxPoolJournalFlag.Name) {
cfg.Journal = viper.GetString(TxPoolJournalFlag.Name)
}
Expand Down
4 changes: 2 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,8 +1362,8 @@ func (c *Core) ContentFrom(addr common.Address) (types.Transactions, types.Trans
}
return c.sl.txPool.ContentFrom(internal)
}
func (c *Core) SendTxToSharingClients(tx *types.Transaction) {
c.sl.txPool.SendTxToSharingClients(tx)
func (c *Core) SendTxToSharingClients(tx *types.Transaction) error {
return c.sl.txPool.SendTxToSharingClients(tx)
}

func (c *Core) GetRollingFeeInfo() (min, max, avg *big.Int) {
Expand Down
82 changes: 61 additions & 21 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ var (
)

var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 1 * time.Minute // Time interval to report transaction pool stats
qiExpirationCheckInterval = 10 * time.Minute // Time interval to check for expired Qi transactions
qiExpirationCheckDivisor = 5 // Check 1/nth of the pool for expired Qi transactions every interval
txSharingPoolTimeout = 200 * time.Millisecond // Time to exit the tx sharing call with client
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 1 * time.Minute // Time interval to report transaction pool stats
qiExpirationCheckInterval = 10 * time.Minute // Time interval to check for expired Qi transactions
qiExpirationCheckDivisor = 5 // Check 1/nth of the pool for expired Qi transactions every interval
txSharingPoolTimeout = 1000 * time.Millisecond // Time to exit the tx sharing call with client
)

var (
Expand Down Expand Up @@ -177,10 +177,11 @@ type blockChain interface {

// TxPoolConfig are the configuration parameters of the transaction pool.
type TxPoolConfig struct {
Locals []common.InternalAddress // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal
Locals []common.InternalAddress // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
SyncTxWithReturn bool
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal

PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
Expand Down Expand Up @@ -314,13 +315,14 @@ func (config *TxPoolConfig) sanitize(logger *log.Logger) TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
sharingClientMu sync.RWMutex

currentState *state.StateDB // Current state in the blockchain head
qiGasScalingFactor float64
Expand Down Expand Up @@ -1590,11 +1592,49 @@ func (pool *TxPool) queueTxEvent(tx *types.Transaction) {

// SendTxToSharingClients sends the tx into the pool sharing tx ch and
// if its full logs it
func (pool *TxPool) SendTxToSharingClients(tx *types.Transaction) {
select {
case pool.poolSharingTxCh <- tx:
default:
pool.logger.Warn("pool sharing tx ch is full")
func (pool *TxPool) SendTxToSharingClients(tx *types.Transaction) error {
pool.sharingClientMu.Lock()
defer pool.sharingClientMu.Unlock()

// If there are no tx pool sharing clients just submit to the local pool
if len(pool.config.SharingClientsEndpoints) == 0 || !pool.config.SyncTxWithReturn {
err := pool.AddLocal(tx)
if err != nil {
return err
}
select {
case pool.poolSharingTxCh <- tx:
default:
pool.logger.Warn("pool sharing tx ch is full")
}
return err
} else {
// send to the first client, and then submit to the rest
client := pool.poolSharingClients[0]
ctx, cancel := context.WithTimeout(context.Background(), txSharingPoolTimeout)
defer cancel()
err := client.SendTransactionToPoolSharingClient(ctx, tx)
if err != nil {
pool.logger.WithField("err", err).Error("Error sending transaction to pool sharing client")
}

if len(pool.poolSharingClients) > 1 {
// send to all pool sharing clients
for _, client := range pool.poolSharingClients[1:] {
if client != nil {
go func(*quaiclient.Client, *types.Transaction) {
ctx, cancel := context.WithTimeout(context.Background(), txSharingPoolTimeout)
defer cancel()
sendErr := client.SendTransactionToPoolSharingClient(ctx, tx)
if sendErr != nil {
pool.logger.WithField("err", sendErr).Error("Error sending transaction to pool sharing client")
}
}(client, tx)
}
}
}

return err
}
}

Expand Down
10 changes: 5 additions & 5 deletions internal/quaiapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,8 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c
return common.Hash{}, errors.New("submitTransaction call can only be made on chain processing the state")
}
if tx.Type() == types.QiTxType {
if err := b.SendTx(ctx, tx); err != nil {
// Send the tx to tx pool sharing clients
if err := b.SendTxToSharingClients(tx); err != nil {
return common.Hash{}, err
}
} else {
Expand All @@ -1511,7 +1512,9 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c
if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
return common.Hash{}, err
}
if err := b.SendTx(ctx, tx); err != nil {

// Send the tx to tx pool sharing clients
if err := b.SendTxToSharingClients(tx); err != nil {
return common.Hash{}, err
}
// Print a log with full tx details for manual investigations and interventions
Expand Down Expand Up @@ -1556,9 +1559,6 @@ func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, input
if err != nil {
return common.Hash{}, err
}
// Send the tx to tx pool sharing clients
s.b.SendTxToSharingClients(tx)

return SubmitTransaction(ctx, s.b, tx)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/quaiapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type Backend interface {
TxPoolContent() (map[common.InternalAddress]types.Transactions, map[common.InternalAddress]types.Transactions)
TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions)
GetPoolGasPrice() *big.Int
SendTxToSharingClients(tx *types.Transaction)
SendTxToSharingClients(tx *types.Transaction) error
GetRollingFeeInfo() (min, max, avg *big.Int)

// Filter API
Expand Down
4 changes: 2 additions & 2 deletions quai/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ func (b *QuaiAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address)
return b.quai.core.Nonce(addr), nil
}

func (b *QuaiAPIBackend) SendTxToSharingClients(tx *types.Transaction) {
b.quai.core.SendTxToSharingClients(tx)
func (b *QuaiAPIBackend) SendTxToSharingClients(tx *types.Transaction) error {
return b.quai.core.SendTxToSharingClients(tx)
}

func (b *QuaiAPIBackend) GetRollingFeeInfo() (min, max, avg *big.Int) {
Expand Down

0 comments on commit ac37839

Please sign in to comment.