From 5556b33d392f7dc9a76da12db24b7cfc7c13b855 Mon Sep 17 00:00:00 2001 From: Dimitris Grigoriou Date: Mon, 20 Jan 2025 15:51:46 +0200 Subject: [PATCH] Txmv2 enablement (#15286) * TXMv2 alpha version * TXM fixes * Fix * Update orchestrator * Add builder * Add txmv2 tests * Update retry logic * Add backoff mechanism * Add multi address support * Add backoff mechanism for broadcasting and backfilling * Minor fix * Add check to dummy keystore * Fix inmemory store logging per address * Remove unnecessary const * Stuck tx detector alpha * Update stuck tx detection * Add stuck_tx_detection and dual broadcast client * Add support for TXMv2 * Fix txm to work with enabled addresses from keystore * Fix orchestrator's monitoring call * Fix AttemptBuilder * Enable DualBroadcast client * Enable dual broadcast * AttemptBuilder fixes * Add AttemptBuilder service exception * Add AttemptBuilder close * Minor updates * Switch DualBroadcast params to pointers * Update DualBroadcast types to pointers * Make purgable attempts empty * Fix Idempotency in Store Manager * Update trigger * Fix lint * Add context to client * Fix more lint * Fix lint * Fix lint * Fix lint * Fix DualBroadcast client * More lint fixes * More lint fixes * Fix lint * Fix lint * Fix lint final * Fix races * Update logs * Fix lint * Start documentation * Update DummyKeystore Add method * Txmv2 stuck tx detection (#15436) * Stuck tx detector alpha * Update stuck tx detection * Add stuck_tx_detection and dual broadcast client * Add support for TXMv2 * Fix orchestrator's monitoring call * Fix AttemptBuilder * Enable DualBroadcast client * Switch DualBroadcast params to pointers * Add context to client * Fix lint * Fix DualBroadcast client * More lint fixes * Fix lint * Make nonce nullable * Update configs * Add prom metrics * Add prom metrics * Add transaction confirmation metric * Improve logs * Add Abandon support * Address feedback * Update tests * Fix orchestrator log * Fix Nonce log * Improvements * Update tests * Add fixes * Improvements * Improve logs * Move initialization of nonce * Add Beholder metrics * Improve InMemoryStorage * Support different priceMax per key * Upgrades * Fix config tests * Fix docs * Fix config test lint * Update configs * Reuse transaction states * Fix docs * Deprecate DualBroadcastDetection * Add health report --- common/txmgr/txmgr.go | 39 ++++++++++++++++++- .../ocrimpls/contract_transmitter_test.go | 3 +- .../evm/txm/docs/TRANSACTION_MANAGER_V2.md | 2 +- core/chains/evm/txmgr/builder.go | 6 ++- core/chains/evm/txmgr/evm_tx_store_test.go | 2 +- core/chains/evm/txmgr/txmgr_test.go | 3 +- core/chains/legacyevm/evm_txm.go | 36 +++++++++-------- .../headreporter/prometheus_reporter_test.go | 3 +- core/services/vrf/delegate_test.go | 2 +- core/services/vrf/v2/integration_v2_test.go | 2 +- core/services/vrf/v2/listener_v2_test.go | 2 +- 11 files changed, 73 insertions(+), 27 deletions(-) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 20d872efd20..ec7841d470d 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -65,6 +65,20 @@ type TxManager[ GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error) } +type TxmV2Wrapper[ + CHAIN_ID types.ID, + HEAD types.Head[BLOCK_HASH], + ADDR types.Hashable, + TX_HASH types.Hashable, + BLOCK_HASH types.Hashable, + SEQ types.Sequence, + FEE fees.Fee, +] interface { + services.Service + CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + Reset(addr ADDR, abandon bool) error +} + type reset struct { // f is the function to execute between stopping/starting the // Broadcaster and Confirmer @@ -112,6 +126,7 @@ type Txm[ fwdMgr txmgrtypes.ForwarderManager[ADDR] txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] newErrorClassifier NewErrorClassifier + txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] } func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) { @@ -147,6 +162,7 @@ func NewTxm[ tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD], newErrorClassifierFunc NewErrorClassifier, + txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ logger: logger.Sugared(lggr), @@ -169,6 +185,7 @@ func NewTxm[ tracker: tracker, newErrorClassifier: newErrorClassifierFunc, finalizer: finalizer, + txmv2wrapper: txmv2wrapper, } if txCfg.ResendAfterThreshold() <= 0 { @@ -207,6 +224,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx return fmt.Errorf("Txm: Finalizer failed to start: %w", err) } + if b.txmv2wrapper != nil { + if err := ms.Start(ctx, b.txmv2wrapper); err != nil { + return fmt.Errorf("Txm: Txmv2 failed to start: %w", err) + } + } + b.logger.Info("Txm starting runLoop") b.wg.Add(1) go b.runLoop() @@ -237,6 +260,11 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr f := func() { if abandon { err = b.abandon(addr) + if b.txmv2wrapper != nil { + if err2 := b.txmv2wrapper.Reset(addr, abandon); err2 != nil { + b.logger.Error("failed to abandon transactions for dual broadcast", "err", err2) + } + } } } @@ -460,6 +488,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) { b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err) } + if b.txmv2wrapper != nil { + err = b.txmv2wrapper.Close() + if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) { + b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err) + } + } return case <-keysChanged: // This check prevents the weird edge-case where you can select @@ -513,11 +547,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { // Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing // Skipping CreateTransaction to avoid double send + if b.txmv2wrapper != nil && txRequest.Meta != nil && txRequest.Meta.DualBroadcast != nil && *txRequest.Meta.DualBroadcast { + return b.txmv2wrapper.CreateTransaction(ctx, txRequest) + } if txRequest.IdempotencyKey != nil { var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID) if err != nil { - return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err) + return tx, fmt.Errorf("failed to search for transaction with IdempotencyKey: %w", err) } if existingTx != nil { b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey) diff --git a/core/capabilities/ccip/ocrimpls/contract_transmitter_test.go b/core/capabilities/ccip/ocrimpls/contract_transmitter_test.go index 5e237ccbee0..adc4057fd56 100644 --- a/core/capabilities/ccip/ocrimpls/contract_transmitter_test.go +++ b/core/capabilities/ccip/ocrimpls/contract_transmitter_test.go @@ -476,7 +476,8 @@ func makeTestEvmTxm( lp, keyStore, estimator, - ht) + ht, + nil) require.NoError(t, err, "can't create tx manager") _, unsub := broadcaster.Subscribe(txm) diff --git a/core/chains/evm/txm/docs/TRANSACTION_MANAGER_V2.md b/core/chains/evm/txm/docs/TRANSACTION_MANAGER_V2.md index d408cc7d733..29531f58ebc 100644 --- a/core/chains/evm/txm/docs/TRANSACTION_MANAGER_V2.md +++ b/core/chains/evm/txm/docs/TRANSACTION_MANAGER_V2.md @@ -11,4 +11,4 @@ - `txm_num_broadcasted_transactions`: total number of successful broadcasted transactions. - `txm_num_confirmed_transactions`: total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs. - `txm_num_nonce_gaps`: total number of nonce gaps created that the transaction manager had to fill. -- `txm_time_until_tx_confirmed`: The amount of time elapsed from a transaction being broadcast to being included in a block. \ No newline at end of file +- `txm_time_until_tx_confirmed`: The amount of time elapsed from a transaction being broadcast to being included in a block. diff --git a/core/chains/evm/txmgr/builder.go b/core/chains/evm/txmgr/builder.go index 4a09d16d214..aa6b9f4575b 100644 --- a/core/chains/evm/txmgr/builder.go +++ b/core/chains/evm/txmgr/builder.go @@ -43,6 +43,7 @@ func NewTxm( keyStore keystore.Eth, estimator gas.EvmFeeEstimator, headTracker latestAndFinalizedBlockHeadTracker, + txmv2wrapper TxManager, ) (txm TxManager, err error, ) { @@ -70,7 +71,7 @@ func NewTxm( if txConfig.ResendAfterThreshold() > 0 { evmResender = NewEvmResender(lggr, txStore, txmClient, evmTracker, keyStore, txmgr.DefaultResenderPollInterval, chainConfig, txConfig) } - txm = NewEvmTxm(chainID, txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, evmBroadcaster, evmConfirmer, evmResender, evmTracker, evmFinalizer) + txm = NewEvmTxm(chainID, txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, evmBroadcaster, evmConfirmer, evmResender, evmTracker, evmFinalizer, txmv2wrapper) return txm, nil } @@ -90,8 +91,9 @@ func NewEvmTxm( resender *Resender, tracker *Tracker, finalizer Finalizer, + txmv2wrapper TxManager, ) *Txm { - return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker, finalizer, client.NewTxError) + return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker, finalizer, client.NewTxError, txmv2wrapper) } func NewTxmV2( diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 9f515c22be4..4e85556b1e2 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1286,7 +1286,7 @@ func TestORM_UpdateTxUnstartedToInProgress(t *testing.T) { evmTxmCfg := txmgr.NewEvmTxmConfig(ccfg.EVM()) ec := evmtest.NewEthClientMockWithDefaultChain(t) txMgr := txmgr.NewEvmTxm(ec.ConfiguredChainID(), evmTxmCfg, ccfg.EVM().Transactions(), nil, logger.Test(t), nil, nil, - nil, txStore, nil, nil, nil, nil, nil) + nil, txStore, nil, nil, nil, nil, nil, nil) err := txMgr.XXXTestAbandon(fromAddress) // mark transaction as abandoned require.NoError(t, err) diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 9ee2396846d..c02406a1c69 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -86,7 +86,8 @@ func makeTestEvmTxm( lp, keyStore, estimator, - ht) + ht, + nil) } func TestTxm_SendNativeToken_DoesNotSendToZero(t *testing.T) { diff --git a/core/chains/legacyevm/evm_txm.go b/core/chains/legacyevm/evm_txm.go index 15731790f67..078bd413636 100644 --- a/core/chains/legacyevm/evm_txm.go +++ b/core/chains/legacyevm/evm_txm.go @@ -40,8 +40,9 @@ func newEvmTxm( ) if opts.GenTxManager == nil { + var txmv2 txmgr.TxManager if cfg.Transactions().TransactionManagerV2().Enabled() { - txm, err = txmgr.NewTxmV2( + txmv2, err = txmgr.NewTxmV2( ds, cfg, txmgr.NewEvmTxmFeeConfig(cfg.GasEstimator()), @@ -53,22 +54,25 @@ func newEvmTxm( opts.KeyStore, estimator, ) - } else { - txm, err = txmgr.NewTxm( - ds, - cfg, - txmgr.NewEvmTxmFeeConfig(cfg.GasEstimator()), - cfg.Transactions(), - cfg.NodePool().Errors(), - databaseConfig, - listenerConfig, - client, - lggr, - logPoller, - opts.KeyStore, - estimator, - headTracker) + if cfg.Transactions().TransactionManagerV2().DualBroadcast() != nil && *cfg.Transactions().TransactionManagerV2().DualBroadcast() { + return txmv2, err + } } + txm, err = txmgr.NewTxm( + ds, + cfg, + txmgr.NewEvmTxmFeeConfig(cfg.GasEstimator()), + cfg.Transactions(), + cfg.NodePool().Errors(), + databaseConfig, + listenerConfig, + client, + lggr, + logPoller, + opts.KeyStore, + estimator, + headTracker, + txmv2) } else { txm = opts.GenTxManager(chainID) } diff --git a/core/services/headreporter/prometheus_reporter_test.go b/core/services/headreporter/prometheus_reporter_test.go index 9fd42baa15e..9d2a2bb18a6 100644 --- a/core/services/headreporter/prometheus_reporter_test.go +++ b/core/services/headreporter/prometheus_reporter_test.go @@ -135,7 +135,8 @@ func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainCon lp, keyStore, estimator, - ht) + ht, + nil) require.NoError(t, err) cfg := configtest.NewGeneralConfig(t, nil) diff --git a/core/services/vrf/delegate_test.go b/core/services/vrf/delegate_test.go index 7ef3febd021..03abaae29b6 100644 --- a/core/services/vrf/delegate_test.go +++ b/core/services/vrf/delegate_test.go @@ -82,7 +82,7 @@ func buildVrfUni(t *testing.T, db *sqlx.DB, cfg chainlink.GeneralConfig) vrfUniv btORM := bridges.NewORM(db) ks := keystore.NewInMemory(db, utils.FastScryptParams, lggr) _, dbConfig, evmConfig := txmgr.MakeTestConfigs(t) - txm, err := txmgr.NewTxm(db, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), nil, dbConfig, dbConfig.Listener(), ec, logger.TestLogger(t), nil, ks.Eth(), nil, nil) + txm, err := txmgr.NewTxm(db, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), nil, dbConfig, dbConfig.Listener(), ec, logger.TestLogger(t), nil, ks.Eth(), nil, nil, nil) orm := headtracker.NewORM(*testutils.FixtureChainID, db) require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), cltest.Head(51))) jrm := job.NewORM(db, prm, btORM, ks, lggr) diff --git a/core/services/vrf/v2/integration_v2_test.go b/core/services/vrf/v2/integration_v2_test.go index 6cbcc799e1b..c07ad6c782f 100644 --- a/core/services/vrf/v2/integration_v2_test.go +++ b/core/services/vrf/v2/integration_v2_test.go @@ -141,7 +141,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M _, _, evmConfig := txmgr.MakeTestConfigs(t) txmConfig := txmgr.NewEvmTxmConfig(evmConfig) txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil, - nil, txStore, nil, nil, nil, nil, nil) + nil, txStore, nil, nil, nil, nil, nil, nil) return txm } diff --git a/core/services/vrf/v2/listener_v2_test.go b/core/services/vrf/v2/listener_v2_test.go index b7a8710c4f8..e2a2a703de8 100644 --- a/core/services/vrf/v2/listener_v2_test.go +++ b/core/services/vrf/v2/listener_v2_test.go @@ -40,7 +40,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M ec := evmtest.NewEthClientMockWithDefaultChain(t) txmConfig := txmgr.NewEvmTxmConfig(evmConfig) txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil, - nil, txStore, nil, nil, nil, nil, nil) + nil, txStore, nil, nil, nil, nil, nil, nil) return txm }