From a408b310497ce6906c41eee5ab8197c35c61f148 Mon Sep 17 00:00:00 2001 From: gop Date: Tue, 18 Feb 2025 00:59:56 -0600 Subject: [PATCH] Directly sending the transaction to the client instead of the local node --- cmd/utils/flags.go | 8 ++++ core/core.go | 4 +- core/tx_pool.go | 82 +++++++++++++++++++++++++++---------- internal/quaiapi/api.go | 10 ++--- internal/quaiapi/backend.go | 2 +- quai/api_backend.go | 4 +- 6 files changed, 79 insertions(+), 31 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 198dccfc1d..19ae54bd95 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -126,6 +126,7 @@ var NodeFlags = []Flag{ var TXPoolFlags = []Flag{ TxPoolLocalsFlag, TxPoolNoLocalsFlag, + TxPoolSyncTxWithReturnFlag, TxPoolJournalFlag, TxPoolRejournalFlag, TxPoolPriceLimitFlag, @@ -358,6 +359,12 @@ var ( Usage: "Disables price exemptions for locally submitted transactions" + generateEnvDoc(c_TXPoolPrefix+"nolocals"), } + TxPoolSyncTxWithReturnFlag = Flag{ + Name: c_TXPoolPrefix + "sync-tx-with-return", + Value: true, + Usage: "Shares the tx with the sharing client with syncronous return (also bypasses local pool, only use it with combination of sharing clients)" + generateEnvDoc(c_TXPoolPrefix+"sync-tx-with-return"), + } + TxPoolJournalFlag = Flag{ Name: c_TXPoolPrefix + "journal", Value: core.DefaultTxPoolConfig.Journal, @@ -1141,6 +1148,7 @@ func setTxPool(cfg *core.TxPoolConfig, nodeLocation common.Location) { if viper.IsSet(TxPoolNoLocalsFlag.Name) { cfg.NoLocals = viper.GetBool(TxPoolNoLocalsFlag.Name) } + cfg.SyncTxWithReturn = viper.GetBool(TxPoolSyncTxWithReturnFlag.Name) if viper.IsSet(TxPoolJournalFlag.Name) { cfg.Journal = viper.GetString(TxPoolJournalFlag.Name) } diff --git a/core/core.go b/core/core.go index 3718541115..fa81efc152 100644 --- a/core/core.go +++ b/core/core.go @@ -1362,8 +1362,8 @@ func (c *Core) ContentFrom(addr common.Address) (types.Transactions, types.Trans } return c.sl.txPool.ContentFrom(internal) } -func (c *Core) SendTxToSharingClients(tx *types.Transaction) { - c.sl.txPool.SendTxToSharingClients(tx) +func (c *Core) SendTxToSharingClients(tx *types.Transaction) error { + return c.sl.txPool.SendTxToSharingClients(tx) } func (c *Core) GetRollingFeeInfo() (min, max, avg *big.Int) { diff --git a/core/tx_pool.go b/core/tx_pool.go index 30b83c539a..af7cbaac47 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -104,11 +104,11 @@ var ( ) var ( - evictionInterval = time.Minute // Time interval to check for evictable transactions - statsReportInterval = 1 * time.Minute // Time interval to report transaction pool stats - qiExpirationCheckInterval = 10 * time.Minute // Time interval to check for expired Qi transactions - qiExpirationCheckDivisor = 5 // Check 1/nth of the pool for expired Qi transactions every interval - txSharingPoolTimeout = 200 * time.Millisecond // Time to exit the tx sharing call with client + evictionInterval = time.Minute // Time interval to check for evictable transactions + statsReportInterval = 1 * time.Minute // Time interval to report transaction pool stats + qiExpirationCheckInterval = 10 * time.Minute // Time interval to check for expired Qi transactions + qiExpirationCheckDivisor = 5 // Check 1/nth of the pool for expired Qi transactions every interval + txSharingPoolTimeout = 2 * time.Second // Time to exit the tx sharing call with client ) var ( @@ -177,10 +177,11 @@ type blockChain interface { // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { - Locals []common.InternalAddress // Addresses that should be treated by default as local - NoLocals bool // Whether local transaction handling should be disabled - Journal string // Journal of local transactions to survive node restarts - Rejournal time.Duration // Time interval to regenerate the local transaction journal + Locals []common.InternalAddress // Addresses that should be treated by default as local + NoLocals bool // Whether local transaction handling should be disabled + SyncTxWithReturn bool + Journal string // Journal of local transactions to survive node restarts + Rejournal time.Duration // Time interval to regenerate the local transaction journal PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) @@ -314,13 +315,14 @@ func (config *TxPoolConfig) sanitize(logger *log.Logger) TxPoolConfig { // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - config TxPoolConfig - chainconfig *params.ChainConfig - chain blockChain - gasPrice *big.Int - scope event.SubscriptionScope - signer types.Signer - mu sync.RWMutex + config TxPoolConfig + chainconfig *params.ChainConfig + chain blockChain + gasPrice *big.Int + scope event.SubscriptionScope + signer types.Signer + mu sync.RWMutex + sharingClientMu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head qiGasScalingFactor float64 @@ -1590,11 +1592,49 @@ func (pool *TxPool) queueTxEvent(tx *types.Transaction) { // SendTxToSharingClients sends the tx into the pool sharing tx ch and // if its full logs it -func (pool *TxPool) SendTxToSharingClients(tx *types.Transaction) { - select { - case pool.poolSharingTxCh <- tx: - default: - pool.logger.Warn("pool sharing tx ch is full") +func (pool *TxPool) SendTxToSharingClients(tx *types.Transaction) error { + pool.sharingClientMu.Lock() + defer pool.sharingClientMu.Unlock() + + // If there are no tx pool sharing clients just submit to the local pool + if len(pool.config.SharingClientsEndpoints) == 0 || !pool.config.SyncTxWithReturn { + err := pool.AddLocal(tx) + if err != nil { + return err + } + select { + case pool.poolSharingTxCh <- tx: + default: + pool.logger.Warn("pool sharing tx ch is full") + } + return err + } else { + // send to the first client, and then submit to the rest + client := pool.poolSharingClients[0] + ctx, cancel := context.WithTimeout(context.Background(), txSharingPoolTimeout) + defer cancel() + err := client.SendTransactionToPoolSharingClient(ctx, tx) + if err != nil { + pool.logger.WithField("err", err).Error("Error sending transaction to pool sharing client") + } + + if len(pool.poolSharingClients) > 1 { + // send to all pool sharing clients + for _, client := range pool.poolSharingClients[1:] { + if client != nil { + go func(*quaiclient.Client, *types.Transaction) { + ctx, cancel := context.WithTimeout(context.Background(), txSharingPoolTimeout) + defer cancel() + sendErr := client.SendTransactionToPoolSharingClient(ctx, tx) + if sendErr != nil { + pool.logger.WithField("err", sendErr).Error("Error sending transaction to pool sharing client") + } + }(client, tx) + } + } + } + + return err } } diff --git a/internal/quaiapi/api.go b/internal/quaiapi/api.go index 78b1b302f9..372453cc74 100644 --- a/internal/quaiapi/api.go +++ b/internal/quaiapi/api.go @@ -1502,7 +1502,8 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c return common.Hash{}, errors.New("submitTransaction call can only be made on chain processing the state") } if tx.Type() == types.QiTxType { - if err := b.SendTx(ctx, tx); err != nil { + // Send the tx to tx pool sharing clients + if err := b.SendTxToSharingClients(tx); err != nil { return common.Hash{}, err } } else { @@ -1511,7 +1512,9 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil { return common.Hash{}, err } - if err := b.SendTx(ctx, tx); err != nil { + + // Send the tx to tx pool sharing clients + if err := b.SendTxToSharingClients(tx); err != nil { return common.Hash{}, err } // Print a log with full tx details for manual investigations and interventions @@ -1556,9 +1559,6 @@ func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, input if err != nil { return common.Hash{}, err } - // Send the tx to tx pool sharing clients - s.b.SendTxToSharingClients(tx) - return SubmitTransaction(ctx, s.b, tx) } diff --git a/internal/quaiapi/backend.go b/internal/quaiapi/backend.go index de4fd68cb1..c0cedd3528 100644 --- a/internal/quaiapi/backend.go +++ b/internal/quaiapi/backend.go @@ -140,7 +140,7 @@ type Backend interface { TxPoolContent() (map[common.InternalAddress]types.Transactions, map[common.InternalAddress]types.Transactions) TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions) GetPoolGasPrice() *big.Int - SendTxToSharingClients(tx *types.Transaction) + SendTxToSharingClients(tx *types.Transaction) error GetRollingFeeInfo() (min, max, avg *big.Int) // Filter API diff --git a/quai/api_backend.go b/quai/api_backend.go index 0a23f7f8c8..6bb696987f 100644 --- a/quai/api_backend.go +++ b/quai/api_backend.go @@ -402,8 +402,8 @@ func (b *QuaiAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) return b.quai.core.Nonce(addr), nil } -func (b *QuaiAPIBackend) SendTxToSharingClients(tx *types.Transaction) { - b.quai.core.SendTxToSharingClients(tx) +func (b *QuaiAPIBackend) SendTxToSharingClients(tx *types.Transaction) error { + return b.quai.core.SendTxToSharingClients(tx) } func (b *QuaiAPIBackend) GetRollingFeeInfo() (min, max, avg *big.Int) {