Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Directly sending the transaction to the client instead of the local node #2549

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ var NodeFlags = []Flag{
var TXPoolFlags = []Flag{
TxPoolLocalsFlag,
TxPoolNoLocalsFlag,
TxPoolSyncTxWithReturnFlag,
TxPoolJournalFlag,
TxPoolRejournalFlag,
TxPoolPriceLimitFlag,
Expand Down Expand Up @@ -358,6 +359,12 @@ var (
Usage: "Disables price exemptions for locally submitted transactions" + generateEnvDoc(c_TXPoolPrefix+"nolocals"),
}

TxPoolSyncTxWithReturnFlag = Flag{
Name: c_TXPoolPrefix + "sync-tx-with-return",
Value: true,
Usage: "Shares the tx with the sharing client with syncronous return (also bypasses local pool, only use it with combination of sharing clients)" + generateEnvDoc(c_TXPoolPrefix+"sync-tx-with-return"),
}

TxPoolJournalFlag = Flag{
Name: c_TXPoolPrefix + "journal",
Value: core.DefaultTxPoolConfig.Journal,
Expand Down Expand Up @@ -1141,6 +1148,7 @@ func setTxPool(cfg *core.TxPoolConfig, nodeLocation common.Location) {
if viper.IsSet(TxPoolNoLocalsFlag.Name) {
cfg.NoLocals = viper.GetBool(TxPoolNoLocalsFlag.Name)
}
cfg.SyncTxWithReturn = viper.GetBool(TxPoolSyncTxWithReturnFlag.Name)
if viper.IsSet(TxPoolJournalFlag.Name) {
cfg.Journal = viper.GetString(TxPoolJournalFlag.Name)
}
Expand Down
4 changes: 2 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,8 +1362,8 @@ func (c *Core) ContentFrom(addr common.Address) (types.Transactions, types.Trans
}
return c.sl.txPool.ContentFrom(internal)
}
func (c *Core) SendTxToSharingClients(tx *types.Transaction) {
c.sl.txPool.SendTxToSharingClients(tx)
func (c *Core) SendTxToSharingClients(tx *types.Transaction) error {
return c.sl.txPool.SendTxToSharingClients(tx)
}

func (c *Core) GetRollingFeeInfo() (min, max, avg *big.Int) {
Expand Down
82 changes: 61 additions & 21 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ var (
)

var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 1 * time.Minute // Time interval to report transaction pool stats
qiExpirationCheckInterval = 10 * time.Minute // Time interval to check for expired Qi transactions
qiExpirationCheckDivisor = 5 // Check 1/nth of the pool for expired Qi transactions every interval
txSharingPoolTimeout = 200 * time.Millisecond // Time to exit the tx sharing call with client
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 1 * time.Minute // Time interval to report transaction pool stats
qiExpirationCheckInterval = 10 * time.Minute // Time interval to check for expired Qi transactions
qiExpirationCheckDivisor = 5 // Check 1/nth of the pool for expired Qi transactions every interval
txSharingPoolTimeout = 2 * time.Second // Time to exit the tx sharing call with client
)

var (
Expand Down Expand Up @@ -177,10 +177,11 @@ type blockChain interface {

// TxPoolConfig are the configuration parameters of the transaction pool.
type TxPoolConfig struct {
Locals []common.InternalAddress // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal
Locals []common.InternalAddress // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
SyncTxWithReturn bool
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal

PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
Expand Down Expand Up @@ -314,13 +315,14 @@ func (config *TxPoolConfig) sanitize(logger *log.Logger) TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
sharingClientMu sync.RWMutex

currentState *state.StateDB // Current state in the blockchain head
qiGasScalingFactor float64
Expand Down Expand Up @@ -1590,11 +1592,49 @@ func (pool *TxPool) queueTxEvent(tx *types.Transaction) {

// SendTxToSharingClients sends the tx into the pool sharing tx ch and
// if its full logs it
func (pool *TxPool) SendTxToSharingClients(tx *types.Transaction) {
select {
case pool.poolSharingTxCh <- tx:
default:
pool.logger.Warn("pool sharing tx ch is full")
func (pool *TxPool) SendTxToSharingClients(tx *types.Transaction) error {
pool.sharingClientMu.Lock()
defer pool.sharingClientMu.Unlock()

// If there are no tx pool sharing clients just submit to the local pool
if len(pool.config.SharingClientsEndpoints) == 0 || !pool.config.SyncTxWithReturn {
err := pool.AddLocal(tx)
if err != nil {
return err
}
select {
case pool.poolSharingTxCh <- tx:
default:
pool.logger.Warn("pool sharing tx ch is full")
}
return err
} else {
// send to the first client, and then submit to the rest
client := pool.poolSharingClients[0]
ctx, cancel := context.WithTimeout(context.Background(), txSharingPoolTimeout)
defer cancel()
err := client.SendTransactionToPoolSharingClient(ctx, tx)
if err != nil {
pool.logger.WithField("err", err).Error("Error sending transaction to pool sharing client")
}

if len(pool.poolSharingClients) > 1 {
// send to all pool sharing clients
for _, client := range pool.poolSharingClients[1:] {
if client != nil {
go func(*quaiclient.Client, *types.Transaction) {
ctx, cancel := context.WithTimeout(context.Background(), txSharingPoolTimeout)
defer cancel()
sendErr := client.SendTransactionToPoolSharingClient(ctx, tx)
if sendErr != nil {
pool.logger.WithField("err", sendErr).Error("Error sending transaction to pool sharing client")
}
}(client, tx)
}
}
}

return err
}
}

Expand Down
10 changes: 5 additions & 5 deletions internal/quaiapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,8 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c
return common.Hash{}, errors.New("submitTransaction call can only be made on chain processing the state")
}
if tx.Type() == types.QiTxType {
if err := b.SendTx(ctx, tx); err != nil {
// Send the tx to tx pool sharing clients
if err := b.SendTxToSharingClients(tx); err != nil {
return common.Hash{}, err
}
} else {
Expand All @@ -1511,7 +1512,9 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c
if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
return common.Hash{}, err
}
if err := b.SendTx(ctx, tx); err != nil {

// Send the tx to tx pool sharing clients
if err := b.SendTxToSharingClients(tx); err != nil {
return common.Hash{}, err
}
// Print a log with full tx details for manual investigations and interventions
Expand Down Expand Up @@ -1556,9 +1559,6 @@ func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, input
if err != nil {
return common.Hash{}, err
}
// Send the tx to tx pool sharing clients
s.b.SendTxToSharingClients(tx)

return SubmitTransaction(ctx, s.b, tx)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/quaiapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type Backend interface {
TxPoolContent() (map[common.InternalAddress]types.Transactions, map[common.InternalAddress]types.Transactions)
TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions)
GetPoolGasPrice() *big.Int
SendTxToSharingClients(tx *types.Transaction)
SendTxToSharingClients(tx *types.Transaction) error
GetRollingFeeInfo() (min, max, avg *big.Int)

// Filter API
Expand Down
4 changes: 2 additions & 2 deletions quai/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ func (b *QuaiAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address)
return b.quai.core.Nonce(addr), nil
}

func (b *QuaiAPIBackend) SendTxToSharingClients(tx *types.Transaction) {
b.quai.core.SendTxToSharingClients(tx)
func (b *QuaiAPIBackend) SendTxToSharingClients(tx *types.Transaction) error {
return b.quai.core.SendTxToSharingClients(tx)
}

func (b *QuaiAPIBackend) GetRollingFeeInfo() (min, max, avg *big.Int) {
Expand Down
Loading