From 090341b27b2b66885d95e69ba688dc1fab3cda86 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Mon, 26 Aug 2024 09:45:57 -0500 Subject: [PATCH 01/42] update insufficient fund error to retry new estimation --- common/txmgr/broadcaster.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 1606f58ce0d..cdaafe8897b 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -560,15 +560,8 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand case client.Underpriced: return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: - // NOTE: This bails out of the entire cycle and essentially "blocks" on - // any transaction that gets insufficient_funds. This is OK if a - // transaction with a large VALUE blocks because this always comes last - // in the processing list. - // If it blocks because of a transaction that is expensive due to large - // gas limit, we could have smaller transactions "above" it that could - // theoretically be sent, but will instead be blocked. eb.SvcErrBuffer.Append(err) - fallthrough + return eb.tryAgainWithNewEstimation(ctx, lgr, err, etx, attempt, initialBroadcastAt) case client.Retryable: return err, true case client.FeeOutOfValidRange: From 7e81cec026f5308e8ec5988e6037032efab66c6f Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Mon, 26 Aug 2024 09:53:45 -0500 Subject: [PATCH 02/42] add comment --- common/txmgr/broadcaster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index cdaafe8897b..ba441cc61cc 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -560,6 +560,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand case client.Underpriced: return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: + // NOTE: Replacing existing attempt with a new gas estimation to overcome the occasional gas spike eb.SvcErrBuffer.Append(err) return eb.tryAgainWithNewEstimation(ctx, lgr, err, etx, attempt, initialBroadcastAt) case client.Retryable: From 7b293b434989af7e340ec10570438a71912ec558 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Mon, 26 Aug 2024 16:17:41 -0500 Subject: [PATCH 03/42] add change set --- .changeset/pretty-trees-smile.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/pretty-trees-smile.md diff --git a/.changeset/pretty-trees-smile.md b/.changeset/pretty-trees-smile.md new file mode 100644 index 00000000000..c6bf34f7691 --- /dev/null +++ b/.changeset/pretty-trees-smile.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +use new estimation for insufficient fund instead of retry to overcome gas spike #added From 8eb8a920ed43419d62a097f45d1ba1c8c03c5416 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Mon, 26 Aug 2024 17:19:20 -0500 Subject: [PATCH 04/42] modify --- common/txmgr/broadcaster.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index ba441cc61cc..6c280e8b6cc 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -560,9 +560,21 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand case client.Underpriced: return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: - // NOTE: Replacing existing attempt with a new gas estimation to overcome the occasional gas spike + // NOTE: This bails out of the entire cycle and essentially "blocks" on + // any transaction that gets insufficient_funds. This is OK if a + // transaction with a large VALUE blocks because this always comes last + // in the processing list. + // If it blocks because of a transaction that is expensive due to large + // gas limit, we could have smaller transactions "above" it that could + // theoretically be sent, but will instead be blocked. eb.SvcErrBuffer.Append(err) - return eb.tryAgainWithNewEstimation(ctx, lgr, err, etx, attempt, initialBroadcastAt) + if attempt.TxType != 0x2 { + // NOTE: Replacing existing attempt with a new gas estimation to overcome the occasional gas spike + // no need for type-2 tx due to re-estimation is not supported + return eb.tryAgainWithNewEstimation(ctx, lgr, err, etx, attempt, initialBroadcastAt) + } + + fallthrough case client.Retryable: return err, true case client.FeeOutOfValidRange: From a6168e1590ec844a4e1ef3c534658ce23faaec08 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Mon, 26 Aug 2024 21:14:48 -0500 Subject: [PATCH 05/42] fix test --- core/chains/evm/txmgr/broadcaster_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 4edc5572f04..5db4f704ed6 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1037,6 +1037,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { db := pgtest.NewSqlxDB(t) cfg := configtest.NewTestGeneralConfig(t) + cfg.EVMConfigs()[0].GasEstimator.EIP1559DynamicFees = ptr(true) txStore := cltest.NewTestTxStore(t, db) ethKeyStore := cltest.NewKeyStore(t, db).Eth() @@ -1524,7 +1525,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM evm.txes WHERE nonce = $1`, localNextNonce) }) - t.Run("eth tx is left in progress if eth node returns insufficient eth", func(t *testing.T) { + t.Run("type 2 eth tx is left in progress if eth node returns insufficient eth", func(t *testing.T) { insufficientEthError := "insufficient funds for transfer" localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) From b68fb2d01ea43df62749aadc4c62c064a612449f Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 13:40:29 -0500 Subject: [PATCH 06/42] rewrite unit test to modify chain config locally in the unit test --- core/chains/evm/txmgr/broadcaster_test.go | 25 +++++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 5db4f704ed6..4ce0193868e 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1525,23 +1525,30 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM evm.txes WHERE nonce = $1`, localNextNonce) }) - t.Run("type 2 eth tx is left in progress if eth node returns insufficient eth", func(t *testing.T) { + pgtest.MustExec(t, db, `DELETE FROM evm.txes`) + + t.Run("eth tx is left in progress if eth node returns insufficient eth in EIP-1559 mode", func(t *testing.T) { insufficientEthError := "insufficient funds for transfer" + evmcfg2 := evmtest.NewChainScopedConfig(t, configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].GasEstimator.EIP1559DynamicFees = ptr(true) + })) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) - etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) + ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce - }), fromAddress).Return(commonclient.InsufficientFunds, errors.New(insufficientEthError)).Once() + }), fromAddress).Return(commonclient.InsufficientFunds, errors.New(insufficientEthError)) + eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, cfg, evmcfg2, &testCheckerFactory{}, false, nonceTracker) + etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) - retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) + // Check gas tip cap verification + retryable, err := eb2.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), "insufficient funds for transfer") assert.True(t, retryable) - // Check it was saved correctly with its attempt etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) - require.NoError(t, err) - + // Check it was saved correctly with its attempt assert.Nil(t, etx.BroadcastAt) assert.Nil(t, etx.InitialBroadcastAt) require.NotNil(t, etx.Sequence) @@ -1551,9 +1558,9 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { attempt := etx.TxAttempts[0] assert.Equal(t, txmgrtypes.TxAttemptInProgress, attempt.State) assert.Nil(t, attempt.BroadcastBeforeBlockNum) - }) - pgtest.MustExec(t, db, `DELETE FROM evm.txes`) + pgtest.MustExec(t, db, `DELETE FROM evm.txes`) + }) t.Run("eth tx is left in progress if nonce is too high", func(t *testing.T) { localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) From c93063d40458c5694c89bb10f86f19dff89def51 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 13:55:44 -0500 Subject: [PATCH 07/42] remove a previous change --- core/chains/evm/txmgr/broadcaster_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index e2365aff0b7..3d1ed72db1c 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1037,7 +1037,6 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { db := pgtest.NewSqlxDB(t) cfg := configtest.NewTestGeneralConfig(t) - cfg.EVMConfigs()[0].GasEstimator.EIP1559DynamicFees = ptr(true) txStore := cltest.NewTestTxStore(t, db) ethKeyStore := cltest.NewKeyStore(t, db).Eth() From d7f19d147c5e13cae65771839de7b4ef459ede63 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 14:01:12 -0500 Subject: [PATCH 08/42] remove unrelated comment --- core/chains/evm/txmgr/broadcaster_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 3d1ed72db1c..6c98101d60a 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1540,7 +1540,6 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, cfg, evmcfg2, &testCheckerFactory{}, false, nonceTracker) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) - // Check gas tip cap verification retryable, err := eb2.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), "insufficient funds for transfer") From e858f608872b79c5ba542c94308e76e5a79fc104 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 14:02:52 -0500 Subject: [PATCH 09/42] extra --- core/chains/evm/txmgr/broadcaster_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 6c98101d60a..172db211bba 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1524,8 +1524,6 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM evm.txes WHERE nonce = $1`, localNextNonce) }) - pgtest.MustExec(t, db, `DELETE FROM evm.txes`) - t.Run("eth tx is left in progress if eth node returns insufficient eth in EIP-1559 mode", func(t *testing.T) { insufficientEthError := "insufficient funds for transfer" evmcfg2 := evmtest.NewChainScopedConfig(t, configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { From a581071753d7e5894e19698264eca782f3c88a3d Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 14:04:23 -0500 Subject: [PATCH 10/42] rephrase --- common/txmgr/broadcaster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 6c280e8b6cc..7ce4479429d 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -570,7 +570,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand eb.SvcErrBuffer.Append(err) if attempt.TxType != 0x2 { // NOTE: Replacing existing attempt with a new gas estimation to overcome the occasional gas spike - // no need for type-2 tx due to re-estimation is not supported + // can't re-estimate type-2 tx due since it's not supported return eb.tryAgainWithNewEstimation(ctx, lgr, err, etx, attempt, initialBroadcastAt) } From acc9bfaa3aa10e4d0b2cb682a4a2e3980aa9613d Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 14:06:47 -0500 Subject: [PATCH 11/42] update comments --- common/txmgr/broadcaster.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 7ce4479429d..76bce062500 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -560,17 +560,17 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand case client.Underpriced: return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: - // NOTE: This bails out of the entire cycle and essentially "blocks" on + // NOTE: For EIP-1559 transactions, this bails out of the entire cycle and essentially "blocks" on // any transaction that gets insufficient_funds. This is OK if a // transaction with a large VALUE blocks because this always comes last // in the processing list. // If it blocks because of a transaction that is expensive due to large // gas limit, we could have smaller transactions "above" it that could // theoretically be sent, but will instead be blocked. + + // NOTE: For non-EIP-1559 transactions, replacing existing attempt with a new gas estimation to overcome the occasional gas spike eb.SvcErrBuffer.Append(err) if attempt.TxType != 0x2 { - // NOTE: Replacing existing attempt with a new gas estimation to overcome the occasional gas spike - // can't re-estimate type-2 tx due since it's not supported return eb.tryAgainWithNewEstimation(ctx, lgr, err, etx, attempt, initialBroadcastAt) } From 3cff2fca866ba1938946ea11b46c8d405f627c09 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 15:05:38 -0500 Subject: [PATCH 12/42] refactor --- common/txmgr/broadcaster.go | 39 +++++++++++++------------------------ 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 76bce062500..208fd7ea287 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -560,25 +560,14 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand case client.Underpriced: return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: - // NOTE: For EIP-1559 transactions, this bails out of the entire cycle and essentially "blocks" on - // any transaction that gets insufficient_funds. This is OK if a - // transaction with a large VALUE blocks because this always comes last - // in the processing list. - // If it blocks because of a transaction that is expensive due to large - // gas limit, we could have smaller transactions "above" it that could - // theoretically be sent, but will instead be blocked. - - // NOTE: For non-EIP-1559 transactions, replacing existing attempt with a new gas estimation to overcome the occasional gas spike + // NOTE: This can potentially happen during gas spike. We want to re-estimate the tx, and save the replaced attempt and process it after the back off, + // so tryAgainWithNewEstimation should return retryable after saved the tx attempt, instead of calling handleInProgressTx() again eb.SvcErrBuffer.Append(err) - if attempt.TxType != 0x2 { - return eb.tryAgainWithNewEstimation(ctx, lgr, err, etx, attempt, initialBroadcastAt) - } - - fallthrough + return eb.tryAgainWithNewEstimation(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt) case client.Retryable: return err, true case client.FeeOutOfValidRange: - return eb.tryAgainWithNewEstimation(ctx, lgr, err, etx, attempt, initialBroadcastAt) + return eb.tryAgainWithNewEstimation(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt) case client.Unsupported: return err, false case client.ExceedsMaxFee: @@ -705,16 +694,10 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA return fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable } - return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit, retry) + return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit, retry, txError, client.Underpriced) } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainWithNewEstimation(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) { - if attempt.TxType == 0x2 { - err = fmt.Errorf("re-estimation is not supported for EIP-1559 transactions. Node returned error: %v. This is a bug", txError.Error()) - logger.Sugared(eb.lggr).AssumptionViolation(err.Error()) - return err, false - } - +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainWithNewEstimation(ctx context.Context, lgr logger.Logger, txError error, errType client.SendTxReturnCode, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) { replacementAttempt, fee, feeLimit, retryable, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) if err != nil { return fmt.Errorf("tryAgainWithNewEstimation failed to build new attempt: %w", err), retryable @@ -722,14 +705,20 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", "etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) - return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit, 0) + return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit, 0, txError, errType) } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64, retry int) (err error, retyrable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64, retry int, txError error, errType client.SendTxReturnCode) (err error, retyrable bool) { if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil { return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true } lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit) + + // this avoids re-estimated insufficient fund tx gets processed immediately, we want to back off the tx when gas spikes + if errType == client.InsufficientFunds { + return txError, true + } + return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retry) } From 5cde6d344793fa25557efaf0186039ebc89d363c Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 15:14:52 -0500 Subject: [PATCH 13/42] revert unit test --- core/chains/evm/txmgr/broadcaster_test.go | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 172db211bba..41f50f44347 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1524,27 +1524,23 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM evm.txes WHERE nonce = $1`, localNextNonce) }) - t.Run("eth tx is left in progress if eth node returns insufficient eth in EIP-1559 mode", func(t *testing.T) { + t.Run("eth tx is left in progress if eth node returns insufficient eth", func(t *testing.T) { insufficientEthError := "insufficient funds for transfer" - evmcfg2 := evmtest.NewChainScopedConfig(t, configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].GasEstimator.EIP1559DynamicFees = ptr(true) - })) - localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() + etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce - }), fromAddress).Return(commonclient.InsufficientFunds, errors.New(insufficientEthError)) - eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, cfg, evmcfg2, &testCheckerFactory{}, false, nonceTracker) - etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) + }), fromAddress).Return(commonclient.InsufficientFunds, errors.New(insufficientEthError)).Once() - retryable, err := eb2.ProcessUnstartedTxs(ctx, fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), "insufficient funds for transfer") assert.True(t, retryable) - etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) // Check it was saved correctly with its attempt + etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) + require.NoError(t, err) + assert.Nil(t, etx.BroadcastAt) assert.Nil(t, etx.InitialBroadcastAt) require.NotNil(t, etx.Sequence) @@ -1554,10 +1550,10 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { attempt := etx.TxAttempts[0] assert.Equal(t, txmgrtypes.TxAttemptInProgress, attempt.State) assert.Nil(t, attempt.BroadcastBeforeBlockNum) - - pgtest.MustExec(t, db, `DELETE FROM evm.txes`) }) + pgtest.MustExec(t, db, `DELETE FROM evm.txes`) + t.Run("eth tx is left in progress if nonce is too high", func(t *testing.T) { localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) nonceGapError := "NonceGap, Future nonce. Expected nonce: " + strconv.FormatUint(localNextNonce, 10) From aa7f9e2e430e5c68ee583816c7e0ea499d025544 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 15:44:59 -0500 Subject: [PATCH 14/42] update comments --- common/txmgr/broadcaster.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 208fd7ea287..580a192f7b9 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -560,8 +560,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand case client.Underpriced: return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: - // NOTE: This can potentially happen during gas spike. We want to re-estimate the tx, and save the replaced attempt and process it after the back off, - // so tryAgainWithNewEstimation should return retryable after saved the tx attempt, instead of calling handleInProgressTx() again + // NOTE: + // This can potentially happen during gas spike. + // We want to re-estimate the tx, and save the replaced attempt, + // and process it after the back off duration. + // This is done by tryAgainWithNewEstimation return retryable after saved the tx attempt, + // instead of calling handleInProgressTx() again eb.SvcErrBuffer.Append(err) return eb.tryAgainWithNewEstimation(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt) case client.Retryable: From 6f490e3d4e09f717ec7d9e819c4928cbb1ba944d Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 16:13:10 -0500 Subject: [PATCH 15/42] refactor func call args --- common/txmgr/broadcaster.go | 63 ++++++++++++++++++++++++++++++------- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 580a192f7b9..90f5f8c0c0b 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -64,6 +64,26 @@ var ErrTxRemoved = errors.New("tx removed") type ProcessUnstartedTxs[ADDR types.Hashable] func(ctx context.Context, fromAddress ADDR) (retryable bool, err error) +type tryAgainAttemptParam[ + CHAIN_ID types.ID, + HEAD types.Head[BLOCK_HASH], + ADDR types.Hashable, + TX_HASH types.Hashable, + BLOCK_HASH types.Hashable, + SEQ types.Sequence, + FEE feetypes.Fee, +] struct { + etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + initialBroadcastAt time.Time + newFee FEE + newFeeLimit uint64 + retry int + txError error + errType client.SendTxReturnCode +} + // TransmitCheckerFactory creates a transmit checker based on a spec. type TransmitCheckerFactory[ CHAIN_ID types.ID, @@ -558,7 +578,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) return err, true case client.Underpriced: - return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1) + return eb.tryAgainBumpingGas(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: // NOTE: // This can potentially happen during gas spike. @@ -679,7 +699,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next return etx, nil } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retry int) (err error, retryable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, errType client.SendTxReturnCode, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retry int) (err error, retryable bool) { // This log error is not applicable to Hedera since the action required would not be needed for its gas estimator if eb.chainType != hederaChainType { logger.With(lgr, @@ -697,8 +717,18 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA if err != nil { return fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable } - - return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit, retry, txError, client.Underpriced) + params := tryAgainAttemptParam[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + etx: etx, + attempt: attempt, + replacementAttempt: replacementAttempt, + initialBroadcastAt: initialBroadcastAt, + newFee: bumpedFee, + newFeeLimit: bumpedFeeLimit, + retry: 0, + txError: txError, + errType: errType, + } + return eb.saveTryAgainAttempt(ctx, lgr, params) } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainWithNewEstimation(ctx context.Context, lgr logger.Logger, txError error, errType client.SendTxReturnCode, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) { @@ -709,21 +739,32 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", "etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) - return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit, 0, txError, errType) + params := tryAgainAttemptParam[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + etx: etx, + attempt: attempt, + replacementAttempt: replacementAttempt, + initialBroadcastAt: initialBroadcastAt, + newFee: fee, + newFeeLimit: feeLimit, + retry: 0, + txError: txError, + errType: errType, + } + return eb.saveTryAgainAttempt(ctx, lgr, params) } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64, retry int, txError error, errType client.SendTxReturnCode) (err error, retyrable bool) { - if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, param tryAgainAttemptParam[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (err error, retyrable bool) { + if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, param.attempt, ¶m.replacementAttempt); err != nil { return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true } - lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit) + lgr.Debugw("Bumped fee on initial send", "oldFee", param.attempt.TxFee.String(), "newFee", param.newFee.String(), "newFeeLimit", param.newFeeLimit) // this avoids re-estimated insufficient fund tx gets processed immediately, we want to back off the tx when gas spikes - if errType == client.InsufficientFunds { - return txError, true + if param.errType == client.InsufficientFunds { + return param.txError, true } - return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retry) + return eb.handleInProgressTx(ctx, param.etx, param.replacementAttempt, param.initialBroadcastAt, param.retry) } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { From 614da699b3cf7a34892798e7e97426835c436655 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 16:19:18 -0500 Subject: [PATCH 16/42] rename --- common/txmgr/broadcaster.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 90f5f8c0c0b..4d4e9d74414 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -64,7 +64,7 @@ var ErrTxRemoved = errors.New("tx removed") type ProcessUnstartedTxs[ADDR types.Hashable] func(ctx context.Context, fromAddress ADDR) (retryable bool, err error) -type tryAgainAttemptParam[ +type saveTryAgainAttemptParams[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], ADDR types.Hashable, @@ -717,7 +717,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA if err != nil { return fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable } - params := tryAgainAttemptParam[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + params := saveTryAgainAttemptParams[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ etx: etx, attempt: attempt, replacementAttempt: replacementAttempt, @@ -739,7 +739,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", "etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) - params := tryAgainAttemptParam[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + params := saveTryAgainAttemptParams[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ etx: etx, attempt: attempt, replacementAttempt: replacementAttempt, @@ -753,7 +753,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA return eb.saveTryAgainAttempt(ctx, lgr, params) } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, param tryAgainAttemptParam[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (err error, retyrable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, param saveTryAgainAttemptParams[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (err error, retyrable bool) { if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, param.attempt, ¶m.replacementAttempt); err != nil { return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true } From 27d6bdfd76e3a6546560b3c6855c757db51daae3 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 27 Aug 2024 16:43:14 -0500 Subject: [PATCH 17/42] modify test --- core/chains/evm/txmgr/broadcaster_test.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 41f50f44347..0b41996bb8f 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1524,7 +1524,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM evm.txes WHERE nonce = $1`, localNextNonce) }) - t.Run("eth tx is left in progress if eth node returns insufficient eth", func(t *testing.T) { + t.Run("eth tx is replaced with new re-estimated tx if eth node returns insufficient eth", func(t *testing.T) { insufficientEthError := "insufficient funds for transfer" localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) @@ -1537,17 +1537,20 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { assert.Contains(t, err.Error(), "insufficient funds for transfer") assert.True(t, retryable) - // Check it was saved correctly with its attempt - etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) + // Check it was saved correctly with replaced attempt + updated_etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) + assert.Nil(t, updated_etx.BroadcastAt) + assert.Nil(t, updated_etx.InitialBroadcastAt) + require.NotNil(t, updated_etx.Sequence) + assert.False(t, updated_etx.Error.Valid) + assert.Equal(t, txmgrcommon.TxUnstarted, etx.State) + require.Len(t, updated_etx.TxAttempts, 1) - assert.Nil(t, etx.BroadcastAt) - assert.Nil(t, etx.InitialBroadcastAt) - require.NotNil(t, etx.Sequence) - assert.False(t, etx.Error.Valid) - assert.Equal(t, txmgrcommon.TxInProgress, etx.State) - require.Len(t, etx.TxAttempts, 1) - attempt := etx.TxAttempts[0] + // check new attempt created + assert.NotEqual(t, updated_etx.CreatedAt, etx.CreatedAt) + + attempt := updated_etx.TxAttempts[0] assert.Equal(t, txmgrtypes.TxAttemptInProgress, attempt.State) assert.Nil(t, attempt.BroadcastBeforeBlockNum) }) From ff694a15e2a41d1038169f06fa10f0ec89723b73 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 10:04:01 -0500 Subject: [PATCH 18/42] fix --- common/txmgr/broadcaster.go | 2 +- core/chains/evm/txmgr/broadcaster_test.go | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 4d4e9d74414..de4e8ebe16f 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -753,7 +753,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA return eb.saveTryAgainAttempt(ctx, lgr, params) } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, param saveTryAgainAttemptParams[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (err error, retyrable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, param saveTryAgainAttemptParams[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (err error, retryable bool) { if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, param.attempt, ¶m.replacementAttempt); err != nil { return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true } diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 0b41996bb8f..570a55f302a 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1547,9 +1547,6 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { assert.Equal(t, txmgrcommon.TxUnstarted, etx.State) require.Len(t, updated_etx.TxAttempts, 1) - // check new attempt created - assert.NotEqual(t, updated_etx.CreatedAt, etx.CreatedAt) - attempt := updated_etx.TxAttempts[0] assert.Equal(t, txmgrtypes.TxAttemptInProgress, attempt.State) assert.Nil(t, attempt.BroadcastBeforeBlockNum) From c3861941ffe540fe76f82e41b1499acd6973eaf4 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 10:24:11 -0500 Subject: [PATCH 19/42] revert function param --- common/txmgr/broadcaster.go | 59 ++++++------------------------------- 1 file changed, 9 insertions(+), 50 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index de4e8ebe16f..aec31ddfa13 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -64,26 +64,6 @@ var ErrTxRemoved = errors.New("tx removed") type ProcessUnstartedTxs[ADDR types.Hashable] func(ctx context.Context, fromAddress ADDR) (retryable bool, err error) -type saveTryAgainAttemptParams[ - CHAIN_ID types.ID, - HEAD types.Head[BLOCK_HASH], - ADDR types.Hashable, - TX_HASH types.Hashable, - BLOCK_HASH types.Hashable, - SEQ types.Sequence, - FEE feetypes.Fee, -] struct { - etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - initialBroadcastAt time.Time - newFee FEE - newFeeLimit uint64 - retry int - txError error - errType client.SendTxReturnCode -} - // TransmitCheckerFactory creates a transmit checker based on a spec. type TransmitCheckerFactory[ CHAIN_ID types.ID, @@ -717,18 +697,8 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA if err != nil { return fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable } - params := saveTryAgainAttemptParams[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ - etx: etx, - attempt: attempt, - replacementAttempt: replacementAttempt, - initialBroadcastAt: initialBroadcastAt, - newFee: bumpedFee, - newFeeLimit: bumpedFeeLimit, - retry: 0, - txError: txError, - errType: errType, - } - return eb.saveTryAgainAttempt(ctx, lgr, params) + + return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit, retry, txError, errType) } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainWithNewEstimation(ctx context.Context, lgr logger.Logger, txError error, errType client.SendTxReturnCode, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) { @@ -739,32 +709,21 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", "etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) - params := saveTryAgainAttemptParams[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ - etx: etx, - attempt: attempt, - replacementAttempt: replacementAttempt, - initialBroadcastAt: initialBroadcastAt, - newFee: fee, - newFeeLimit: feeLimit, - retry: 0, - txError: txError, - errType: errType, - } - return eb.saveTryAgainAttempt(ctx, lgr, params) + return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit, 0, txError, errType) } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, param saveTryAgainAttemptParams[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (err error, retryable bool) { - if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, param.attempt, ¶m.replacementAttempt); err != nil { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64, retry int, txError error, errType client.SendTxReturnCode) (err error, retryable bool) { + if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil { return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true } - lgr.Debugw("Bumped fee on initial send", "oldFee", param.attempt.TxFee.String(), "newFee", param.newFee.String(), "newFeeLimit", param.newFeeLimit) + lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit) // this avoids re-estimated insufficient fund tx gets processed immediately, we want to back off the tx when gas spikes - if param.errType == client.InsufficientFunds { - return param.txError, true + if errType == client.InsufficientFunds { + return txError, true } - return eb.handleInProgressTx(ctx, param.etx, param.replacementAttempt, param.initialBroadcastAt, param.retry) + return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retry) } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { From a8c424c15556b1fdaadba5611591ddf0f48e683a Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 10:28:53 -0500 Subject: [PATCH 20/42] changeset --- .changeset/pretty-trees-smile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/pretty-trees-smile.md b/.changeset/pretty-trees-smile.md index c6bf34f7691..56efcd714d2 100644 --- a/.changeset/pretty-trees-smile.md +++ b/.changeset/pretty-trees-smile.md @@ -1,5 +1,5 @@ --- -"chainlink": minor +"chainlink": internal --- use new estimation for insufficient fund instead of retry to overcome gas spike #added From 0ce01b482bb269c40385020d4ea3c480363d96d0 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 10:37:41 -0500 Subject: [PATCH 21/42] changeset --- .changeset/pretty-trees-smile.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.changeset/pretty-trees-smile.md b/.changeset/pretty-trees-smile.md index 56efcd714d2..2019ff0a58c 100644 --- a/.changeset/pretty-trees-smile.md +++ b/.changeset/pretty-trees-smile.md @@ -1,5 +1,5 @@ --- -"chainlink": internal +"chainlink": minor --- -use new estimation for insufficient fund instead of retry to overcome gas spike #added +use new estimation for insufficient fund instead of retry to overcome gas spike #internal From 84cc573eb97ca3f6f66c837fbbc740d8762ff440 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 10:50:15 -0500 Subject: [PATCH 22/42] rephrase --- common/txmgr/broadcaster.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index aec31ddfa13..fd5d69a162d 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -560,12 +560,10 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand case client.Underpriced: return eb.tryAgainBumpingGas(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: - // NOTE: - // This can potentially happen during gas spike. - // We want to re-estimate the tx, and save the replaced attempt, - // and process it after the back off duration. - // This is done by tryAgainWithNewEstimation return retryable after saved the tx attempt, - // instead of calling handleInProgressTx() again + // NOTE: This can occur due to either insufficient funds or a gas spike + // combined with a high gas limit. Regardless of the cause, we need to obtain a new estimate, + // replace the current attempt, and retry after the backoff duration. + // The new attempt must be replaced immediately because of a database constraint. eb.SvcErrBuffer.Append(err) return eb.tryAgainWithNewEstimation(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt) case client.Retryable: From 765f06141b39d1a50bbd2c241744ce7ad660b811 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 13:17:30 -0500 Subject: [PATCH 23/42] address comments, refactor --- common/txmgr/broadcaster.go | 42 +++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index fd5d69a162d..823a5b95328 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -558,18 +558,30 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) return err, true case client.Underpriced: - return eb.tryAgainBumpingGas(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt, retryCount+1) + replacementAttempt, newErr, retryable := eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt) + if newErr != nil { + return newErr, retryable + } + return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, 0) case client.InsufficientFunds: // NOTE: This can occur due to either insufficient funds or a gas spike // combined with a high gas limit. Regardless of the cause, we need to obtain a new estimate, // replace the current attempt, and retry after the backoff duration. // The new attempt must be replaced immediately because of a database constraint. eb.SvcErrBuffer.Append(err) - return eb.tryAgainWithNewEstimation(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt) + _, newErr, retryable := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) + if newErr != nil { + return newErr, retryable + } + return err, retryable case client.Retryable: return err, true case client.FeeOutOfValidRange: - return eb.tryAgainWithNewEstimation(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt) + replacementAttempt, err, retryable := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) + if err != nil { + return err, retryable + } + return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, 0) case client.Unsupported: return err, false case client.ExceedsMaxFee: @@ -677,7 +689,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next return etx, nil } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, errType client.SendTxReturnCode, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retry int) (err error, retryable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { // This log error is not applicable to Hedera since the action required would not be needed for its gas estimator if eb.chainType != hederaChainType { logger.With(lgr, @@ -693,35 +705,29 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA replacementAttempt, bumpedFee, bumpedFeeLimit, retryable, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) if err != nil { - return fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable + return replacementAttempt, fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable } - return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit, retry, txError, errType) + return replacementAttempt, eb.saveReplacementAttempt(ctx, lgr, attempt, replacementAttempt, bumpedFee, bumpedFeeLimit), retryable } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainWithNewEstimation(ctx context.Context, lgr logger.Logger, txError error, errType client.SendTxReturnCode, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { replacementAttempt, fee, feeLimit, retryable, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) if err != nil { - return fmt.Errorf("tryAgainWithNewEstimation failed to build new attempt: %w", err), retryable + return replacementAttempt, fmt.Errorf("replaceAttemptWithNewEstimation failed to build new attempt: %w", err), retryable } lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", "etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) - return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit, 0, txError, errType) + return replacementAttempt, eb.saveReplacementAttempt(ctx, lgr, attempt, replacementAttempt, fee, feeLimit), retryable } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64, retry int, txError error, errType client.SendTxReturnCode) (err error, retryable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveReplacementAttempt(ctx context.Context, lgr logger.Logger, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], newFee FEE, newFeeLimit uint64) (err error) { if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil { - return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true + return fmt.Errorf("saveReplacementAttempt failed: %w", err) } lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit) - - // this avoids re-estimated insufficient fund tx gets processed immediately, we want to back off the tx when gas spikes - if errType == client.InsufficientFunds { - return txError, true - } - - return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retry) + return nil } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { From 6689d8a8a71505b3c39f82c4c5c2c85c9156495c Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 13:19:16 -0500 Subject: [PATCH 24/42] refactor func name --- common/txmgr/broadcaster.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 823a5b95328..a3fddd66254 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -558,7 +558,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) return err, true case client.Underpriced: - replacementAttempt, newErr, retryable := eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt) + replacementAttempt, newErr, retryable := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) if newErr != nil { return newErr, retryable } @@ -689,7 +689,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next return etx, nil } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithBumpedGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { // This log error is not applicable to Hedera since the action required would not be needed for its gas estimator if eb.chainType != hederaChainType { logger.With(lgr, @@ -705,7 +705,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA replacementAttempt, bumpedFee, bumpedFeeLimit, retryable, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) if err != nil { - return replacementAttempt, fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable + return replacementAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err), retryable } return replacementAttempt, eb.saveReplacementAttempt(ctx, lgr, attempt, replacementAttempt, bumpedFee, bumpedFeeLimit), retryable From 0fdbc371358bb8b76f2443764224b0fbc595c46c Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 13:36:49 -0500 Subject: [PATCH 25/42] modify retrycount --- common/txmgr/broadcaster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index a3fddd66254..9573770f5f1 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -562,7 +562,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand if newErr != nil { return newErr, retryable } - return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, 0) + return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: // NOTE: This can occur due to either insufficient funds or a gas spike // combined with a high gas limit. Regardless of the cause, we need to obtain a new estimate, From 15cbf29aeadc8e72e39b1b795cacd0e20002469c Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 14:56:12 -0500 Subject: [PATCH 26/42] fix unit tests --- common/txmgr/broadcaster.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 9573770f5f1..1bb095d869d 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -708,10 +708,11 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl return replacementAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err), retryable } - return replacementAttempt, eb.saveReplacementAttempt(ctx, lgr, attempt, replacementAttempt, bumpedFee, bumpedFeeLimit), retryable + replacementAttempt, err = eb.saveReplacementAttempt(ctx, lgr, attempt, replacementAttempt, bumpedFee, bumpedFeeLimit) + return replacementAttempt, err, retryable } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { replacementAttempt, fee, feeLimit, retryable, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) if err != nil { return replacementAttempt, fmt.Errorf("replaceAttemptWithNewEstimation failed to build new attempt: %w", err), retryable @@ -719,15 +720,17 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", "etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) - return replacementAttempt, eb.saveReplacementAttempt(ctx, lgr, attempt, replacementAttempt, fee, feeLimit), retryable + replacementAttempt, err = eb.saveReplacementAttempt(ctx, lgr, attempt, replacementAttempt, fee, feeLimit) + return replacementAttempt, err, retryable } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveReplacementAttempt(ctx context.Context, lgr logger.Logger, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], newFee FEE, newFeeLimit uint64) (err error) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveReplacementAttempt(ctx context.Context, lgr logger.Logger, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], newFee FEE, newFeeLimit uint64) (newAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil { - return fmt.Errorf("saveReplacementAttempt failed: %w", err) + err = fmt.Errorf("saveReplacementAttempt failed: %w", err) + return } lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit) - return nil + return replacementAttempt, err } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { From 119f77d5228e9122835b46539eb74f678a2aec3a Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 15:06:56 -0500 Subject: [PATCH 27/42] rename --- common/txmgr/broadcaster.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 1bb095d869d..8419e5bb33a 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -558,11 +558,11 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) return err, true case client.Underpriced: - replacementAttempt, newErr, retryable := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) + bumpedAttempt, newErr, retryable := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) if newErr != nil { return newErr, retryable } - return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retryCount+1) + return eb.handleInProgressTx(ctx, etx, bumpedAttempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: // NOTE: This can occur due to either insufficient funds or a gas spike // combined with a high gas limit. Regardless of the cause, we need to obtain a new estimate, @@ -577,9 +577,9 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand case client.Retryable: return err, true case client.FeeOutOfValidRange: - replacementAttempt, err, retryable := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) - if err != nil { - return err, retryable + replacementAttempt, newErr, retryable := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) + if newErr != nil { + return newErr, retryable } return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, 0) case client.Unsupported: @@ -703,13 +703,13 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl attempt.TxFee, txError.Error(), eb.feeConfig.FeePriceDefault()) } - replacementAttempt, bumpedFee, bumpedFeeLimit, retryable, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) + bumpedAttempt, bumpedFee, bumpedFeeLimit, retryable, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) if err != nil { - return replacementAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err), retryable + return bumpedAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err), retryable } - replacementAttempt, err = eb.saveReplacementAttempt(ctx, lgr, attempt, replacementAttempt, bumpedFee, bumpedFeeLimit) - return replacementAttempt, err, retryable + bumpedAttempt, err = eb.saveReplacementAttempt(ctx, lgr, attempt, bumpedAttempt, bumpedFee, bumpedFeeLimit) + return bumpedAttempt, err, retryable } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { From 10c5cf96cfa2355030e681a1acd04a0410c7f0d4 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 15:08:13 -0500 Subject: [PATCH 28/42] rename --- common/txmgr/broadcaster.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 8419e5bb33a..2f2d7f43518 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -713,15 +713,15 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { - replacementAttempt, fee, feeLimit, retryable, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) + newEstimatedAttempt, fee, feeLimit, retryable, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) if err != nil { - return replacementAttempt, fmt.Errorf("replaceAttemptWithNewEstimation failed to build new attempt: %w", err), retryable + return newEstimatedAttempt, fmt.Errorf("replaceAttemptWithNewEstimation failed to build new attempt: %w", err), retryable } lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", "etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) - replacementAttempt, err = eb.saveReplacementAttempt(ctx, lgr, attempt, replacementAttempt, fee, feeLimit) - return replacementAttempt, err, retryable + newEstimatedAttempt, err = eb.saveReplacementAttempt(ctx, lgr, attempt, newEstimatedAttempt, fee, feeLimit) + return newEstimatedAttempt, err, retryable } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveReplacementAttempt(ctx context.Context, lgr logger.Logger, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], newFee FEE, newFeeLimit uint64) (newAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { From 064c0eaa71b6f8e8715b8a2103ded5e289e45e2d Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 15:20:51 -0500 Subject: [PATCH 29/42] small refactor --- common/txmgr/broadcaster.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 2f2d7f43518..21d81e54af6 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -708,7 +708,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl return bumpedAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err), retryable } - bumpedAttempt, err = eb.saveReplacementAttempt(ctx, lgr, attempt, bumpedAttempt, bumpedFee, bumpedFeeLimit) + err = eb.saveReplacementAttempt(ctx, lgr, attempt, &bumpedAttempt, bumpedFee, bumpedFeeLimit) return bumpedAttempt, err, retryable } @@ -720,17 +720,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", "etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) - newEstimatedAttempt, err = eb.saveReplacementAttempt(ctx, lgr, attempt, newEstimatedAttempt, fee, feeLimit) + err = eb.saveReplacementAttempt(ctx, lgr, attempt, &newEstimatedAttempt, fee, feeLimit) return newEstimatedAttempt, err, retryable } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveReplacementAttempt(ctx context.Context, lgr logger.Logger, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], newFee FEE, newFeeLimit uint64) (newAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { - if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil { - err = fmt.Errorf("saveReplacementAttempt failed: %w", err) - return +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveReplacementAttempt(ctx context.Context, lgr logger.Logger, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], newFee FEE, newFeeLimit uint64) (err error) { + if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, replacementAttempt); err != nil { + return fmt.Errorf("saveReplacementAttempt failed: %w", err) } lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit) - return replacementAttempt, err + return nil } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { From 5a6dfcca5590dffb1ac50440710569bebafb4129 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 16:25:14 -0500 Subject: [PATCH 30/42] nit --- common/txmgr/broadcaster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 21d81e54af6..32d8f1ac0bd 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -729,7 +729,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save return fmt.Errorf("saveReplacementAttempt failed: %w", err) } lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit) - return nil + return } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { From 2113d3478014bc161157fea00548f0a6b80aff70 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 16:56:18 -0500 Subject: [PATCH 31/42] update error --- common/txmgr/broadcaster.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 32d8f1ac0bd..85042277d40 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -568,11 +568,13 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // combined with a high gas limit. Regardless of the cause, we need to obtain a new estimate, // replace the current attempt, and retry after the backoff duration. // The new attempt must be replaced immediately because of a database constraint. - eb.SvcErrBuffer.Append(err) _, newErr, retryable := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) if newErr != nil { + eb.SvcErrBuffer.Append(fmt.Errorf("%v, %v, retryable: %v", err, newErr, retryable)) return newErr, retryable } + + eb.SvcErrBuffer.Append(fmt.Errorf("%v, retryable: %v", err, retryable)) return err, retryable case client.Retryable: return err, true From 776a9db9cd86bc94fcc9e82c5eeb55a8c9b5d40d Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 17:35:49 -0500 Subject: [PATCH 32/42] modify test --- core/chains/evm/txmgr/broadcaster_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 570a55f302a..10c58136821 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1527,7 +1527,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { t.Run("eth tx is replaced with new re-estimated tx if eth node returns insufficient eth", func(t *testing.T) { insufficientEthError := "insufficient funds for transfer" localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) - etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) + mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce }), fromAddress).Return(commonclient.InsufficientFunds, errors.New(insufficientEthError)).Once() @@ -1538,13 +1538,16 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { assert.True(t, retryable) // Check it was saved correctly with replaced attempt - updated_etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) + output, err := txStore.FindTxsByStateAndFromAddresses(ctx, []gethCommon.Address{fromAddress}, txmgrcommon.TxInProgress, testutils.FixtureChainID) + updated_etx := output[0] require.NoError(t, err) + assert.NotEqual(t, 0, len(output)) + assert.NotNil(t, updated_etx.CreatedAt) assert.Nil(t, updated_etx.BroadcastAt) assert.Nil(t, updated_etx.InitialBroadcastAt) require.NotNil(t, updated_etx.Sequence) assert.False(t, updated_etx.Error.Valid) - assert.Equal(t, txmgrcommon.TxUnstarted, etx.State) + assert.Equal(t, txmgrcommon.TxInProgress, updated_etx.State) require.Len(t, updated_etx.TxAttempts, 1) attempt := updated_etx.TxAttempts[0] From 940bdb9ad3076238090dadd89d322bec2fb6b6a6 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 17:41:43 -0500 Subject: [PATCH 33/42] add comments --- common/txmgr/broadcaster.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 85042277d40..a1a3b442a2f 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -691,6 +691,8 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next return etx, nil } +// replaceAttemptWithBumpedGas perform the replacement of the existing tx attempt with a new bumped fee attempt, +// and it returns three values: new attempt, error, and retryable flag func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithBumpedGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { // This log error is not applicable to Hedera since the action required would not be needed for its gas estimator if eb.chainType != hederaChainType { @@ -714,6 +716,8 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl return bumpedAttempt, err, retryable } +// replaceAttemptWithNewEstimation perform the replacement of the existing tx attempt with a new estimated attempt, +// and it returns three values: new attempt, error, and retryable flag func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { newEstimatedAttempt, fee, feeLimit, retryable, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) if err != nil { From 26d071c388bdb3b0384507f739d44966a5f4cabb Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 28 Aug 2024 18:16:52 -0500 Subject: [PATCH 34/42] rm unused --- common/txmgr/broadcaster.go | 66 ++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index a1a3b442a2f..90de572a2a0 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -558,9 +558,10 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) return err, true case client.Underpriced: - bumpedAttempt, newErr, retryable := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) + bumpedAttempt, newErr := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) if newErr != nil { - return newErr, retryable + eb.SvcErrBuffer.Append(fmt.Errorf("%v, failed to create bumped attempt, %v", err, newErr)) + return newErr, true } return eb.handleInProgressTx(ctx, etx, bumpedAttempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: @@ -568,22 +569,23 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // combined with a high gas limit. Regardless of the cause, we need to obtain a new estimate, // replace the current attempt, and retry after the backoff duration. // The new attempt must be replaced immediately because of a database constraint. - _, newErr, retryable := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) - if newErr != nil { - eb.SvcErrBuffer.Append(fmt.Errorf("%v, %v, retryable: %v", err, newErr, retryable)) - return newErr, retryable + eb.SvcErrBuffer.Append(err) + if _, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt); newErr != nil { + eb.SvcErrBuffer.Append(fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr)) } - - eb.SvcErrBuffer.Append(fmt.Errorf("%v, retryable: %v", err, retryable)) - return err, retryable + return err, true case client.Retryable: return err, true case client.FeeOutOfValidRange: - replacementAttempt, newErr, retryable := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) - if newErr != nil { - return newErr, retryable + replacementAttempt, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) + if newErr != nil || replacementAttempt == nil { + eb.SvcErrBuffer.Append(fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr)) + return newErr, true } - return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, 0) + + lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", + "etxID", etx.ID, "err", err, "newGasPrice", replacementAttempt.TxFee, "newGasLimit", replacementAttempt.ChainSpecificFeeLimit) + return eb.handleInProgressTx(ctx, etx, *replacementAttempt, initialBroadcastAt, 0) case client.Unsupported: return err, false case client.ExceedsMaxFee: @@ -693,7 +695,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next // replaceAttemptWithBumpedGas perform the replacement of the existing tx attempt with a new bumped fee attempt, // and it returns three values: new attempt, error, and retryable flag -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithBumpedGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithBumpedGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { // This log error is not applicable to Hedera since the action required would not be needed for its gas estimator if eb.chainType != hederaChainType { logger.With(lgr, @@ -707,35 +709,33 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl attempt.TxFee, txError.Error(), eb.feeConfig.FeePriceDefault()) } - bumpedAttempt, bumpedFee, bumpedFeeLimit, retryable, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) + bumpedAttempt, bumpedFee, bumpedFeeLimit, _, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) if err != nil { - return bumpedAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err), retryable + return bumpedAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err) } - err = eb.saveReplacementAttempt(ctx, lgr, attempt, &bumpedAttempt, bumpedFee, bumpedFeeLimit) - return bumpedAttempt, err, retryable + if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &bumpedAttempt); err != nil { + return bumpedAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err) + } + + lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", bumpedFee.String(), "newFeeLimit", bumpedFeeLimit) + return bumpedAttempt, err } // replaceAttemptWithNewEstimation perform the replacement of the existing tx attempt with a new estimated attempt, -// and it returns three values: new attempt, error, and retryable flag -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error, retryable bool) { - newEstimatedAttempt, fee, feeLimit, retryable, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) +// and it returns three values: new attempt, error +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { + newEstimatedAttempt, fee, feeLimit, _, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) if err != nil { - return newEstimatedAttempt, fmt.Errorf("replaceAttemptWithNewEstimation failed to build new attempt: %w", err), retryable + return &newEstimatedAttempt, fmt.Errorf("replaceAttemptWithNewEstimation failed to build new attempt: %w", err) } - lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", - "etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) - err = eb.saveReplacementAttempt(ctx, lgr, attempt, &newEstimatedAttempt, fee, feeLimit) - return newEstimatedAttempt, err, retryable -} - -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveReplacementAttempt(ctx context.Context, lgr logger.Logger, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], newFee FEE, newFeeLimit uint64) (err error) { - if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, replacementAttempt); err != nil { - return fmt.Errorf("saveReplacementAttempt failed: %w", err) + if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &newEstimatedAttempt); err != nil { + return &newEstimatedAttempt, fmt.Errorf("replaceAttemptWithNewEstimation failed: %w", err) } - lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit) - return + + lgr.Debugw("new estimated fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", fee.String(), "newFeeLimit", feeLimit) + return &newEstimatedAttempt, err } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { From 2badc93ca246c223d605476b5ddc287fe6c0a8e9 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Thu, 29 Aug 2024 09:29:46 -0500 Subject: [PATCH 35/42] comments --- common/txmgr/broadcaster.go | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 90de572a2a0..0824cddd7c1 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -559,10 +559,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand return err, true case client.Underpriced: bumpedAttempt, newErr := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) - if newErr != nil { - eb.SvcErrBuffer.Append(fmt.Errorf("%v, failed to create bumped attempt, %v", err, newErr)) - return newErr, true - } + lgr.Debugf("%v, failed to create bumped attempt, %v", err, newErr) return eb.handleInProgressTx(ctx, etx, bumpedAttempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: // NOTE: This can occur due to either insufficient funds or a gas spike @@ -571,16 +568,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // The new attempt must be replaced immediately because of a database constraint. eb.SvcErrBuffer.Append(err) if _, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt); newErr != nil { - eb.SvcErrBuffer.Append(fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr)) + err = fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr) } return err, true case client.Retryable: return err, true case client.FeeOutOfValidRange: replacementAttempt, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) - if newErr != nil || replacementAttempt == nil { - eb.SvcErrBuffer.Append(fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr)) - return newErr, true + if newErr != nil { + err = fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr) + return err, true } lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", @@ -693,8 +690,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next return etx, nil } -// replaceAttemptWithBumpedGas perform the replacement of the existing tx attempt with a new bumped fee attempt, -// and it returns three values: new attempt, error, and retryable flag +// replaceAttemptWithBumpedGas performs the replacement of the existing tx attempt with a new bumped fee attempt. func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithBumpedGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { // This log error is not applicable to Hedera since the action required would not be needed for its gas estimator if eb.chainType != hederaChainType { @@ -711,27 +707,26 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl bumpedAttempt, bumpedFee, bumpedFeeLimit, _, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) if err != nil { - return bumpedAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err) + return bumpedAttempt, err } if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &bumpedAttempt); err != nil { - return bumpedAttempt, fmt.Errorf("replaceAttemptWithBumpedGas failed: %w", err) + return bumpedAttempt, err } lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", bumpedFee.String(), "newFeeLimit", bumpedFeeLimit) return bumpedAttempt, err } -// replaceAttemptWithNewEstimation perform the replacement of the existing tx attempt with a new estimated attempt, -// and it returns three values: new attempt, error +// replaceAttemptWithNewEstimation performs the replacement of the existing tx attempt with a new estimated fee attempt. func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { newEstimatedAttempt, fee, feeLimit, _, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) if err != nil { - return &newEstimatedAttempt, fmt.Errorf("replaceAttemptWithNewEstimation failed to build new attempt: %w", err) + return &newEstimatedAttempt, err } if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &newEstimatedAttempt); err != nil { - return &newEstimatedAttempt, fmt.Errorf("replaceAttemptWithNewEstimation failed: %w", err) + return &newEstimatedAttempt, err } lgr.Debugw("new estimated fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", fee.String(), "newFeeLimit", feeLimit) From d6e780bcf0fe5dafbe031e17b057bcc829e59cba Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Thu, 29 Aug 2024 09:35:16 -0500 Subject: [PATCH 36/42] fix --- common/txmgr/broadcaster.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 0824cddd7c1..bb38dd98bfa 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -559,7 +559,11 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand return err, true case client.Underpriced: bumpedAttempt, newErr := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) - lgr.Debugf("%v, failed to create bumped attempt, %v", err, newErr) + if newErr != nil { + err = fmt.Errorf("%v, failed to create bumped attempt, %v", err, newErr) + return err, true + } + return eb.handleInProgressTx(ctx, etx, bumpedAttempt, initialBroadcastAt, retryCount+1) case client.InsufficientFunds: // NOTE: This can occur due to either insufficient funds or a gas spike From 54e7c4dd0c9e1cdb9512eef0646d647e32d93121 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Thu, 29 Aug 2024 09:54:20 -0500 Subject: [PATCH 37/42] adding returned retryable --- common/txmgr/broadcaster.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index bb38dd98bfa..f460de07fd2 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -558,10 +558,10 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) return err, true case client.Underpriced: - bumpedAttempt, newErr := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) + bumpedAttempt, retryable, newErr := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) if newErr != nil { err = fmt.Errorf("%v, failed to create bumped attempt, %v", err, newErr) - return err, true + return err, retryable } return eb.handleInProgressTx(ctx, etx, bumpedAttempt, initialBroadcastAt, retryCount+1) @@ -571,17 +571,17 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // replace the current attempt, and retry after the backoff duration. // The new attempt must be replaced immediately because of a database constraint. eb.SvcErrBuffer.Append(err) - if _, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt); newErr != nil { + if _, _, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt); newErr != nil { err = fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr) } return err, true case client.Retryable: return err, true case client.FeeOutOfValidRange: - replacementAttempt, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) + replacementAttempt, retryable, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) if newErr != nil { err = fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr) - return err, true + return err, retryable } lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", @@ -695,7 +695,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next } // replaceAttemptWithBumpedGas performs the replacement of the existing tx attempt with a new bumped fee attempt. -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithBumpedGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithBumpedGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (replacedAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], retryable bool, err error) { // This log error is not applicable to Hedera since the action required would not be needed for its gas estimator if eb.chainType != hederaChainType { logger.With(lgr, @@ -709,32 +709,32 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl attempt.TxFee, txError.Error(), eb.feeConfig.FeePriceDefault()) } - bumpedAttempt, bumpedFee, bumpedFeeLimit, _, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) + bumpedAttempt, bumpedFee, bumpedFeeLimit, retryable, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) if err != nil { - return bumpedAttempt, err + return bumpedAttempt, retryable, err } if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &bumpedAttempt); err != nil { - return bumpedAttempt, err + return bumpedAttempt, retryable, err } lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", bumpedFee.String(), "newFeeLimit", bumpedFeeLimit) - return bumpedAttempt, err + return bumpedAttempt, retryable, err } // replaceAttemptWithNewEstimation performs the replacement of the existing tx attempt with a new estimated fee attempt. -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], retryable bool, err error) { newEstimatedAttempt, fee, feeLimit, _, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) if err != nil { - return &newEstimatedAttempt, err + return &newEstimatedAttempt, retryable, err } if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &newEstimatedAttempt); err != nil { - return &newEstimatedAttempt, err + return &newEstimatedAttempt, retryable, err } lgr.Debugw("new estimated fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", fee.String(), "newFeeLimit", feeLimit) - return &newEstimatedAttempt, err + return &newEstimatedAttempt, retryable, err } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { From 9882b193ca6302cfb09eedfba6f665ee115a2cfa Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Thu, 29 Aug 2024 12:42:44 -0500 Subject: [PATCH 38/42] return true for retryable --- common/txmgr/broadcaster.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index f460de07fd2..a0fc9ed979e 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -715,11 +715,11 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl } if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &bumpedAttempt); err != nil { - return bumpedAttempt, retryable, err + return bumpedAttempt, true, err } lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", bumpedFee.String(), "newFeeLimit", bumpedFeeLimit) - return bumpedAttempt, retryable, err + return bumpedAttempt, true, err } // replaceAttemptWithNewEstimation performs the replacement of the existing tx attempt with a new estimated fee attempt. @@ -730,11 +730,11 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl } if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &newEstimatedAttempt); err != nil { - return &newEstimatedAttempt, retryable, err + return &newEstimatedAttempt, true, err } lgr.Debugw("new estimated fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", fee.String(), "newFeeLimit", feeLimit) - return &newEstimatedAttempt, retryable, err + return &newEstimatedAttempt, true, err } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { From ff4c290338a032a3989898d796dfcac6229db672 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Thu, 29 Aug 2024 12:44:00 -0500 Subject: [PATCH 39/42] one more --- common/txmgr/broadcaster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index a0fc9ed979e..755d31c8cd9 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -724,7 +724,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) repl // replaceAttemptWithNewEstimation performs the replacement of the existing tx attempt with a new estimated fee attempt. func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) replaceAttemptWithNewEstimation(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (updatedAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], retryable bool, err error) { - newEstimatedAttempt, fee, feeLimit, _, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) + newEstimatedAttempt, fee, feeLimit, retryable, err := eb.NewTxAttemptWithType(ctx, etx, lgr, attempt.TxType, feetypes.OptForceRefetch) if err != nil { return &newEstimatedAttempt, retryable, err } From f0195048098cb8cef096bb8b318cc78a8bcfaff3 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Thu, 29 Aug 2024 14:04:46 -0500 Subject: [PATCH 40/42] address comments --- common/txmgr/broadcaster.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 755d31c8cd9..86475c2e0e7 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -558,10 +558,9 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) return err, true case client.Underpriced: - bumpedAttempt, retryable, newErr := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) - if newErr != nil { - err = fmt.Errorf("%v, failed to create bumped attempt, %v", err, newErr) - return err, retryable + bumpedAttempt, retryable, replaceErr := eb.replaceAttemptWithBumpedGas(ctx, lgr, err, etx, attempt) + if replaceErr != nil { + return replaceErr, retryable } return eb.handleInProgressTx(ctx, etx, bumpedAttempt, initialBroadcastAt, retryCount+1) @@ -571,17 +570,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // replace the current attempt, and retry after the backoff duration. // The new attempt must be replaced immediately because of a database constraint. eb.SvcErrBuffer.Append(err) - if _, _, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt); newErr != nil { - err = fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr) + if _, _, replaceErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt); replaceErr != nil { + return replaceErr, true } return err, true case client.Retryable: return err, true case client.FeeOutOfValidRange: - replacementAttempt, retryable, newErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) - if newErr != nil { - err = fmt.Errorf("%v, failed to create re-estimated attempt, %v", err, newErr) - return err, retryable + replacementAttempt, retryable, replaceErr := eb.replaceAttemptWithNewEstimation(ctx, lgr, etx, attempt) + if replaceErr != nil { + return replaceErr, retryable } lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", From dfea00887a8066d2b25fe0b978c5fef1c80ca41e Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Mon, 2 Sep 2024 17:18:06 -0500 Subject: [PATCH 41/42] revert changes in unit test, just update comment --- core/chains/evm/txmgr/broadcaster_test.go | 27 ++++++++++------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 10c58136821..1a83d9662bf 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1524,10 +1524,10 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM evm.txes WHERE nonce = $1`, localNextNonce) }) - t.Run("eth tx is replaced with new re-estimated tx if eth node returns insufficient eth", func(t *testing.T) { + t.Run("if eth node returns insufficient eth, eth tx is replaced with new re-estimated tx, and keep in progress", func(t *testing.T) { insufficientEthError := "insufficient funds for transfer" localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) - mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) + etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce }), fromAddress).Return(commonclient.InsufficientFunds, errors.New(insufficientEthError)).Once() @@ -1537,20 +1537,17 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { assert.Contains(t, err.Error(), "insufficient funds for transfer") assert.True(t, retryable) - // Check it was saved correctly with replaced attempt - output, err := txStore.FindTxsByStateAndFromAddresses(ctx, []gethCommon.Address{fromAddress}, txmgrcommon.TxInProgress, testutils.FixtureChainID) - updated_etx := output[0] + // Check it was saved correctly with its attempt + etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) - assert.NotEqual(t, 0, len(output)) - assert.NotNil(t, updated_etx.CreatedAt) - assert.Nil(t, updated_etx.BroadcastAt) - assert.Nil(t, updated_etx.InitialBroadcastAt) - require.NotNil(t, updated_etx.Sequence) - assert.False(t, updated_etx.Error.Valid) - assert.Equal(t, txmgrcommon.TxInProgress, updated_etx.State) - require.Len(t, updated_etx.TxAttempts, 1) - - attempt := updated_etx.TxAttempts[0] + + assert.Nil(t, etx.BroadcastAt) + assert.Nil(t, etx.InitialBroadcastAt) + require.NotNil(t, etx.Sequence) + assert.False(t, etx.Error.Valid) + assert.Equal(t, txmgrcommon.TxInProgress, etx.State) + require.Len(t, etx.TxAttempts, 1) + attempt := etx.TxAttempts[0] assert.Equal(t, txmgrtypes.TxAttemptInProgress, attempt.State) assert.Nil(t, attempt.BroadcastBeforeBlockNum) }) From 12b2d92db1328dd8433cbf63408cc63076772a0d Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Tue, 3 Sep 2024 09:21:07 -0500 Subject: [PATCH 42/42] update comment --- core/chains/evm/txmgr/broadcaster_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 1a83d9662bf..6454cb548b3 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -1524,7 +1524,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM evm.txes WHERE nonce = $1`, localNextNonce) }) - t.Run("if eth node returns insufficient eth, eth tx is replaced with new re-estimated tx, and keep in progress", func(t *testing.T) { + t.Run("tx is left in progress and its attempt gets replaced with a new re-estimated attempt if node returns insufficient eth", func(t *testing.T) { insufficientEthError := "insufficient funds for transfer" localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, testutils.FixtureChainID)