Skip to content

Commit

Permalink
Txmv2 enablement (#15286)
Browse files Browse the repository at this point in the history
* TXMv2 alpha version

* TXM fixes

* Fix

* Update orchestrator

* Add builder

* Add txmv2 tests

* Update retry logic

* Add backoff mechanism

* Add multi address support

* Add backoff mechanism for broadcasting and backfilling

* Minor fix

* Add check to dummy keystore

* Fix inmemory store logging per address

* Remove unnecessary const

* Stuck tx detector alpha

* Update stuck tx detection

* Add stuck_tx_detection and dual broadcast client

* Add support for TXMv2

* Fix txm to work with enabled addresses from keystore

* Fix orchestrator's monitoring call

* Fix AttemptBuilder

* Enable DualBroadcast client

* Enable dual broadcast

* AttemptBuilder fixes

* Add AttemptBuilder service exception

* Add AttemptBuilder close

* Minor updates

* Switch DualBroadcast params to pointers

* Update DualBroadcast types to pointers

* Make purgable attempts empty

* Fix Idempotency in Store Manager

* Update trigger

* Fix lint

* Add context to client

* Fix more lint

* Fix lint

* Fix lint

* Fix lint

* Fix DualBroadcast client

* More lint fixes

* More lint fixes

* Fix lint

* Fix lint

* Fix lint final

* Fix races

* Update logs

* Fix lint

* Start documentation

* Update DummyKeystore Add method

* Txmv2 stuck tx detection (#15436)

* Stuck tx detector alpha

* Update stuck tx detection

* Add stuck_tx_detection and dual broadcast client

* Add support for TXMv2

* Fix orchestrator's monitoring call

* Fix AttemptBuilder

* Enable DualBroadcast client

* Switch DualBroadcast params to pointers

* Add context to client

* Fix lint

* Fix DualBroadcast client

* More lint fixes

* Fix lint

* Make nonce nullable

* Update configs

* Add prom metrics

* Add prom metrics

* Add transaction confirmation metric

* Improve logs

* Add Abandon support

* Address feedback

* Update tests

* Fix orchestrator log

* Fix Nonce log

* Improvements

* Update tests

* Add fixes

* Improvements

* Improve logs

* Move initialization of nonce

* Add Beholder metrics

* Improve InMemoryStorage

* Support different priceMax per key

* Upgrades

* Fix config tests

* Fix docs

* Fix config test lint

* Update configs

* Reuse transaction states

* Fix docs

* Deprecate DualBroadcastDetection

* Add health report
  • Loading branch information
dimriou authored Jan 20, 2025
1 parent 4e28497 commit 657bdc4
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 27 deletions.
39 changes: 38 additions & 1 deletion common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ type TxManager[
GetTransactionStatus(ctx context.Context, transactionID string) (state commontypes.TransactionStatus, err error)
}

type TxmV2Wrapper[
CHAIN_ID types.ID,
HEAD types.Head[BLOCK_HASH],
ADDR types.Hashable,
TX_HASH types.Hashable,
BLOCK_HASH types.Hashable,
SEQ types.Sequence,
FEE fees.Fee,
] interface {
services.Service
CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Reset(addr ADDR, abandon bool) error
}

type reset struct {
// f is the function to execute between stopping/starting the
// Broadcaster and Confirmer
Expand Down Expand Up @@ -112,6 +126,7 @@ type Txm[
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -147,6 +162,7 @@ func NewTxm[
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
txmv2wrapper TxmV2Wrapper[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
logger: logger.Sugared(lggr),
Expand All @@ -169,6 +185,7 @@ func NewTxm[
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
finalizer: finalizer,
txmv2wrapper: txmv2wrapper,
}

if txCfg.ResendAfterThreshold() <= 0 {
Expand Down Expand Up @@ -207,6 +224,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return fmt.Errorf("Txm: Finalizer failed to start: %w", err)
}

if b.txmv2wrapper != nil {
if err := ms.Start(ctx, b.txmv2wrapper); err != nil {
return fmt.Errorf("Txm: Txmv2 failed to start: %w", err)
}
}

b.logger.Info("Txm starting runLoop")
b.wg.Add(1)
go b.runLoop()
Expand Down Expand Up @@ -237,6 +260,11 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr
f := func() {
if abandon {
err = b.abandon(addr)
if b.txmv2wrapper != nil {
if err2 := b.txmv2wrapper.Reset(addr, abandon); err2 != nil {
b.logger.Error("failed to abandon transactions for dual broadcast", "err", err2)
}
}
}
}

Expand Down Expand Up @@ -460,6 +488,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
if b.txmv2wrapper != nil {
err = b.txmv2wrapper.Close()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
}
return
case <-keysChanged:
// This check prevents the weird edge-case where you can select
Expand Down Expand Up @@ -513,11 +547,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
// Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing
// Skipping CreateTransaction to avoid double send
if b.txmv2wrapper != nil && txRequest.Meta != nil && txRequest.Meta.DualBroadcast != nil && *txRequest.Meta.DualBroadcast {
return b.txmv2wrapper.CreateTransaction(ctx, txRequest)
}
if txRequest.IdempotencyKey != nil {
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID)
if err != nil {
return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err)
return tx, fmt.Errorf("failed to search for transaction with IdempotencyKey: %w", err)
}
if existingTx != nil {
b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/ccip/ocrimpls/contract_transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ func makeTestEvmTxm(
lp,
keyStore,
estimator,
ht)
ht,
nil)
require.NoError(t, err, "can't create tx manager")

_, unsub := broadcaster.Subscribe(txm)
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/txm/docs/TRANSACTION_MANAGER_V2.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
- `txm_num_broadcasted_transactions`: total number of successful broadcasted transactions.
- `txm_num_confirmed_transactions`: total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs.
- `txm_num_nonce_gaps`: total number of nonce gaps created that the transaction manager had to fill.
- `txm_time_until_tx_confirmed`: The amount of time elapsed from a transaction being broadcast to being included in a block.
- `txm_time_until_tx_confirmed`: The amount of time elapsed from a transaction being broadcast to being included in a block.
6 changes: 4 additions & 2 deletions core/chains/evm/txmgr/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func NewTxm(
keyStore keystore.Eth,
estimator gas.EvmFeeEstimator,
headTracker latestAndFinalizedBlockHeadTracker,
txmv2wrapper TxManager,
) (txm TxManager,
err error,
) {
Expand Down Expand Up @@ -70,7 +71,7 @@ func NewTxm(
if txConfig.ResendAfterThreshold() > 0 {
evmResender = NewEvmResender(lggr, txStore, txmClient, evmTracker, keyStore, txmgr.DefaultResenderPollInterval, chainConfig, txConfig)
}
txm = NewEvmTxm(chainID, txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, evmBroadcaster, evmConfirmer, evmResender, evmTracker, evmFinalizer)
txm = NewEvmTxm(chainID, txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, evmBroadcaster, evmConfirmer, evmResender, evmTracker, evmFinalizer, txmv2wrapper)
return txm, nil
}

Expand All @@ -90,8 +91,9 @@ func NewEvmTxm(
resender *Resender,
tracker *Tracker,
finalizer Finalizer,
txmv2wrapper TxManager,
) *Txm {
return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker, finalizer, client.NewTxError)
return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker, finalizer, client.NewTxError, txmv2wrapper)
}

func NewTxmV2(
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,7 @@ func TestORM_UpdateTxUnstartedToInProgress(t *testing.T) {
evmTxmCfg := txmgr.NewEvmTxmConfig(ccfg.EVM())
ec := evmtest.NewEthClientMockWithDefaultChain(t)
txMgr := txmgr.NewEvmTxm(ec.ConfiguredChainID(), evmTxmCfg, ccfg.EVM().Transactions(), nil, logger.Test(t), nil, nil,
nil, txStore, nil, nil, nil, nil, nil)
nil, txStore, nil, nil, nil, nil, nil, nil)
err := txMgr.XXXTestAbandon(fromAddress) // mark transaction as abandoned
require.NoError(t, err)

Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func makeTestEvmTxm(
lp,
keyStore,
estimator,
ht)
ht,
nil)
}

func TestTxm_SendNativeToken_DoesNotSendToZero(t *testing.T) {
Expand Down
36 changes: 20 additions & 16 deletions core/chains/legacyevm/evm_txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func newEvmTxm(
)

if opts.GenTxManager == nil {
var txmv2 txmgr.TxManager
if cfg.Transactions().TransactionManagerV2().Enabled() {
txm, err = txmgr.NewTxmV2(
txmv2, err = txmgr.NewTxmV2(
ds,
cfg,
txmgr.NewEvmTxmFeeConfig(cfg.GasEstimator()),
Expand All @@ -53,22 +54,25 @@ func newEvmTxm(
opts.KeyStore,
estimator,
)
} else {
txm, err = txmgr.NewTxm(
ds,
cfg,
txmgr.NewEvmTxmFeeConfig(cfg.GasEstimator()),
cfg.Transactions(),
cfg.NodePool().Errors(),
databaseConfig,
listenerConfig,
client,
lggr,
logPoller,
opts.KeyStore,
estimator,
headTracker)
if cfg.Transactions().TransactionManagerV2().DualBroadcast() != nil && *cfg.Transactions().TransactionManagerV2().DualBroadcast() {
return txmv2, err
}
}
txm, err = txmgr.NewTxm(
ds,
cfg,
txmgr.NewEvmTxmFeeConfig(cfg.GasEstimator()),
cfg.Transactions(),
cfg.NodePool().Errors(),
databaseConfig,
listenerConfig,
client,
lggr,
logPoller,
opts.KeyStore,
estimator,
headTracker,
txmv2)
} else {
txm = opts.GenTxManager(chainID)
}
Expand Down
3 changes: 2 additions & 1 deletion core/services/headreporter/prometheus_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainCon
lp,
keyStore,
estimator,
ht)
ht,
nil)
require.NoError(t, err)

cfg := configtest.NewGeneralConfig(t, nil)
Expand Down
2 changes: 1 addition & 1 deletion core/services/vrf/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func buildVrfUni(t *testing.T, db *sqlx.DB, cfg chainlink.GeneralConfig) vrfUniv
btORM := bridges.NewORM(db)
ks := keystore.NewInMemory(db, utils.FastScryptParams, lggr)
_, dbConfig, evmConfig := txmgr.MakeTestConfigs(t)
txm, err := txmgr.NewTxm(db, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), nil, dbConfig, dbConfig.Listener(), ec, logger.TestLogger(t), nil, ks.Eth(), nil, nil)
txm, err := txmgr.NewTxm(db, evmConfig, evmConfig.GasEstimator(), evmConfig.Transactions(), nil, dbConfig, dbConfig.Listener(), ec, logger.TestLogger(t), nil, ks.Eth(), nil, nil, nil)
orm := headtracker.NewORM(*testutils.FixtureChainID, db)
require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), cltest.Head(51)))
jrm := job.NewORM(db, prm, btORM, ks, lggr)
Expand Down
2 changes: 1 addition & 1 deletion core/services/vrf/v2/integration_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M
_, _, evmConfig := txmgr.MakeTestConfigs(t)
txmConfig := txmgr.NewEvmTxmConfig(evmConfig)
txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil,
nil, txStore, nil, nil, nil, nil, nil)
nil, txStore, nil, nil, nil, nil, nil, nil)

return txm
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/vrf/v2/listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M
ec := evmtest.NewEthClientMockWithDefaultChain(t)
txmConfig := txmgr.NewEvmTxmConfig(evmConfig)
txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil,
nil, txStore, nil, nil, nil, nil, nil)
nil, txStore, nil, nil, nil, nil, nil, nil)

return txm
}
Expand Down

0 comments on commit 657bdc4

Please sign in to comment.